[Flume] Automatically shipping incremental MySQL data to ClickHouse
Installing Flume
wget http://www.apache.org/dist/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
Unpack it:
tar zxvf apache-flume-1.5.2-bin.tar.gz
Building the Java dependency jars
Three jars are needed: flume-ng-sql-source, flume-clickhouse-sink, and mysql-connector-java.
Building them requires Maven; for installation see https://blog.csdn.net/weixx3/article/details/80331538
The flume-ng-sql-source jar
Get the source from GitHub: https://github.com/keedio/flume-ng-sql-source
Make sure to check out tag 1.5.2.
Modify the source:
In flume-ng-sql-source-1.5.2/src/main/java/org/keedio/flume/source/SQLSourceHelper.java, change the column delimiter from "," to "\t"; otherwise ClickHouse rejects the rows (a sketch of the change follows).
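The code differs slightly between tags, but the change amounts to swapping the comma for a tab wherever SQLSourceHelper joins column values into an output row, so that rows match the TabSeparated format the sink later posts to ClickHouse. A minimal sketch of the idea; the method and variable names are illustrative, not the literal upstream code:

// Illustrative sketch only; the real SQLSourceHelper differs between tags.
private List<String> toTabSeparatedRows(List<List<Object>> queryResult) {
    List<String> rows = new ArrayList<String>();
    for (List<Object> row : queryResult) {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < row.size(); i++) {
            if (i > 0) {
                // was: line.append(",");
                line.append("\t"); // tab keeps ClickHouse's TabSeparated parser happy
            }
            line.append(row.get(i));
        }
        rows.add(line.toString());
    }
    return rows;
}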
Build
From the directory containing pom.xml, run the Maven command:
mvn package -Dmaven.test.skip=true
Move flume-ng-sql-source-1.5.2.jar from the generated target directory into Flume's lib directory. It has to be lib, the directory holding the jars on Flume's runtime classpath; put it anywhere else and you will hit errors along the lines of cannot find symbol.
The flume-clickhouse-sink jar
Since no ready-made flume-clickhouse-sink artifact could be found, we borrow the skeleton of the flume-ng-kafka-sink module and drop the ClickHouseSink.java code into it for packaging.
Edit pom.xml to strip out everything Kafka-specific.
The edited file:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<name>Apache Flume ClickHouse Sink</name>
<description>ClickHouse sink for Apache Flume NG</description>
<groupId>org.apache.flume.sink.clickhouse</groupId>
<artifactId>flume-clickhouse-sink</artifactId>
<version>1.5.2</version>
<packaging>jar</packaging>
<parent>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sinks</artifactId>
<version>1.4.0</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
</dependency>
<dependency>
<!-- the sink talks to ClickHouse over its HTTP interface; httpclient 4.2.x still provides DefaultHttpClient -->
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.2.5</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<!-- 1.7 so the code below can use java.nio.charset.StandardCharsets -->
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Getting the ClickHouse sink code
The source is at https://reviews.apache.org/r/50692/diff/1#2
The files below live under
/flume/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka
(the class is declared in package org.apache.flume.sink.clickhouse, so rename the kafka directory to clickhouse to match). After downloading, make sure the source is saved as UTF-8; otherwise Chinese text inserted into ClickHouse comes out as nothing but ❓, little eyes full of big questions. The relevant spot is the StringEntity that builds the HTTP POST body: the original review builds it with ru.yandex.clickhouse's StreamUtils.UTF_8, which drags in the whole clickhouse-jdbc jar for one charset constant, so the version below uses java.nio.charset.StandardCharsets.UTF_8 instead. You also need ClickHouseSinkConstants.java from the same review, which defines the configuration keys and defaults imported below. The complete file:
ClickHouseSink.java:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.sink.clickhouse;
import com.google.common.base.Preconditions;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.AuthCache;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.ClientContext;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.BasicAuthCache;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import static org.apache.flume.sink.clickhouse.ClickHouseSinkConstants.*;
/**
* A sink that sends Flume events to a ClickHouse database via HTTP.</p>
*
* If the response code is less than 200 or greater than or equal to 300, an HTTP exception is
* thrown and the transaction is rolled back.</p>
*
* This sink must be configured with mandatory parameters detailed in
* {@link ClickHouseSinkConstants}.</p>
*
* @see https://clickhouse.yandex/reference_en.html
*/
public class ClickHouseSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory.getLogger(ClickHouseSink.class);
private PoolingClientConnectionManager connectionManager = null;
private DefaultHttpClient httpClient = null;
private CredentialsProvider credsProvider = null;
private AuthCache authCache = null;
private HttpContext httpContext = null;
private SinkCounter sinkCounter = null;
private String host = null;
private String port = null;
private String user = null;
private String password = null;
private String database = null;
private String table = null;
private String format = null;
private String query = null;
private int batchSize;
private int httpConnTimeout;
private int httpSockTimeout;
private String url = null;
@Override
public void configure(Context context) {
if (sinkCounter == null) {
sinkCounter = new SinkCounter(getName());
}
Preconditions.checkArgument(context.getString(HOST) != null &&
context.getString(HOST).length() > 0,
"ClickHouse host must be specified!");
this.host = context.getString(HOST);
if (!this.host.startsWith("http://")){
this.host = "http://" + this.host;
}
Preconditions.checkArgument(context.getString(DATABASE) != null &&
context.getString(DATABASE).length() > 0,
"ClickHouse database must be specified!");
this.database = context.getString(DATABASE);
Preconditions.checkArgument(context.getString(TABLE) != null &&
context.getString(TABLE).length() > 0,
"ClickHouse table must be specified!");
this.table = context.getString(TABLE);
this.port = context.getString(PORT, DEFAULT_PORT);
this.format = context.getString(FORMAT, DEFAULT_FORMAT);
this.user = context.getString(USER, DEFAULT_USER);
this.password = context.getString(PASSWORD, DEFAULT_PASSWORD);
this.query = createQuery(database, table, format);
this.batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
this.httpConnTimeout = context.getInteger(HTTP_CONNECTION_TIMEOUT,
DEFAULT_HTTP_CONNECTION_TIMEOUT);
this.httpSockTimeout = context.getInteger(HTTP_SOCK_TIMEOUT,
DEFAULT_HTTP_SOCKET_TIMEOUT);
this.url = host + ":" + port;
this.credsProvider = new BasicCredentialsProvider();
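// Note: host still carries the http:// prefix here, so the AuthScope and HttpHost built
// below do not match the actual request target; this is why the flume config later embeds
// the username and password directly in the host URL (see the 401 note after the config).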
credsProvider.setCredentials(new AuthScope(host, Integer.parseInt(port)),
new UsernamePasswordCredentials(user, password));
BasicScheme basicAuth = new BasicScheme();
authCache = new BasicAuthCache();
authCache.put(new HttpHost(host, Integer.parseInt(port)), basicAuth);
}
@Override
public void start() {
connectionManager = new PoolingClientConnectionManager();
httpClient = new DefaultHttpClient(connectionManager);
httpClient.setCredentialsProvider(credsProvider);
HttpParams params = httpClient.getParams();
HttpConnectionParams.setConnectionTimeout(params, httpConnTimeout);
HttpConnectionParams.setSoKeepalive(params, false);
HttpConnectionParams.setSoTimeout(params, httpSockTimeout);
HttpConnectionParams.setStaleCheckingEnabled(params, false);
httpContext = new BasicHttpContext();
httpContext.setAttribute(ClientContext.AUTH_CACHE, authCache);
httpContext.setAttribute(ClientContext.CREDS_PROVIDER, credsProvider);
sinkCounter.start();
super.start();
}
@Override
public void stop() {
logger.info("ClickHouse sink {} stopping", getName());
connectionManager.shutdown();
sinkCounter.incrementConnectionClosedCount();
sinkCounter.stop();
super.stop();
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
// Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
HttpPost httpPost = null;
try {
int count;
StringBuilder batch = new StringBuilder();
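// Every POST body starts with the "INSERT INTO db.table FORMAT ..." line from
// createQuery(), followed by one serialized event per line.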
batch.append(this.query).append("\n");
for (count = 0; count < batchSize; ++count) {
Event event = ch.take();
if (event == null) {
break;
}
batch.append(new String(event.getBody(), StandardCharsets.UTF_8)).append("\n");
}
if (count <= 0) {
sinkCounter.incrementBatchEmptyCount();
txn.commit();
return Status.BACKOFF;
} else if (count < batchSize) {
sinkCounter.incrementBatchUnderflowCount();
} else {
sinkCounter.incrementBatchCompleteCount();
}
httpPost = new HttpPost(url);
httpPost.addHeader("Content-Type", "application/x-www-form-urlencoded");
httpPost.addHeader("charset", "utf-8");
httpPost.addHeader("Connection", "close");
HttpEntity entity = new StringEntity(batch.toString(), StandardCharsets.UTF_8);
httpPost.setEntity(entity);
sinkCounter.addToEventDrainAttemptCount(count);
HttpResponse response = httpClient.execute(httpPost, httpContext);
int respCode = response.getStatusLine().getStatusCode();
EntityUtils.consumeQuietly(response.getEntity());
// Anything outside the 2xx range is an error
if (respCode < 200 || respCode >= 300) {
throw new HttpException(String.format("An error occurred while inserting " +
"into ClickHouse. HTTP response code: %d \n " +
"Database: %s \n" +
"Table: %s", respCode, database, table));
}
sinkCounter.incrementEventDrainSuccessCount();
status = Status.READY;
txn.commit();
} catch (Throwable t) {
if (httpPost != null)
httpPost.abort();
txn.rollback();
logger.error(t.getMessage(), t);
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error) t;
}
} finally {
txn.close();
}
return status;
}
private String createQuery(String database, String table, String format){
return String.format("INSERT INTO %s.%s FORMAT %s", database, table, format);
}
}
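To make the wire format concrete: with the sink settings from the configuration below (database test, table MY_test, format TabSeparated), createQuery() produces the first line of every POST body and each event contributes one tab-separated row, so a hypothetical two-event batch goes over HTTP as

INSERT INTO test.MY_test FORMAT TabSeparated
1	hello
2	world

with the row values here purely illustrative.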
Back in the directory containing the module's pom.xml, run
mvn package
and copy the resulting flume-clickhouse-sink-1.5.2.jar into Flume's lib directory.
mysql-connector-java.jar
The MySQL JDBC driver goes into Flume's lib directory as well; flume-ng-sql-source connects through Hibernate using com.mysql.jdbc.Driver (see the connection properties below) and cannot reach MySQL without it.
Flume configuration file
Create mysql-clickhouse.conf under Flume's conf directory:
agent.channels = channelMProductPL
agent.sources = sourceMProductPL
agent.sinks = sinkMProductPL
###########sql source#################
agent.sources.sourceMProductPL.type = org.keedio.flume.source.SQLSource
agent.sources.sourceMProductPL.hibernate.connection.url = jdbc:mysql://www.dw4ever.cn/test_db?useSSL=false
# Hibernate Database connection properties
agent.sources.sourceMProductPL.hibernate.connection.user = root
agent.sources.sourceMProductPL.hibernate.connection.password = Biyjatqdw...
agent.sources.sourceMProductPL.hibernate.connection.autocommit = true
agent.sources.sourceMProductPL.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent.sources.sourceMProductPL.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent.sources.sourceMProductPL.hibernate.table = MYtest
agent.sources.sourceMProductPL.run.query.delay=10000
agent.sources.sourceMProductPL.enclose.by.quotes = false
agent.sources.sourceMProductPL.status.file.path = /flume/apache-flume-1.5.2-bin
agent.sources.sourceMProductPL.status.file.name = agent.sqlSource.status.mProductPL
agent.sources.sourceMProductPL.inputCharset = UTF-8
# Custom query
agent.sources.sourceMProductPL.start.from = 0
agent.sources.sourceMProductPL.custom.query = SELECT * FROM MYtest WHERE id > $@$
agent.sources.sourceMProductPL.batch.size = 1
# 1 for testing; use e.g. 1000 in production
agent.sources.sourceMProductPL.max.rows = 1
# 1 for testing; use e.g. 1000 in production
agent.sources.sourceMProductPL.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.sourceMProductPL.hibernate.c3p0.min_size=1
agent.sources.sourceMProductPL.hibernate.c3p0.max_size=10
##############################channels
agent.channels.channelMProductPL.type = memory
agent.channels.channelMProductPL.capacity = 1000
agent.channels.channelMProductPL.transactionCapacity = 1000
agent.channels.channelMProductPL.byteCapacityBufferPercentage = 20
agent.channels.channelMProductPL.byteCapacity = 1600000
#############################sinks
agent.sinks.sinkMProductPL.type = org.apache.flume.sink.clickhouse.ClickHouseSink
agent.sinks.sinkMProductPL.host = http://user_name:password@host_name
agent.sinks.sinkMProductPL.port = 8123
agent.sinks.sinkMProductPL.database = test
agent.sinks.sinkMProductPL.table = MY_test
agent.sinks.sinkMProductPL.batchSize = 1
# 1 for testing; use e.g. 3000 in production
agent.sinks.sinkMProductPL.format = TabSeparated
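# TabSeparated matches the tab delimiter patched into SQLSourceHelper earlier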
agent.sinks.sinkMProductPL.channel = channelMProductPL
agent.sources.sourceMProductPL.channels=channelMProductPL
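For clarity on the incremental mechanism: flume-ng-sql-source replaces $@$ in custom.query with the last index persisted in its status file, so successive polls issue queries along the lines of (values illustrative):

SELECT * FROM MYtest WHERE id > 0    -- first run, seeded by start.from = 0
SELECT * FROM MYtest WHERE id > 42   -- a later run, after the status file recorded 42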
Afterwards you must create a status file with the configured name under the status.file.path you specified; without it the source keeps re-inserting the same rows over and over. The WHERE condition in custom.query (id > $@$ above) is also needed to keep that runaway insertion in check.
Note that the username and password for the ClickHouse sink have to be embedded in the host URL (as in the host line above); otherwise the server responds with a 401 error.
Everything is in place; all that's left is to run it:
./flume-ng agent --conf ../conf --conf-file ../conf/mysql-clickhouse.conf --name agent -Dflume.root.logger=INFO,console
Here --conf points at the conf directory, --conf-file at the configuration file itself, and --name gives the agent name (the agent prefix used throughout the configuration file); the trailing -Dflume.root.logger option sets the log destination and level.
Wrapping up
Flume, from Cloudera, is a highly available, highly reliable, distributed system for collecting, aggregating, and moving large volumes of log data. For truly real-time synchronization you still need Kafka: Flume with this source only recognizes new rows and knows nothing about DELETEs or UPDATEs, so it is best suited to collecting logs.