vlambda博客
学习文章列表

源码分析 Datax 调度以及数据传输流程

2021 的第一篇文章

内容简述

这篇文章主要讲 Datax 的两个事情:

  1. 调度流程
  2. 数据传输流程

调度指的是 Datax 根据数据(任务执行情况)来进行任务执行的顺序以及优先级;数据传输是指 reader 和 writer 是如何配合进行数据之间的交互,以及 Datax 的一些特性例如速率把控、并行操作等是如何实现的。

源码入口

调度流程

调度流程的代码入口在 JobContainer.java 的 schedule() 方法。

首先会获取全局的 channel 数、每个 TaskGroup 的 channel 数以及计算需要的 channelNumber。

    int channelsPerTaskGroup = this.configuration.getInt(
            CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
    int taskNumber = this.configuration.getList(
            CoreConstant.DATAX_JOB_CONTENT).size();

    this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);

接着调用 JobAssignUtil 对每个 TaskkGroup 进行合理的 channel 分配。

    List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
            this.needChannelNumber, channelsPerTaskGroup);      //公平分配

然后实例化 AbstractScheduler 进行调度。

    private void schedule() 
        //...
        scheduler.schedule(taskGroupConfigs);
        //...
    }

调度方法中主要做的两个事情:

  1. 注册监控信息(后面会单独文章讲)
  2. 启动所有的任务

启动任务的代码是

    int totalTasks = calculateTaskCount(configurations);  //计算任务总数
    startAllTaskGroup(configurations);      //开启所有任务

调用的方式 ProcessInnerScheduler#startAllTaskGroup

    public void startAllTaskGroup(List<Configuration> configurations) {
        this.taskGroupContainerExecutorService = Executors
                .newFixedThreadPool(configurations.size());

        for (Configuration taskGroupConfiguration : configurations) {
            TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);        //实例化 TaskGroupContainerRunner -> TaskGroupContaine
            this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
        }

        this.taskGroupContainerExecutorService.shutdown();
    }

Datax 使用了 TaskGroupContainerRunner 将 Configuration 封装成一个 TaskGroupContainer,同时 TaskGroupContainerRunner 的 state 属性代表其 TaskGroupContainer 的状态。然后扔给线程池去执行。接下来就是任务启动的代码。

    try {
        Thread.currentThread().setName(
                String.format("taskGroup-%d"this.taskGroupContainer.getTaskGroupId()));
        this.taskGroupContainer.start();
                this.state = State.SUCCEEDED;
    } catch (Throwable e) {
        this.state = State.FAILED;
        throw DataXException.asDataXException(
                        FrameworkErrorCode.RUNTIME_ERROR, e);
    }

至此,调度的流程结束,接下来是数据传输流程。

数据传输流程

初始化

数据传输流程都在 TaskGroupContainer 里面,所以一些控制的指标参数、重试策略等配置会在这里体现和处理的更加细致。首先说下,Datax 采用的生产者-消费者模型,这样做可以解耦、提高并行的效率以及控制处理数据的速率。在 TaskGroupContainer 中,实际上 Reader 和 Writer 是通过 channel 来链接。channel 可以是内存的,也可能是持久化的,插件不必关心。插件通过 RecordSender 往 channel 写入数据,通过 RecordReceiver 从 channel 读取数据。

代码篇幅过长,我慢慢截图来讲。TaskGroupContainer#start 代码入口,代表 TaskGroup 启动。这个方法包括的内容主要有:

  1. 获取参数,配置监控器
  2. 给各个参数封装成可执行的任务,并执行任务以及注册监控
  3. 对任务的状态进行监控,进行重试机制的执行

我们在这里重点关注任务的执行。TaskExecutor 是 TaskGoupContainer 的内部类。

  class TaskExecutor {
        private Configuration taskConfig;

        private int taskId;

        private int attemptCount;

        private Channel channel;

        private Thread readerThread;

        private Thread writerThread;
        
        private ReaderRunner readerRunner;
        
        private WriterRunner writerRunner;

        private Communication taskCommunication;
        
        // method area
   }

