Real-time data capture from MySQL to Kafka with OGG (Oracle GoldenGate)
Software to prepare:
Two CentOS 6.7 virtual machines
Source: 192.168.5.201, with MySQL and the OGG client installed
mysql-8.0.23-1.el7.x86_64.rpm-bundle.tar
191003_ggs_Linux_x64_MySQL_64bit.zip
Target: 192.168.5.202, with Kafka and the OGG client installed
jdk-8u211-linux-x64.tar.gz
kafka_2.13-2.7.0.tgz
zookeeper-3.4.14.tar.gz
OGG_BigData_Linux_x64_19.1.0.0.5.zip
I Source configuration
1 Install the MySQL dependencies, then unpack and install MySQL
[root@localhost mysql]# yum -y install autoconf perl.x86_64 perl-devel.x86_64 perl-JSON.noarch net-tools numactl libaio* openssl-devel perl-Test*
[root@localhost mysql]# ls
mysql-8.0.23-1.el7.x86_64.rpm-bundle.tar                  mysql-community-embedded-compat-8.0.23-1.el7.x86_64.rpm
mysql-community-client-8.0.23-1.el7.x86_64.rpm            mysql-community-libs-8.0.23-1.el7.x86_64.rpm
mysql-community-client-plugins-8.0.23-1.el7.x86_64.rpm    mysql-community-libs-compat-8.0.23-1.el7.x86_64.rpm
mysql-community-common-8.0.23-1.el7.x86_64.rpm            mysql-community-server-8.0.23-1.el7.x86_64.rpm
mysql-community-devel-8.0.23-1.el7.x86_64.rpm             mysql-community-test-8.0.23-1.el7.x86_64.rpm
[root@localhost mysql]# rpm -ivh *.rpm
warning: mysql-community-client-8.0.23-1.el7.x86_64.rpm: Header V3 DSA/SHA1 Signature, key ID 5072e1f5: NOKEY
Preparing...                          ################################# [100%]
Updating / installing...
   1:mysql-community-common-8.0.23-1.e################################# [ 11%]
   2:mysql-community-client-plugins-8.################################# [ 22%]
   3:mysql-community-libs-8.0.23-1.el7################################# [ 33%]
   4:mysql-community-client-8.0.23-1.e################################# [ 44%]
   5:mysql-community-server-8.0.23-1.e################################# [ 56%]
   6:mysql-community-test-8.0.23-1.el7################################# [ 67%]
   7:mysql-community-devel-8.0.23-1.el################################# [ 78%]
   8:mysql-community-libs-compat-8.0.2################################# [ 89%]
   9:mysql-community-embedded-compat-8################################# [100%]
2 Log in and change the password
[root@localhost mysql]# systemctl start mysqld
[root@localhost mysql]# grep password /var/log/mysqld.log
2021-04-03T20:51:17.333794Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: vy43:UR8T22b
Log in as root with this temporary password (for example with mysql -uroot -p), then change it:
mysql> ALTER USER 'root'@'localhost' IDENTIFIED BY 'Carl@123123' PASSWORD EXPIRE NEVER;
Query OK, 0 rows affected (0.01 sec)
mysql> ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'Carl@123123';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.01 sec)
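The grep step above can also be scripted. The following is a minimal sketch, assuming the log line has the same wording as the one shown in this step; the function name find_temp_password is only illustrative.

```python
import re

# Wording of the first-start message as it appears in the log line above.
TEMP_PW = re.compile(r"A temporary password is generated for root@localhost: (\S+)")

def find_temp_password(log_text):
    """Return the most recent temporary root password in the log text, or None."""
    matches = TEMP_PW.findall(log_text)
    return matches[-1] if matches else None

sample = (
    "2021-04-03T20:51:17.333794Z 6 [Note] [MY-010454] [Server] "
    "A temporary password is generated for root@localhost: vy43:UR8T22b"
)
print(find_temp_password(sample))
```

In practice the text would come from reading /var/log/mysqld.log; taking the last match covers the case where the data directory was initialized more than once.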
3 Create a test table
mysql> create database test;
Query OK, 1 row affected (0.01 sec)
mysql> use test;
Database changed
mysql> CREATE TABLE test (`id` int(11) NOT NULL AUTO_INCREMENT,`name` varchar(60) DEFAULT NULL,PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query OK, 0 rows affected, 2 warnings (0.03 sec)
mysql> desc test;
+-------+-------------+------+-----+---------+----------------+
| Field | Type        | Null | Key | Default | Extra          |
+-------+-------------+------+-----+---------+----------------+
| id    | int         | NO   | PRI | NULL    | auto_increment |
| name  | varchar(60) | YES  |     | NULL    |                |
+-------+-------------+------+-----+---------+----------------+
2 rows in set (0.00 sec)
4 Check that binary logging is enabled (it is by default) and look up the related file paths
mysql> show variables like '%log_bin%';
+---------------------------------+-----------------------------+
| Variable_name                   | Value                       |
+---------------------------------+-----------------------------+
| log_bin                         | ON                          |
| log_bin_basename                | /var/lib/mysql/binlog       |
| log_bin_index                   | /var/lib/mysql/binlog.index |
| log_bin_trust_function_creators | OFF                         |
| log_bin_use_v1_row_events       | OFF                         |
| sql_log_bin                     | ON                          |
+---------------------------------+-----------------------------+
6 rows in set (0.00 sec)
mysql> show variables like '%sock%';
+-----------------------------------------+-----------------------------+
| Variable_name                           | Value                       |
+-----------------------------------------+-----------------------------+
| mysqlx_socket                           | /var/run/mysqld/mysqlx.sock |
| performance_schema_max_socket_classes   | 10                          |
| performance_schema_max_socket_instances | -1                          |
| socket                                  | /var/lib/mysql/mysql.sock   |
+-----------------------------------------+-----------------------------+
4 rows in set (0.00 sec)
5 Unpack the OGG software and configure the processes
Archive: 191003_ggs_Linux_x64_MySQL_64bit.zip
inflating: ggs_Linux_x64_MySQL_64bit.tar
inflating: OGG-19.1.0.0-README.txt
inflating: OGG_WinUnix_Rel_Notes_19.1.0.0.3.pdf
......
6 Create the working directories
[root@mysql ogg]# ./ggsci
Oracle GoldenGate Command Interpreter for MySQL
Version 19.1.0.0.3 OGGCORE_19.1.0.0.0_PLATFORMS_190907.0144
Linux, x64, 64bit (optimized), MySQL Enterprise on Sep 7 2019 08:41:32
Operating system character set identified as UTF-8.
Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.
GGSCI (mysql) 1> create subdirs
Creating subdirectories under current directory /opt/ogg
Parameter file /opt/ogg/dirprm: created.
Report file /opt/ogg/dirrpt: created.
Checkpoint file /opt/ogg/dirchk: created.
Process status files /opt/ogg/dirpcs: created.
SQL script files /opt/ogg/dirsql: created.
Database definitions files /opt/ogg/dirdef: created.
Extract data files /opt/ogg/dirdat: created.
Temporary files /opt/ogg/dirtmp: created.
Credential store files /opt/ogg/dircrd: created.
Masterkey wallet files /opt/ogg/dirwlt: created.
Dump files /opt/ogg/dirdmp: created.
7 Add the table to be extracted
GGSCI (mysql) 2> dblogin sourcedb [email protected]:3306,userid root,password Carl@123123
Successfully logged into database.
GGSCI (mysql DBLOGIN as root) 3> add trandata test.test
ERROR: Invalid command.
(This command reports an error here; it does not affect the result.)
8 Configure the manager process
GGSCI (mysql DBLOGIN as root) 4> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 10
9 Configure the extract process
GGSCI (mysql DBLOGIN as root) 5> edit param ext_1
EXTRACT ext_1
="/var/lib/mysql/mysql.sock")
sourcedb [email protected]:3306, userid root,password Carl@123123
tranlogoptions altlogdest "/var/lib/mysql/binlog.index"
REPORTCOUNT EVERY 10 SECONDS, RATE
EXTTRAIL ./dirdat/01
TABLE test.test;
GGSCI (mysql DBLOGIN as root) 6> ADD EXTRACT ext_1, TRANLOG, BEGIN NOW
EXTRACT added.
GGSCI (mysql DBLOGIN as root) 7> ADD EXTTRAIL ./dirdat/01, EXTRACT ext_1, MEGABYTES 200
EXTTRAIL added.
10 Configure the data pump (delivery) process
GGSCI (mysql DBLOGIN as root) 8> edit param dpe_1
EXTRACT dpe_1
PASSTHRU
DYNAMICRESOLUTION
RMTHOST 192.168.5.202, MGRPORT 7809
RMTTRAIL ./dirdat/ca/01
TABLE test.*;
GGSCI (mysql DBLOGIN as root) 9> ADD EXTRACT dpe_1, EXTTRAILSOURCE ./dirdat/01
EXTRACT added.
GGSCI (mysql DBLOGIN as root) 10> ADD RMTTRAIL ./dirdat/ca/01, EXTRACT dpe_1, MEGABYTES 200
RMTTRAIL added.
II Target configuration
1 Unpack and install Kafka
2 Configure the environment variables
[root@localhost ~]# vim /etc/profile
#JAVA_HOME
export JAVA_HOME=/opt/bdata/jdk1.8.0_211
export PATH=$PATH:$JAVA_HOME/bin
#KAFKA_HOME
export KAFKA_HOME=/opt/bdata/kafka_2.13-2.7.0
export PATH=$PATH:$KAFKA_HOME/bin
#ZK_HOME
export ZK_HOME=/opt/bdata/zookeeper-3.4.14
export PATH=$PATH:$ZK_HOME/bin
export OGG_HOME=/opt/ogg
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib
export PATH=$OGG_HOME:$PATH
[root@localhost ~]# source /etc/profile
3 ZooKeeper: the default configuration file is sufficient
ZooKeeper JMX enabled by default
Using config: /opt/bdata/zookeeper-3.4.14/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
4 Kafka: the configuration file needs to be changed
vim /opt/bdata/kafka_2.13-2.7.0/config/server.properties
line 31: listeners=PLAINTEXT://192.168.5.99:9092
line 36: advertised.listeners=PLAINTEXT://192.168.5.99:9092
line 123: zookeeper.connect=192.168.5.99:2181
5 Unpack the OGG software and configure the processes
OGG_BigData_Linux_x64_19.1.0.0.5.zip
Archive: OGG_BigData_Linux_x64_19.1.0.0.5.zip
inflating: OGGBD-19.1.0.0-README.txt
inflating: OGG_BigData_19.1.0.0.5_Release_Notes.pdf
inflating: OGG_BigData_Linux_x64_19.1.0.0.5.tar
AdapterExamples/...
6 Create the working directories
[root@kafka ogg]# ./ggsci
Oracle GoldenGate for Big Data
Version 19.1.0.0.5 (Build 007)
Oracle GoldenGate Command Interpreter
Version 19.1.0.0.200714 OGGCORE_19.1.0.0.0OGGBP_PLATFORMS_200628.2141
Linux, x64, 64bit (optimized), Generic on Jun 28 2020 23:01:58
Operating system character set identified as UTF-8.
Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.
GGSCI (kafka) 1> create subdirs
Creating subdirectories under current directory /opt/ogg
Parameter file /opt/ogg/dirprm: created.
Report file /opt/ogg/dirrpt: created.
Checkpoint file /opt/ogg/dirchk: created.
Process status files /opt/ogg/dirpcs: created.
SQL script files /opt/ogg/dirsql: created.
Database definitions files /opt/ogg/dirdef: created.
Extract data files /opt/ogg/dirdat: created.
Temporary files /opt/ogg/dirtmp: created.
Credential store files /opt/ogg/dircrd: created.
Masterkey wallet files /opt/ogg/dirwlt: created.
Dump files /opt/ogg/dirdmp: created.
7 Configure the manager process
GGSCI (kafka) 2> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 10
8 Configure the replicat process rep_1
GGSCI (kafka) 3> edit param rep_1
REPLICAT rep_1
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
DDL INCLUDE ALL
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP test.*, TARGET test.*;
GGSCI (kafka) 4> add replicat rep_1, exttrail ./dirdat/ca/01
REPLICAT added.
9 Configure kafka.props
[root@kafka dirprm]# vim kafka.props
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.topicMappingTemplate=test
gg.classpath=dirprm/:/opt/bdata/kafka_2.13-2.7.0/libs/*:/opt/ogg/:/opt/ogg/lib/*
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
10 Configure custom_kafka_producer.properties
bootstrap.servers=192.168.5.202:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
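Because a missing or misspelled key in the producer file only shows up when the replicat starts, it can help to check the file beforehand. This is a minimal sketch, assuming simple key=value lines like the ones in this step; load_properties, missing_keys and the REQUIRED tuple are illustrative names, and the parser deliberately ignores the less common parts of the Java properties syntax (":" separators, line continuations).

```python
def load_properties(text):
    """Parse simple key=value lines into a dict, skipping blanks and comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props

# Keys a Kafka producer needs at minimum (an assumption for this check).
REQUIRED = ("bootstrap.servers", "key.serializer", "value.serializer")

def missing_keys(props, required=REQUIRED):
    """Return the required keys that are absent from props."""
    return [k for k in required if k not in props]

producer_text = """\
bootstrap.servers=192.168.5.202:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
"""

props = load_properties(producer_text)
print(props["bootstrap.servers"], missing_keys(props))
```

The same loader can read kafka.props, for example to confirm that the file named by gg.handler.kafkahandler.KafkaProducerConfigFile exists under dirprm.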
III Start the services
Start order: source mgr ---> target mgr ---> source ext_1 ---> source dpe_1 ---> target rep_1
start mgr ---> start mgr ---> start ext_1 ---> start dpe_1 ---> start rep_1
1 Startup complete, source side
GGSCI (mysql) 30> info all
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
MANAGER     RUNNING
EXTRACT     RUNNING     DPE_1       00:00:00      00:00:06
EXTRACT     RUNNING     EXT_1       00:00:00      00:00:08
2 Startup complete, target side
GGSCI (kafka) 12> info all
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
MANAGER     RUNNING
REPLICAT    RUNNING     REP_1       00:00:00      00:00:06
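The "info all" check on both sides can be automated once the output is captured as text. This is a minimal sketch, assuming the whitespace-separated column layout shown in this section; parse_info_all and not_running are illustrative names, and how the text is captured from GGSCI is left out.

```python
PROGRAMS = {"MANAGER", "EXTRACT", "REPLICAT"}

def parse_info_all(text):
    """Return (program, group, status) tuples from 'info all' output."""
    rows = []
    for line in text.splitlines():
        parts = line.split()
        if len(parts) >= 2 and parts[0] in PROGRAMS:
            # The MANAGER row has no group column.
            group = parts[2] if len(parts) > 2 else None
            rows.append((parts[0], group, parts[1]))
    return rows

def not_running(rows):
    """Return the rows whose status is anything other than RUNNING."""
    return [row for row in rows if row[2] != "RUNNING"]

source_output = """\
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
MANAGER     RUNNING
EXTRACT     RUNNING     DPE_1       00:00:00      00:00:06
EXTRACT     RUNNING     EXT_1       00:00:00      00:00:08
"""

rows = parse_info_all(source_output)
print(rows)
print(not_running(rows))
```

An empty list from not_running means every listed process is up; any other status, such as STOPPED or ABENDED, is returned with its group name so it can be looked up in the report files.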
3 Write data in MySQL to test
mysql> use test
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> insert into test values(1,'carl');
Query OK, 1 row affected (0.26 sec)
mysql> insert into test values(2,'carl');
Query OK, 1 row affected (0.13 sec)
mysql> insert into test values(3,'good job!');
Query OK, 1 row affected (0.02 sec)
4 Check the topic and the consumed messages
Topic list:
test
Messages consumed from the topic:
{"table":"test.test","op_type":"I","op_ts":"2021-04-04 06:37:00.280750","current_ts":"2021-04-04T06:37:08.134000","pos":"00000000000000001691","primary_keys":["ID"],"after":{"ID":1,"NAME":"carl"}}
{"table":"test.test","op_type":"I","op_ts":"2021-04-04 06:38:49.280773","current_ts":"2021-04-04T06:38:56.395000","pos":"00000000000000001855","primary_keys":["ID"],"after":{"ID":2,"NAME":"carl"}}
{"table":"test.test","op_type":"I","op_ts":"2021-04-04 06:39:04.281151","current_ts":"2021-04-04T06:39:10.410000","pos":"00000000000000002015","primary_keys":["ID"],"after":{"ID":3,"NAME":"good job!"}}
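Each message on the topic is one JSON document per row operation, so a downstream consumer mostly needs to pull out the table, the operation type and the key. This is a minimal sketch that works on the message text only, assuming the field names seen in the output above; summarize is an illustrative name, and falling back to a "before" image for operations without an "after" image is an assumption, since only inserts appear here.

```python
import json

def summarize(message):
    """Return (table, op_type, primary-key values) for one JSON change record."""
    record = json.loads(message)
    # Inserts carry an "after" image; the "before" fallback is an assumption
    # for operations that have no "after" image.
    row = record.get("after") or record.get("before") or {}
    keys = {name: row.get(name) for name in record.get("primary_keys", [])}
    return record["table"], record["op_type"], keys

message = (
    '{"table":"test.test","op_type":"I","op_ts":"2021-04-04 06:37:00.280750",'
    '"current_ts":"2021-04-04T06:37:08.134000","pos":"00000000000000001691",'
    '"primary_keys":["ID"],"after":{"ID":1,"NAME":"carl"}}'
)
print(summarize(message))  # ('test.test', 'I', {'ID': 1})
```

Note that the column names arrive in upper case (ID, NAME) even though the table was created with lower-case names, so consumers should match on the names as they appear in the message.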
5 Test complete
