源码分析 Datax 调度以及数据传输流程
2021 的第一篇文章
内容简述
这篇文章主要讲 Datax 的两个事情:
-
调度流程 -
数据传输流程
调度指的是 Datax 根据数据(任务执行情况)来进行任务执行的顺序以及优先级;数据传输是指 reader 和 writer 是如何配合进行数据之间的交互,以及 Datax 的一些特性例如速率把控、并行操作等是如何实现的。
源码入口
调度流程
调度流程的代码入口在 JobContainer.java 的 schedule() 方法。
首先会获取全局的 channel 数、每个 TaskGroup 的 channel 数以及计算需要的 channelNumber。
int channelsPerTaskGroup = this.configuration.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
int taskNumber = this.configuration.getList(
CoreConstant.DATAX_JOB_CONTENT).size();
this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
接着调用 JobAssignUtil 对每个 TaskkGroup 进行合理的 channel 分配。
List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
this.needChannelNumber, channelsPerTaskGroup); //公平分配
然后实例化 AbstractScheduler 进行调度。
private void schedule() {
//...
scheduler.schedule(taskGroupConfigs);
//...
}
调度方法中主要做的两个事情:
-
注册监控信息(后面会单独文章讲) -
启动所有的任务
启动任务的代码是
int totalTasks = calculateTaskCount(configurations); //计算任务总数
startAllTaskGroup(configurations); //开启所有任务
调用的方式 ProcessInnerScheduler#startAllTaskGroup
public void startAllTaskGroup(List<Configuration> configurations) {
this.taskGroupContainerExecutorService = Executors
.newFixedThreadPool(configurations.size());
for (Configuration taskGroupConfiguration : configurations) {
TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration); //实例化 TaskGroupContainerRunner -> TaskGroupContaine
this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
}
this.taskGroupContainerExecutorService.shutdown();
}
Datax 使用了 TaskGroupContainerRunner 将 Configuration 封装成一个 TaskGroupContainer,同时 TaskGroupContainerRunner 的 state 属性代表其 TaskGroupContainer 的状态。然后扔给线程池去执行。接下来就是任务启动的代码。
try {
Thread.currentThread().setName(
String.format("taskGroup-%d", this.taskGroupContainer.getTaskGroupId()));
this.taskGroupContainer.start();
this.state = State.SUCCEEDED;
} catch (Throwable e) {
this.state = State.FAILED;
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
}
至此,调度的流程结束,接下来是数据传输流程。
数据传输流程
初始化
数据传输流程都在 TaskGroupContainer 里面,所以一些控制的指标参数、重试策略等配置会在这里体现和处理的更加细致。首先说下,Datax 采用的生产者-消费者模型,这样做可以解耦、提高并行的效率以及控制处理数据的速率。在 TaskGroupContainer 中,实际上 Reader 和 Writer 是通过 channel 来链接。channel 可以是内存的,也可能是持久化的,插件不必关心。插件通过 RecordSender 往 channel 写入数据,通过 RecordReceiver 从 channel 读取数据。
代码篇幅过长,我慢慢截图来讲。TaskGroupContainer#start 代码入口,代表 TaskGroup 启动。这个方法包括的内容主要有:
-
获取参数,配置监控器 -
给各个参数封装成可执行的任务,并执行任务以及注册监控 -
对任务的状态进行监控,进行重试机制的执行
我们在这里重点关注任务的执行。TaskExecutor 是 TaskGoupContainer 的内部类。
class TaskExecutor {
private Configuration taskConfig;
private int taskId;
private int attemptCount;
private Channel channel;
private Thread readerThread;
private Thread writerThread;
private ReaderRunner readerRunner;
private WriterRunner writerRunner;
private Communication taskCommunication;
// method area
}
TaskGoupContainer 会将切分的配置封装成一个 TaskExecutor
TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
taskStartTimeMap.put(taskId, System.currentTimeMillis());
taskExecutor.doStart();
然后启动的代码是以下
public void doStart() {
this.writerThread.start(); //①
//②
if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
//throw exception
}
this.readerThread.start(); //③
//④
if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
//throw exception
}
}
代码①和代码③是各自开启两个线程,分别为读线程和写线程.两个线程皆在 TaskExecutor 实例化的时候进行生成.首先我们看下写线程需要执行的额 Runnable. 主要是 generateRunner 方法.
//①
newRunner = LoadUtil.loadPluginRunner(pluginType,
this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME));
//②
newRunner.setJobConf(this.taskConfig
.getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));
//③
pluginCollector = ClassUtil.instantiate(
taskCollectorClass, AbstractTaskPluginCollector.class,
configuration, this.taskCommunication,
PluginType.WRITER);
((WriterRunner) newRunner).setRecordReceiver(new BufferedRecordExchanger(
this.channel, pluginCollector));
//④
newRunner.setTaskPluginCollector(pluginCollector);
代码①会加载 WriterRunner 任务;
代码②会将写任务的配置作为参数传入;
代码③和④实际上是加载 taskPlugin、用于处理脏数据和 job/task 通信数据
然后我们看下读任务,它也是在 generateRunner 方法
//①
newRunner = LoadUtil.loadPluginRunner(pluginType,
this.taskConfig.getString(CoreConstant.JOB_READER_NAME));
//②
newRunner.setJobConf(this.taskConfig.getConfiguration(
CoreConstant.JOB_READER_PARAMETER));
//③
pluginCollector = ClassUtil.instantiate(
taskCollectorClass, AbstractTaskPluginCollector.class,
configuration, this.taskCommunication,
PluginType.READER);
//④
RecordSender recordSender;
if (transformerInfoExecs != null && transformerInfoExecs.size() > 0) {
recordSender = new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel,this.taskCommunication ,pluginCollector, transformerInfoExecs);
} else {
recordSender = new BufferedRecordExchanger(this.channel, pluginCollector);
}
((ReaderRunner) newRunner).setRecordSender(recordSender);
//⑤
newRunner.setTaskPluginCollector(pluginCollector);
代码①会加载 ReaderRunner 任务。
代码②会将读配置作为参数写入。
代码③和⑤实际上是加载 taskPlugin、用于处理脏数据和 job/task 通信数据。
代码④值得注意,这里会判断使用哪种 Exchange. Exchange 可以理解为外部与 channel 交互的中间件.BufferedRecordTransformerExchanger 指的是可以根据特定的格式转换。BufferedRecordExchanger 仅仅是根据 Datax 的格式进行传输。
任务读写开始(MySQl 为例)
当读写任务完成初始化,接下来就是其 start 方法里面的内容了。先介绍读任务。读任务定义于 ReaderRunner,为了更加灵活,在 ReaderRunner 内也是使用插件机制的.如果我们需要拓展,只拓展 Reader.Task 即可.
Reader.Task taskReader = (Reader.Task) this.getPlugin();
然后重点关注 Reader.Task 的 startRead 方法 调用 startRead 方法需要传入 Exchange
taskReader.startRead(recordSender);
recordSender.terminate();
调用 startRead 方法需要传入 Exchange,接下来就是具体的数据源实现了。具体实现类 CommonRdbmsReader
首先是获取 QUERY_SQL、username、password 等参数,进行连接
String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
String table = readerSliceConfig.getString(Key.TABLE);
Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl, username, password);
然后执行 QUERY_SQL 获取数据
rs = DBUtil.query(conn, querySql, fetchSize);
然后循环查询结果,开始写入
while (rs.next()) {
this.transportOneRecord(recordSender, rs,
metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
}
transportOneRecord 实际上是通过 Exchanger 进行交互。
protected Record transportOneRecord(RecordSender recordSender, ResultSet rs,
ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding,
TaskPluginCollector taskPluginCollector) {
Record record = buildRecord(recordSender,rs,metaData,columnNumber,mandatoryEncoding,taskPluginCollector);
recordSender.sendToWriter(record);
return record;
}
接下来是写任务。写任务的实现在 commonRdbmsWriterTask#startWrite。写任务首先要获取连接,然后处理连接的 session 问题,最后
Connection connection = DBUtil.getConnection(this.dataBaseType,
this.jdbcUrl, username, password);
DBUtil.dealWithSessionConfig(connection, writerSliceConfig,
this.dataBaseType, BASIC_MESSAGE);
然后开始写数据。写数据是从 Exchanger 里面获取的。从 Exchanger 获取数据循环,然后检查列数是否满足
//使用 buffer 缓存, batchSize 是控制每次发送的数量
List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);
while ((record = recordReceiver.getFromReader()) != null) {
if (record.getColumnNumber() != this.columnNumber) {
// throw error...
}
}
如果符合条件的,即可添加进缓存。最后刷进数据库里
doBatchInsert(connection, writeBuffer);
writeBuffer.clear();
bufferBytes = 0;
Exchanger 交互
Exchanger 实现了 RecordSender 和 RecordReceiver,同时它有两个实现类 BufferedRecordExchanger 以及 BufferedRecordTransformerExchanger。
RecordSender.java
public interface RecordSender {
public Record createRecord();
public void sendToWriter(Record record);
public void flush();
public void terminate();
public void shutdown();
}
RecordReceiver.java
public interface RecordReceiver {
public Record getFromReader();
public void shutdown();
}
以 BufferedRecordExchanger 为例,我们挑几个方法讲下。
方法名 | 作用 |
---|---|
sendToWriter | 将数据写入 channel |
flush | 刷新缓冲区, 也就是将缓冲区的数据写入 channel |
terminate | 马上将缓冲区数据刷入 channel |
getFromReader | 从 channel 获取数据 |
sendToWriter 方法是负责将记录放进 channel。首先会校验记录的大小是否超出限制
if (record.getMemorySize() > this.byteCapacity) {
this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s", this.byteCapacity)));
return;
}
然后判断是否 channel 是否已经满了。如果满了就刷新进行发送
boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
if (isFull) {
flush();
}
否则加入队列
this.buffer.add(record);
this.bufferIndex++;
memoryBytes.addAndGet(record.getMemorySize());
flush 方法也很简单,直接添加到 channel 即可。
this.channel.pushAll(this.buffer);
this.buffer.clear();
terminate 方法是直接将缓冲区的数据刷入 channel。
flush();
this.channel.pushTerminate(TerminateRecord.get());
getFromReader 方法是给 writer 获取数据,进行下一步操作。首先是排查队列是否为空。如果为空,然后调用方法将数据批量写入 writer 的缓冲区
boolean isEmpty = (this.bufferIndex >= this.buffer.size());
if (isEmpty) {
receive();
}
如果不为空,根据索引读取,返回缓冲区中的一条。
Record record = this.buffer.get(this.bufferIndex++);
if (record instanceof TerminateRecord) {
record = null;
}
return record;
文末
大致上 Datax 的读写任务都是这个流程,灵活在于它自身提供了很多数据源的实现案例。同时也提供插件式的 reader 和 writer,这样可以做更多的扩展和业务针对性的适配
end