TaskGoupContainer 会将切分的配置封装成一个 TaskExecutor

    TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
        taskStartTimeMap.put(taskId, System.currentTimeMillis());
            taskExecutor.doStart();

然后启动的代码是以下

    public void doStart() {
        this.writerThread.start(); //①
        //②
        if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
            //throw exception
        }
        this.readerThread.start();  //③

        //④
        if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
             //throw exception
        }
    }

代码①和代码③是各自开启两个线程,分别为读线程和写线程.两个线程皆在 TaskExecutor 实例化的时候进行生成.首先我们看下写线程需要执行的额 Runnable. 主要是 generateRunner 方法.

    //①
    newRunner = LoadUtil.loadPluginRunner(pluginType,
            this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME)); 
    //②
    newRunner.setJobConf(this.taskConfig
            .getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));
    //③
    pluginCollector = ClassUtil.instantiate(
            taskCollectorClass, AbstractTaskPluginCollector.class,
            configurationthis.taskCommunication,
            PluginType.WRITER)
;
    ((WriterRunner) newRunner).setRecordReceiver(new BufferedRecordExchanger(
            this.channel, pluginCollector));
    //④
    newRunner.setTaskPluginCollector(pluginCollector);

代码①会加载 WriterRunner 任务;
代码②会将写任务的配置作为参数传入;
代码③和④实际上是加载 taskPlugin、用于处理脏数据和 job/task 通信数据

然后我们看下读任务,它也是在 generateRunner 方法

    //①
    newRunner = LoadUtil.loadPluginRunner(pluginType,
                            this.taskConfig.getString(CoreConstant.JOB_READER_NAME));
    //②
    newRunner.setJobConf(this.taskConfig.getConfiguration(
            CoreConstant.JOB_READER_PARAMETER));
   //③
    pluginCollector = ClassUtil.instantiate(
            taskCollectorClass, AbstractTaskPluginCollector.class,
            configurationthis.taskCommunication,
            PluginType.READER)
;
    //④
    RecordSender recordSender;
    if (transformerInfoExecs != null && transformerInfoExecs.size() > 0) {
        recordSender = new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel,this.taskCommunication ,pluginCollector, transformerInfoExecs);
    } else {
        recordSender = new BufferedRecordExchanger(this.channel, pluginCollector);
    }
    
    ((ReaderRunner) newRunner).setRecordSender(recordSender);
    //⑤
    newRunner.setTaskPluginCollector(pluginCollector);

代码①会加载 ReaderRunner 任务。
代码②会将读配置作为参数写入。
代码③和⑤实际上是加载 taskPlugin、用于处理脏数据和 job/task 通信数据。
代码④值得注意,这里会判断使用哪种 Exchange. Exchange 可以理解为外部与 channel 交互的中间件.BufferedRecordTransformerExchanger 指的是可以根据特定的格式转换。BufferedRecordExchanger 仅仅是根据 Datax 的格式进行传输。

任务读写开始(MySQl 为例)

当读写任务完成初始化,接下来就是其 start 方法里面的内容了。先介绍读任务。读任务定义于 ReaderRunner,为了更加灵活,在 ReaderRunner 内也是使用插件机制的.如果我们需要拓展,只拓展 Reader.Task 即可.

    Reader.Task taskReader = (Reader.Task) this.getPlugin();

然后重点关注 Reader.Task 的 startRead 方法 调用 startRead 方法需要传入 Exchange

    taskReader.startRead(recordSender);
   recordSender.terminate();

调用 startRead 方法需要传入 Exchange,接下来就是具体的数据源实现了。具体实现类 CommonRdbmsReader

首先是获取 QUERY_SQL、username、password 等参数,进行连接

    String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
    String table = readerSliceConfig.getString(Key.TABLE);
    Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl, username, password);

