
[Flume] Automatically shipping MySQL data increments to ClickHouse


Installing Flume

wget http://www.apache.org/dist/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz

Extract it:

tar zxvf apache-flume-1.5.2-bin.tar.gz

Building the Java dependency jars

  • Three packages are needed: flume-ng-sql-source, flume-clickhouse-sink, and mysql-connector-java.

  • Building them requires Maven; see https://blog.csdn.net/weixx3/article/details/80331538 for installation instructions.

The flume-ng-sql-source jar

  1. Get the source from GitHub: https://github.com/keedio/flume-ng-sql-source
    Make sure to check out tag 1.5.2.

  2. Modify the source:
    In flume-ng-sql-source-1.5.2/src/main/java/org/keedio/flume/source/SQLSourceHelper.java, change the "," delimiter to "\t" so that ClickHouse does not reject the rows (see the sketch after this list).

  3. Package:
    From the directory containing pom.xml, run the Maven command:
mvn package -Dmaven.test.skip=true
  4. Move flume-ng-sql-source-1.5.2.jar from the generated target directory into Flume's lib folder. It must be the lib folder, the one holding Flume's runtime jars; placing it anywhere else leads to "cannot find symbol"-style errors.
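
The delimiter change itself is tiny. Exact method and field names vary between tags of flume-ng-sql-source, so treat the following as a sketch of the intent rather than a verbatim diff (the class and method names here are made up for illustration): wherever SQLSourceHelper joins column values with ",", join with "\t" instead, so each event body lines up with the TabSeparated format used on the sink side.

import java.util.List;

// Sketch only -- the real code in SQLSourceHelper.java differs by version.
// The one change that matters: join column values with '\t' instead of ','.
public class DelimiterSketch {
  static String buildRow(List<Object> columns) {
    StringBuilder row = new StringBuilder();
    for (int i = 0; i < columns.size(); i++) {
      if (i > 0) {
        row.append("\t"); // was: row.append(",");
      }
      row.append(columns.get(i));
    }
    return row.toString();
  }
}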

The flume-clickhouse-sink jar

  1. Since no ready-made flume-clickhouse-sink package could be found, we reuse the flume-ng-kafka-sink module's skeleton and drop ClickHouseSink.java into it to build the jar.

  2. Edit pom.xml to strip out all the KafkaSink pieces.
    The modified file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  
  <name>Apache Flume ClickHouse Sink</name>
  <description>ClickHouse sink for Apache Flume NG</description>

  <groupId>org.apache.flume.sink.clickhouse</groupId>
  <artifactId>flume-clickhouse-sink</artifactId>
  <version>1.5.2</version>
  <packaging>jar</packaging>
  
  <parent>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-sinks</artifactId>
    <version>1.4.0</version>
  </parent>

  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>19.0</version>
    </dependency>
    <!-- Required for ru.yandex.clickhouse.util.guava.StreamUtils used in
         ClickHouseSink.java; the version here is an assumption, pick the
         clickhouse-jdbc release that matches your environment. -->
    <dependency>
      <groupId>ru.yandex.clickhouse</groupId>
      <artifactId>clickhouse-jdbc</artifactId>
      <version>0.1.54</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
    </plugins>
  </build> 
</project>
  3. Download the clickhouse-sink code.

The source is at https://reviews.apache.org/r/50692/diff/1#2
The steps below are done after cd-ing into

/flume/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka

After pulling the code, change the source file's character set to UTF-8; otherwise every Chinese string inserted into ClickHouse comes out as nothing but ❓, little eyes full of big confusion. The relevant UTF-8 handling is in the event-decoding and entity-building lines of ClickHouseSink.java below.


StreamUtils needs to be imported; the complete file is as follows.
ClickHouseSink.java:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.flume.sink.clickhouse;

import com.google.common.base.Preconditions;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;

import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.AuthCache;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.ClientContext;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.BasicAuthCache;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.util.guava.StreamUtils;
import ru.yandex.clickhouse.util.apache.StringUtils;
import ru.yandex.clickhouse.util.Utils;
import static org.apache.flume.sink.clickhouse.ClickHouseSinkConstants.*;

/**
 * A sink that sends Flume events to a ClickHouse database via HTTP.</p>
 *
 * If the response code is less than 200 or greater than or equal to 300, an
 * HTTP exception is thrown and the transaction is rolled back.</p>
 *
 * This sink must be configured with mandatory parameters detailed in
 * {@link ClickHouseSinkConstants}.</p>
 *
 * @see https://clickhouse.yandex/reference_en.html
 */
public class ClickHouseSink extends AbstractSink implements Configurable {

  private static final Logger logger = LoggerFactory.getLogger(ClickHouseSink.class);

  private PoolingClientConnectionManager connectionManager = null;
  private DefaultHttpClient httpClient = null;

  private CredentialsProvider credsProvider = null;
  private AuthCache authCache = null;
  private HttpContext httpContext = null;

  private SinkCounter sinkCounter = null;

  private String host = null;
  private String port = null;
  private String user = null;
  private String password = null;
  private String database = null;
  private String table = null;
  private String format = null;
  private String query = null;

  private int batchSize;

  private int httpConnTimeout;
  private int httpSockTimeout;

  private String url = null;

  @Override
  public void configure(Context context) {

    if (sinkCounter == null) {
      sinkCounter = new SinkCounter(getName());
    }

    Preconditions.checkArgument(context.getString(HOST) != null &&
        context.getString(HOST).length() > 0,
        "ClickHouse host must be specified!");
    this.host = context.getString(HOST);
    if (!this.host.startsWith("http://")){
      this.host = "http://" + this.host;
    }

    Preconditions.checkArgument(context.getString(DATABASE) != null &&
        context.getString(DATABASE).length() > 0,
        "ClickHouse database must be specified!");
    this.database = context.getString(DATABASE);

    Preconditions.checkArgument(context.getString(TABLE) != null &&
        context.getString(TABLE).length() > 0,
        "ClickHouse table must be specified!");
    this.table = context.getString(TABLE);

    this.port = context.getString(PORT, DEFAULT_PORT);
    this.format = context.getString(FORMAT, DEFAULT_FORMAT);
    this.user = context.getString(USER, DEFAULT_USER);
    this.password = context.getString(PASSWORD, DEFAULT_PASSWORD);

    this.query = createQuery(database, table, format);

    this.batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);

    this.httpConnTimeout = context.getInteger(HTTP_CONNECTION_TIMEOUT,
        DEFAULT_HTTP_CONNECTION_TIMEOUT);
    this.httpSockTimeout = context.getInteger(HTTP_SOCK_TIMEOUT,
        DEFAULT_HTTP_SOCKET_TIMEOUT);

    this.url = host + ":" + port;

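    // Cache the credentials and pre-fill an auth cache so HttpClient can send
    // Basic auth preemptively. Note that `host` still carries its "http://"
    // prefix at this point, so the AuthScope/HttpHost may not match the actual
    // request; the notes below the configuration file work around 401s by
    // embedding user:password directly in the host URL.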
    this.credsProvider = new BasicCredentialsProvider();
    credsProvider.setCredentials(new AuthScope(host, Integer.parseInt(port)),
        new UsernamePasswordCredentials(user, password));

    BasicScheme basicAuth = new BasicScheme();
    authCache = new BasicAuthCache();
    authCache.put(new HttpHost(host, Integer.parseInt(port)), basicAuth);
  }

  @Override
  public void start() {

    connectionManager = new PoolingClientConnectionManager();

    httpClient = new DefaultHttpClient(connectionManager);

    httpClient.setCredentialsProvider(credsProvider);

    HttpParams params = httpClient.getParams();
    HttpConnectionParams.setConnectionTimeout(params, httpConnTimeout);
    HttpConnectionParams.setSoKeepalive(params, false);
    HttpConnectionParams.setSoTimeout(params, httpSockTimeout);
    HttpConnectionParams.setStaleCheckingEnabled(params, false);

    httpContext = new BasicHttpContext();
    httpContext.setAttribute(ClientContext.AUTH_CACHE, authCache);
    httpContext.setAttribute(ClientContext.CREDS_PROVIDER, credsProvider);
    sinkCounter.start();
    super.start();
  }


  @Override
  public void stop() {
    logger.info("ClickHouse sink {} stopping", getName());

    connectionManager.shutdown();

    sinkCounter.incrementConnectionClosedCount();
    sinkCounter.stop();
    super.stop();
  }


  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();

    HttpPost httpPost = null;
    try {
      int count;
      StringBuilder batch = new StringBuilder();
      batch.append(this.query).append("\n");

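      // Drain up to batchSize events from the channel; each event body becomes
      // one row appended beneath the INSERT ... FORMAT header.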
      for (count = 0; count < batchSize; ++count) {
        Event event = ch.take();
        if (event == null) {
          break;
        }
        batch.append(new String(event.getBody(), "UTF-8")).append("\n");
      }

      if (count <= 0) {
        sinkCounter.incrementBatchEmptyCount();
        txn.commit();
        return Status.BACKOFF;

      } else if (count < batchSize) {
        sinkCounter.incrementBatchUnderflowCount();
      } else {
        sinkCounter.incrementBatchCompleteCount();
      }

      httpPost = new HttpPost(url);
      httpPost.addHeader("Content-Type""application/x-www-form-urlencoded");
      httpPost.addHeader("charset""utf-8");
      httpPost.addHeader("Connection""close");
      HttpEntity entity = new StringEntity(batch.toString(),StreamUtils.UTF_8);
      httpPost.setEntity(entity);

      sinkCounter.addToEventDrainAttemptCount(count);

      HttpResponse response = httpClient.execute(httpPost, httpContext);

      int respCode = response.getStatusLine().getStatusCode();
      EntityUtils.consumeQuietly(response.getEntity());

      // Correct HTTP Response Code
      if (respCode < 200 || respCode >= 300) {
        throw new HttpException(String.format("An error occurred while inserting " +
            "into ClickHouse. HTTP response code: %d \n " +
            "Database: %s \n" +
            "Table: %s", respCode, database, table));
      }

      sinkCounter.incrementEventDrainSuccessCount();
      status = Status.READY;
      txn.commit();

    } catch (Throwable t) {

      if (httpPost != null)
        httpPost.abort();

      txn.rollback();

      logger.error(t.getMessage(), t);

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error) t;
      }
    } finally {
      txn.close();
    }
    return status;
  }

  private String createQuery(String database, String table, String format){
    return String.format("INSERT INTO %s.%s FORMAT %s", database, table, format);
  }
}
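
ClickHouseSink.java statically imports everything from ClickHouseSinkConstants, which the post never shows. Here is a minimal sketch that satisfies the references above; the property keys line up with the sink settings in the agent configuration further down, but the default values are assumptions:

package org.apache.flume.sink.clickhouse;

// Minimal sketch of the constants class referenced by ClickHouseSink through
// "import static ...ClickHouseSinkConstants.*". Keys match the sink properties
// in mysql-clickhouse.conf; the default values below are assumptions.
public class ClickHouseSinkConstants {
  public static final String HOST = "host";
  public static final String PORT = "port";
  public static final String USER = "user";
  public static final String PASSWORD = "password";
  public static final String DATABASE = "database";
  public static final String TABLE = "table";
  public static final String FORMAT = "format";
  public static final String BATCH_SIZE = "batchSize";
  public static final String HTTP_CONNECTION_TIMEOUT = "httpConnectionTimeout";
  public static final String HTTP_SOCK_TIMEOUT = "httpSocketTimeout";

  public static final String DEFAULT_PORT = "8123";       // ClickHouse HTTP port
  public static final String DEFAULT_USER = "default";
  public static final String DEFAULT_PASSWORD = "";
  public static final String DEFAULT_FORMAT = "TabSeparated";
  public static final int DEFAULT_BATCH_SIZE = 10000;
  public static final int DEFAULT_HTTP_CONNECTION_TIMEOUT = 10000; // ms
  public static final int DEFAULT_HTTP_SOCKET_TIMEOUT = 10000;     // ms
}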
  4. Go back to the directory containing pom.xml and run:
mvn package

Then copy the generated flume-clickhouse-sink-1.5.2.jar into Flume's lib directory.

mysql-connector-java.jar

The third jar is the MySQL JDBC driver; it too goes into Flume's lib directory.

The Flume configuration file

Put mysql-clickhouse.conf into the conf folder; its contents are as follows:

agent.channels = channelMProductPL
agent.sources = sourceMProductPL
agent.sinks = sinkMProductPL

###########sql source#################
agent.sources.sourceMProductPL.type = org.keedio.flume.source.SQLSource
agent.sources.sourceMProductPL.hibernate.connection.url = jdbc:mysql://www.dw4ever.cn/test_db?useSSL=false

# Hibernate Database connection properties
agent.sources.sourceMProductPL.hibernate.connection.user = root
agent.sources.sourceMProductPL.hibernate.connection.password = Biyjatqdw...
agent.sources.sourceMProductPL.hibernate.connection.autocommit = true
agent.sources.sourceMProductPL.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent.sources.sourceMProductPL.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent.sources.sourceMProductPL.hibernate.table = MYtest
agent.sources.sourceMProductPL.run.query.delay=10000
agent.sources.sourceMProductPL.enclose.by.quotes = false

agent.sources.sourceMProductPL.status.file.path = /flume/apache-flume-1.5.2-bin
agent.sources.sourceMProductPL.status.file.name = agent.sqlSource.status.mProductPL

agent.sources.sourceMProductPL.inputCharset = UTF-8

# Custom query
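# $@$ is the incremental placeholder: flume-ng-sql-source replaces it with the
# last index recorded in the status file, and start.from seeds that index on
# the first run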
agent.sources.sourceMProductPL.start.from = 0
agent.sources.sourceMProductPL.custom.query = SELECT * FROM MYtest WHERE id > $@$
# batch.size and max.rows are set to 1 for testing; raise them (e.g. to 1000)
# in production
agent.sources.sourceMProductPL.batch.size = 1
agent.sources.sourceMProductPL.max.rows = 1
agent.sources.sourceMProductPL.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.sourceMProductPL.hibernate.c3p0.min_size=1
agent.sources.sourceMProductPL.hibernate.c3p0.max_size=10

##############################channels

agent.channels.channelMProductPL.type = memory
agent.channels.channelMProductPL.capacity = 1000
agent.channels.channelMProductPL.transactionCapacity = 1000
agent.channels.channelMProductPL.byteCapacityBufferPercentage = 20
agent.channels.channelMProductPL.byteCapacity = 1600000

#############################sinks
agent.sinks.sinkMProductPL.type = org.apache.flume.sink.clickhouse.ClickHouseSink
agent.sinks.sinkMProductPL.host = http://user_name:password@host_name
agent.sinks.sinkMProductPL.port = 8123
agent.sinks.sinkMProductPL.database = test
agent.sinks.sinkMProductPL.table = MY_test
# batchSize is 1 for testing; raise it (e.g. to 3000) in production
agent.sinks.sinkMProductPL.batchSize = 1
agent.sinks.sinkMProductPL.format = TabSeparated

agent.sinks.sinkMProductPL.channel = channelMProductPL
agent.sources.sourceMProductPL.channels=channelMProductPL
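
With this configuration, createQuery() in the sink yields "INSERT INTO test.MY_test FORMAT TabSeparated", and every HTTP POST body sent to port 8123 is that header followed by the batch of tab-separated rows. Illustratively (the row values below are made up and assume a matching column layout in MY_test):

INSERT INTO test.MY_test FORMAT TabSeparated
1	first_value	second_value
2	first_value	second_value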

Afterwards, create a status file with the configured name at the configured path (here /flume/apache-flume-1.5.2-bin/agent.sqlSource.status.mProductPL); otherwise the source keeps inserting the same data over and over. The incremental condition in custom.query (the WHERE id > $@$ above) is also needed to keep that under control.

Note that the ClickHouse sink's username and password have to be embedded in the URL (the host property above); otherwise you get a 401 error.

Everything is ready; all that is left is to run it

./flume-ng agent --conf ../conf --conf-file ../conf/mysql-clickhouse.conf --name agent -Dflume.root.logger=INFO,console

Here --conf points at the conf directory, --conf-file at the configuration file, and --name gives the Flume agent name (the prefix used throughout the configuration file); the trailing -D option sets the log destination and log level.

Wrapping up

Flume is Cloudera's highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive volumes of log data. For truly real-time synchronization you still need Kafka: Flume only picks up increments (inserts) and cannot see deletes or updates, which makes it best suited to log collection.

