通过OGG实现mysql到kafka实时数据采集
通过OGG实现mysql到kafka实时数据采集,准备软件:
centos 6.7虚拟机两台
源端 192.168.5.201 安装mysql OGG客户端
mysql-8.0.23-1.el7.x86_64.rpm-bundle.tar
191003_ggs_Linux_x64_MySQL_64bit.zip
目标端 192.168.5.202 安装kafka OGG客户端
jdk-8u211-linux-x64.tar.gz
kafka_2.13-2.7.0.tgz
zookeeper-3.4.14.tar.gz
OGG_BigData_Linux_x64_19.1.0.0.5.zip
一 源端配置
1 安装mysql依赖,并解压安装mysql
[root@localhost mysql]# yum -y install autoconf perl.x86_64 perl-devel.x86_64 perl-JSON.noarch net-tools numactl libaio* openssl-devel perl-Test*
[root@localhost mysql]#
[root@localhost mysql]# ls
mysql-8.0.23-1.el7.x86_64.rpm-bundle.tar mysql-community-embedded-compat-8.0.23-1.el7.x86_64.rpm
mysql-community-client-8.0.23-1.el7.x86_64.rpm mysql-community-libs-8.0.23-1.el7.x86_64.rpm
mysql-community-client-plugins-8.0.23-1.el7.x86_64.rpm mysql-community-libs-compat-8.0.23-1.el7.x86_64.rpm
mysql-community-common-8.0.23-1.el7.x86_64.rpm mysql-community-server-8.0.23-1.el7.x86_64.rpm
mysql-community-devel-8.0.23-1.el7.x86_64.rpm mysql-community-test-8.0.23-1.el7.x86_64.rpm
[root@localhost mysql]#
[root@localhost mysql]#
[root@localhost mysql]#
[root@localhost mysql]# rpm -ivh *.rpm
warning: mysql-community-client-8.0.23-1.el7.x86_64.rpm: Header V3 DSA/SHA1 Signature, key ID 5072e1f5: NOKEY
Preparing... ################################# [100%]
Updating / installing...
1:mysql-community-common-8.0.23-1.e################################# [ 11%]
2:mysql-community-client-plugins-8.################################# [ 22%]
3:mysql-community-libs-8.0.23-1.el7################################# [ 33%]
4:mysql-community-client-8.0.23-1.e################################# [ 44%]
5:mysql-community-server-8.0.23-1.e################################# [ 56%]
6:mysql-community-test-8.0.23-1.el7################################# [ 67%]
7:mysql-community-devel-8.0.23-1.el################################# [ 78%]
8:mysql-community-libs-compat-8.0.2################################# [ 89%]
9:mysql-community-embedded-compat-8################################# [100%]
[root@localhost mysql]#
2 登录并修改密码
[root@localhost mysql]# systemctl start mysqld
[root@localhost mysql]# grep password /var/log/mysqld.log
2021-04-03T20:51:17.333794Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: vy43:UR8T22b
[root@localhost mysql]#
mysql> ALTER USER 'root'@'localhost' IDENTIFIED BY 'Carl@123123' PASSWORD EXPIRE NEVER;
Query OK, 0 rows affected (0.01 sec)
mysql> ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'Carl@123123';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.01 sec)
3 创建测试用表
mysql>
mysql> create database test;
Query OK, 1 row affected (0.01 sec)
mysql> use test;
Database changed
mysql> CREATE TABLE test (`id` int(11) NOT NULL AUTO_INCREMENT,`name` varchar(60) DEFAULT NULL,PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query OK, 0 rows affected, 2 warnings (0.03 sec)
mysql>
mysql> desc test;
+-------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------+-------------+------+-----+---------+----------------+
| id | int | NO | PRI | NULL | auto_increment |
| name | varchar(60) | YES | | NULL | |
+-------+-------------+------+-----+---------+----------------+
2 rows in set (0.00 sec)
4 查看二进制日志是否开启及相关配置文件,默认开启
mysql>
mysql> show variables like '%log_bin%';
+---------------------------------+-----------------------------+
| Variable_name | Value |
+---------------------------------+-----------------------------+
| log_bin | ON |
| log_bin_basename | /var/lib/mysql/binlog |
| log_bin_index | /var/lib/mysql/binlog.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| sql_log_bin | ON |
+---------------------------------+-----------------------------+
6 rows in set (0.00 sec)
mysql>
mysql> show variables like '%sock%';
+-----------------------------------------+-----------------------------+
| Variable_name | Value |
+-----------------------------------------+-----------------------------+
| mysqlx_socket | /var/run/mysqld/mysqlx.sock |
| performance_schema_max_socket_classes | 10 |
| performance_schema_max_socket_instances | -1 |
| socket | /var/lib/mysql/mysql.sock |
+-----------------------------------------+-----------------------------+
4 rows in set (0.00 sec)
5 解压OGG软件,配置相关进程
[ ]
[ ]
[ ]
[ ]
[ ]
[ ]
[ ]
[ ]
[ ]
[ ]
Archive: 191003_ggs_Linux_x64_MySQL_64bit.zip
inflating: ggs_Linux_x64_MySQL_64bit.tar
inflating: OGG-19.1.0.0-README.txt
inflating: OGG_WinUnix_Rel_Notes_19.1.0.0.3.pdf
[ ]
...
...
6 创建目录
[root@mysql ogg]# ./ggsci
Oracle GoldenGate Command Interpreter for MySQL
Version 19.1.0.0.3 OGGCORE_19.1.0.0.0_PLATFORMS_190907.0144
Linux, x64, 64bit (optimized), MySQL Enterprise on Sep 7 2019 08:41:32
Operating system character set identified as UTF-8.
Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.
GGSCI (mysql) 1> create subdirs
Creating subdirectories under current directory /opt/ogg
Parameter file /opt/ogg/dirprm: created.
Report file /opt/ogg/dirrpt: created.
Checkpoint file /opt/ogg/dirchk: created.
Process status files /opt/ogg/dirpcs: created.
SQL script files /opt/ogg/dirsql: created.
Database definitions files /opt/ogg/dirdef: created.
Extract data files /opt/ogg/dirdat: created.
Temporary files /opt/ogg/dirtmp: created.
Credential store files /opt/ogg/dircrd: created.
Masterkey wallet files /opt/ogg/dirwlt: created.
Dump files /opt/ogg/dirdmp: created.
7 添加抽取的表
GGSCI (mysql) 2> dblogin sourcedb test@192.168.5.201:3306,userid root,password Carl@123123
Successfully logged into database.
GGSCI (mysql DBLOGIN as root) 3> add trandata test.test
ERROR: Invalid command.(此处有报错,不影响结果)
8 配置管理进程
GGSCI (mysql DBLOGIN as root) 4> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 10
9 配置抽取进程
GGSCI (mysql DBLOGIN as root) 5> edit param ext_1
EXTRACT ext_1
SETENV (MYSQL_UNIX_PORT = "/var/lib/mysql/mysql.sock")
sourcedb test@192.168.5.201:3306, userid root,password Carl@123123
tranlogoptions altlogdest "/var/lib/mysql/binlog.index"
REPORTCOUNT EVERY 10 SECONDS, RATE
EXTTRAIL ./dirdat/01
TABLE test.test;
GGSCI (mysql DBLOGIN as root) 6> ADD EXTRACT ext_1, TRANLOG, BEGIN NOW
EXTRACT added.
GGSCI (mysql DBLOGIN as root) 7> ADD EXTTRAIL ./dirdat/01, EXTRACT ext_1, MEGABYTES 200
EXTTRAIL added.
10 配置投递进程
GGSCI (mysql DBLOGIN as root) 8> edit param dpe_1
EXTRACT dpe_1
PASSTHRU
DYNAMICRESOLUTION
RMTHOST 192.168.5.202, MGRPORT 7809
RMTTRAIL ./dirdat/ca/01
TABLE test.*;
GGSCI (mysql DBLOGIN as root) 9> ADD EXTRACT dpe_1, EXTTRAILSOURCE ./dirdat/01
EXTRACT added.
GGSCI (mysql DBLOGIN as root) 10> ADD RMTTRAIL ./dirdat/ca/01, EXTRACT dpe_1, MEGABYTES 200
RMTTRAIL added.
GGSCI (mysql DBLOGIN as root) 11> [root@mysql ogg]#
ogg]#
二 目标端配置
1 解压安装kafka软件
[ ]
[ ]
[ ]
[ ]
[ ]
[ ]
[ ]
2 配置环境变量
[root@localhost ~]# vim /etc/profile
#JAVA_HOME
export JAVA_HOME=/opt/bdata/jdk1.8.0_211
export PATH=$PATH:$JAVA_HOME/bin
#KAFKA_HOME
export KAFKA_HOME=/opt/bdata/kafka_2.13-2.7.0
export PATH=$PATH:$KAFKA_HOME/bin
#ZK_HOME
export ZK_HOME=/opt/bdata/zookeeper-3.4.14
export PATH=$PATH:$ZK_HOME/bin
export OGG_HOME=/opt/ogg
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib
export PATH=$OGG_HOME:$PATH
[root@localhost ~]# source /etc/profile
3 ZK默认配置文件即可
[ ]
[ ]
[ ]
ZooKeeper JMX enabled by default
Using config: /opt/bdata/zookeeper-3.4.14/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[ ]
4 kafka需要更改配置文件
[root@kafka ~]# vim /opt/bdata/kafka_2.13-2.7.0/config/server.properties
31行 listeners=PLAINTEXT://192.168.5.202:9092
36行 advertised.listeners=PLAINTEXT://192.168.5.202:9092
123行 zookeeper.connect=192.168.5.202:2181
[ ]
5 解压OGG软件,配置相关进程
[ ]
[ ]
[ ]
OGG_BigData_Linux_x64_19.1.0.0.5.zip
[ ]
Archive: OGG_BigData_Linux_x64_19.1.0.0.5.zip
inflating: OGGBD-19.1.0.0-README.txt
inflating: OGG_BigData_19.1.0.0.5_Release_Notes.pdf
inflating: OGG_BigData_Linux_x64_19.1.0.0.5.tar
[ ]
AdapterExamples/
...
[ ]
[ ]
[ ]
[ ]
[ ]
6 创建目录
[root@kafka ogg]# ./ggsci
Oracle GoldenGate for Big Data
Version 19.1.0.0.5 (Build 007)
Oracle GoldenGate Command Interpreter
Version 19.1.0.0.200714 OGGCORE_19.1.0.0.0OGGBP_PLATFORMS_200628.2141
Linux, x64, 64bit (optimized), Generic on Jun 28 2020 23:01:58
Operating system character set identified as UTF-8.
Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.
GGSCI (kafka) 1> create subdirs
Creating subdirectories under current directory /opt/ogg
Parameter file /opt/ogg/dirprm: created.
Report file /opt/ogg/dirrpt: created.
Checkpoint file /opt/ogg/dirchk: created.
Process status files /opt/ogg/dirpcs: created.
SQL script files /opt/ogg/dirsql: created.
Database definitions files /opt/ogg/dirdef: created.
Extract data files /opt/ogg/dirdat: created.
Temporary files /opt/ogg/dirtmp: created.
Credential store files /opt/ogg/dircrd: created.
Masterkey wallet files /opt/ogg/dirwlt: created.
Dump files /opt/ogg/dirdmp: created.
7 配置管理进程
GGSCI (kafka) 2> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 10
GGSCI (kafka) 3>
8 配置rep_1
GGSCI (kafka) 3> edit param rep_1
REPLICAT rep_1
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
DDL INCLUDE ALL
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP test.*, TARGET test.*;
GGSCI (kafka) 4> add replicat rep_1, exttrail ./dirdat/ca/01
REPLICAT added.
9 配置kafka.props
GGSCI (kafka) 5>
[root@kafka dirprm]# vim kafka.props
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.topicMappingTemplate=test
gg.classpath=dirprm/:/opt/bdata/kafka_2.13-2.7.0/libs/*:/opt/ogg/:/opt/ogg/lib/*
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
[root@kafka dirprm]#
10 配置 custom_kafka_producer.properties
[ ]
bootstrap.servers=192.168.5.202:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
[ ]
三 启动服务
源端mgr ---> 目标端mgr ---> 源端ext_1---> 源端dpe_1---> 目标rep_1
start mgr ---> start mgr ---> start ext_1---> start dpe_1---> start rep_1
1 启动完成,源端
GGSCI (mysql) 30> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING DPE_1 00:00:00 00:00:06
EXTRACT RUNNING EXT_1 00:00:00 00:00:08
GGSCI (mysql) 31>
2 启动完成,目标端
GGSCI (kafka) 12> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT RUNNING REP_1 00:00:00 00:00:06
GGSCI (kafka) 13>
3 mysql写入数据进行测试
mysql> use test
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> insert into test values(1,'carl');
Query OK, 1 row affected (0.26 sec)
mysql> insert into test values(2,'carl');
Query OK, 1 row affected (0.13 sec)
mysql> insert into test values(3,'good job!');
Query OK, 1 row affected (0.02 sec)
4 查看topic及消费情况
[ ]
test
[ ]
{"table":"test.test","op_type":"I","op_ts":"2021-04-04 06:37:00.280750","current_ts":"2021-04-04T06:37:08.134000","pos":"00000000000000001691","primary_keys":["ID"],"after":{"ID":1,"NAME":"carl"}}
{"table":"test.test","op_type":"I","op_ts":"2021-04-04 06:38:49.280773","current_ts":"2021-04-04T06:38:56.395000","pos":"00000000000000001855","primary_keys":["ID"],"after":{"ID":2,"NAME":"carl"}}
{"table":"test.test","op_type":"I","op_ts":"2021-04-04 06:39:04.281151","current_ts":"2021-04-04T06:39:10.410000","pos":"00000000000000002015","primary_keys":["ID"],"after":{"ID":3,"NAME":"good job!"}}
5 测试完成