然后执行 QUERY_SQL 获取数据

    rs = DBUtil.query(conn, querySql, fetchSize);

然后循环查询结果,开始写入

    while (rs.next()) {
        this.transportOneRecord(recordSender, rs,
         metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
    }

transportOneRecord 实际上是通过 Exchanger 进行交互。

    protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, 
        ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, 
        TaskPluginCollector taskPluginCollector)
 
{
        Record record = buildRecord(recordSender,rs,metaData,columnNumber,mandatoryEncoding,taskPluginCollector); 
        recordSender.sendToWriter(record);
        return record;
    }

接下来是写任务。写任务的实现在 commonRdbmsWriterTask#startWrite。写任务首先要获取连接,然后处理连接的 session 问题,最后

    Connection connection = DBUtil.getConnection(this.dataBaseType,
                    this.jdbcUrl, username, password);
    DBUtil.dealWithSessionConfig(connection, writerSliceConfig,
                    this.dataBaseType, BASIC_MESSAGE);

然后开始写数据。写数据是从 Exchanger 里面获取的。从 Exchanger 获取数据循环,然后检查列数是否满足

    //使用 buffer 缓存, batchSize 是控制每次发送的数量
    List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);
    
    while ((record = recordReceiver.getFromReader()) != null) {
        if (record.getColumnNumber() != this.columnNumber) {
            // throw error... 
        }
    }

如果符合条件的,即可添加进缓存。最后刷进数据库里

    doBatchInsert(connection, writeBuffer);
    writeBuffer.clear();
    bufferBytes = 0;

Exchanger 交互

Exchanger 实现了 RecordSender 和 RecordReceiver,同时它有两个实现类 BufferedRecordExchanger 以及 BufferedRecordTransformerExchanger。

RecordSender.java

public interface RecordSender {
    public Record createRecord();
    public void sendToWriter(Record record);
    public void flush();
    public void terminate();
    public void shutdown();
}

RecordReceiver.java

public interface RecordReceiver {
    public Record getFromReader();
    public void shutdown();
}

以 BufferedRecordExchanger 为例,我们挑几个方法讲下。

方法名 作用
sendToWriter 将数据写入 channel
flush 刷新缓冲区, 也就是将缓冲区的数据写入 channel
terminate 马上将缓冲区数据刷入 channel
getFromReader 从 channel 获取数据

sendToWriter 方法是负责将记录放进 channel。首先会校验记录的大小是否超出限制

    if (record.getMemorySize() > this.byteCapacity) {
        this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s"this.byteCapacity)));
        return;
    }

然后判断是否 channel 是否已经满了。如果满了就刷新进行发送

    boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
    if (isFull) {
        flush();
    }

否则加入队列

    this.buffer.add(record);
    this.bufferIndex++;
    memoryBytes.addAndGet(record.getMemorySize());

flush 方法也很简单,直接添加到 channel 即可。

    this.channel.pushAll(this.buffer);
    this.buffer.clear();

terminate 方法是直接将缓冲区的数据刷入 channel。

    flush();
    this.channel.pushTerminate(TerminateRecord.get());

getFromReader 方法是给 writer 获取数据,进行下一步操作。首先是排查队列是否为空。如果为空,然后调用方法将数据批量写入 writer 的缓冲区

    boolean isEmpty = (this.bufferIndex >= this.buffer.size());
    if (isEmpty) {
       receive();
    }

如果不为空,根据索引读取,返回缓冲区中的一条。

    Record record = this.buffer.get(this.bufferIndex++);
    if (record instanceof TerminateRecord) {
            record = null;
    }
    return record;

文末

大致上 Datax 的读写任务都是这个流程,灵活在于它自身提供了很多数据源的实现案例。同时也提供插件式的 reader 和 writer,这样可以做更多的扩展和业务针对性的适配

end