vlambda博客
学习文章列表

DataX 秀起来: 从MySQL同步数据到Doris

关注我,和我一起学习

前言

开发的过程有个任务,利用DataX从MySQL同步数据到Doris,此篇从技术验证与简单使用层面展开学习。主要以下几个方面进行学习。

  1. 编译 doriswriter plugin
  2. mysqlreader 写入数据到Doris
  3. 性能测试
  4. Bug 记录

编译 doriswriter

doriswriter 插件

https://github.com/apache/incubator-doris/tree/master/extension/DataX

步骤(按需修改源代码)

  1. 从github上拉取源码(或者直接在上面地址下载包)
git clone https://github.com/apache/incubator-doris.git

不过执行 init 即可 

2. 运行 init-env.sh

主要做了下面几件事,减少了繁杂的操作。

(1)将 DataX 代码库 clone 到本地。

(2)将 doriswriter/ 目录软链到 DataX/doriswriter 目录。

(3)在 DataX/pom.xml 文件中添加 doriswriter 模块。

(4)将 DataX/core/pom.xml 文件中的 httpclient 版本从 4.5 改为 4.5.13(因为有bug)

  1. 编译 doriswriter

(1)命令

mvn clean install -pl plugin-rdbms-util,doriswriter -DskipTests
../target/datax/datax/
  1. 把编译完的包放到plugin下即可
../datax/plugin/
  1. 具体可看Doris官网

http://doris.incubator.apache.org/zh-CN/extending-doris/datax.html

MySQL同步数据到Doris

创建Doris表

CREATE TABLE `mars_micro_user_events` (
  `id` bigint,
  `user_id` bigint DEFAULT NULL ,
  `group_type` int DEFAULT NULL,
  `group_id` int DEFAULT NULL,
  `event_type` varchar(45) DEFAULT NULL,
  `event_name` varchar(45) DEFAULT NULL,
  `event_count` int DEFAULT NULL,
  `event_time` bigint DEFAULT NULL,
  `created_time` bigint DEFAULT NULL,
  `updated_time` bigint DEFAULT NULL
) ENGINE=OLAP
DUPLICATE KEY(id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"replication_num" = "3"
);

DataX 配置JSON文件 (mysqlToDoris.json)

{
  "core":{
    "transport": {
      "channel": {
        "speed": {
          "byte": 104857600,
          "record": 200000
        }
      }
    }
  },
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name""mysqlreader",
                    "parameter": {
                        "username""dev",
                        "password""123456",
                        "connection": [
                            {
                            
                                "jdbcUrl": [
                                     "jdbc:mysql://mysql.xxx.cn:3306/event_db"
                                ],
                                "querySql": [
                                    "select id,user_id,group_type,group_id,event_type,event_name,event_time,created_time,updated_time from eventc_db.user_events where event_time>=${StartTime} and event_time<${EndTime};"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name""doriswriter",
                    "parameter": {
                        "username""root",
                        "password""",
                        "database""event_db",
                        "table""events",
                        "column": [ "id",  "group_type","group_id","event_type","event_name","event_time","created_time","updated_time"],
                        "preSql": [],
                        "postSql": [], 
                        "jdbcUrl""jdbc:mysql://cdh3:9030/",
                        "feLoadUrl": ["cdh3:8030"],
                        "beLoadUrl": ["cdh1:8044""cdh2:8044""cdh3:8044"],
                        "loadProps": {
                        },
                        "maxBatchRows" : 200000,
                        "maxBatchByteSize" : 104857600,
                        "lineDelimiter""\n"
                    }
                }
            }
        ]
    }
}

core与job参数介绍

配置 说明
core.transport.channel.speed.byte 单个channel容纳最多的字节数
core.transport.channel.speed.record 单个channel容纳最多的record数
job.setting.speed.channel 该job所需要的channel的个数
job.setting.speed.byte 该job最大的流量
job.setting.speed.record 该job最大的record流量

doriswriter 参数介绍

配置 说明
username 数据库的用户名
password 数据库的密码
database 表的数据库名称
table 表的表名称
jdbcUrl 数据库的 JDBC 连接信息
loadUrl FE的地址用于Stream Load
feLoadUrl FE的地址
beLoadUrl BE的地址
column 需要写入数据的字段
preSql 写入数据到目的表前,会先执行这里的标准语句
postSql 写入数据到目的表后,会执行这里的标准语句
loadProps Stream Load 的请求参数
maxBatchRows 每批次导入数据的最大行数
maxBatchByteSize 每批次导入数据的最大数据量
labelPrefix 每批次导入任务的 label 前缀
lineDelimiter 每批次数据包含多行,,每行的的分隔符
connectTimeout Stream Load单次请求的超时时间, 单位毫秒

执行任务

python /opt/app/datax/bin/datax.py  /opt/app/datax/bin/mysqlToDoris.json -p "-DStartTime=1546272000000 -DEndTime=1650791448000" 

性能测试

mysql events表数据量有 8310077条

Doris FE一台机器,BE三台

CPU 8核,内存64G

测试A

配置
core.transport.channel.speed.byte 1048576
core.transport.channel.speed.record 10000
doriswriter maxBatchByteSize 104857600
doriswriter maxBatchRows 10000
DataX 秀起来: 从MySQL同步数据到Doris

测试B

配置
core.transport.channel.speed.byte 1048576
core.transport.channel.speed.record 100000
doriswriter maxBatchByteSize 104857600
doriswriter maxBatchRows 100000


DataX 秀起来: 从MySQL同步数据到Doris

测试C

配置
core.transport.channel.speed.byte 104857600
core.transport.channel.speed.record 100000
doriswriter maxBatchByteSize 104857600
doriswriter maxBatchRows 100000

DataX 秀起来: 从MySQL同步数据到Doris

测试D

配置
core.transport.channel.speed.byte 104857600
core.transport.channel.speed.record 500000
doriswriter maxBatchByteSize 104857600
doriswriter maxBatchRows 500000

挂了

测试E

配置 说明
core.transport.channel.speed.byte 104857600
core.transport.channel.speed.record 200000
doriswriter maxBatchByteSize 104857600
doriswriter maxBatchRows 200000

DataX 秀起来: 从MySQL同步数据到Doris

对于我用的测试机器,基本上维持高值3M左右

Bug 记录

1. 配置beLoadUrl地址

2. 建表语句

建Doris表的时候没有报错问题,但是执行插入的时候数据进不去。

原来的建表语句

CREATE TABLE events
(
  `id` bigint,
  `user_id` bigint ,
  `group_type` int,
  `group_id` int,
  `event_name` varchar(45),
  `event_count` int,
  `event_time` bigint,
  `created_time` bgint,
  `updated_time` bigint
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id, event_name,group_id) BUCKETS 6;

修改后的

CREATE TABLE mars_micro_user_events
(
  `id` bigint,
  `user_id` bigint ,
  `group_type` int,
  `group_id` int,
  `event_name` varchar(45),
  `event_count` int,
  `event_time` bigint,
  `created_time` bigint,
  `updated_time` bigint
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 6

首先了解DUPLICATE KEY 数据完全按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。而在建表语句中指定的 DUPLICATE KEY,只是用来指明底层数据按照那些列进行排序。

分桶列可以是多列,但必须为Key 列。在此建表语句(DUPLICATE KEY)模型下,建表是不会报错的,如果选择的是其他聚合的模型则会直接报错。