使用Change Stream实时同步MongoDB数据(上)
01
PART
前言
MongDB一共有三种集群搭建方式
Replica Set(副本集)
Sharding (切片)
Mast-Salver(主从)已不推荐使用
三种模式当中,Sharding切片模式最为复杂。Replica Set副本集模式可以理解为是主从的一种升级版,双方互为主从。
根据MongDB的版本同步方案也被分为两种,oplog和change stream,在MongDB 3.6之前都是采用oplog的方式实时同步数据,那么在MongDB 3.6之后都是使用change stream,它是oplog的一个升级版,并且只支持Replica Set副本集和Sharding切片的集群模式,不支持单机模式。
所以接下来就简单介绍下Replica Set副本集的搭建和change stream的使用。
02
PART
集群部署
准备3台red hat系统的机器,包括Red Hat Enterprise Linux,CentOS Linux或Oracle Linux。我们选择Centos7.5系统进行安装3台主机。分别为mongodb001,mongodb002,mongodb003
(1)创建/etc/yum.repos.d/mongodb-org-4.2.repo文件,以便可以使用yum安装
[root@mongodb001 ~]# vim /etc/yum.repos.d/mongodb-org-4.2.repo
[mongodb-org-4.2]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/redhat/$releasever/mongodb-org/4.2/x86_64/
gpgcheck=1
enabled=1
gpgkey=https://www.mongodb.org/static/pgp/server-4.2.asc
(2)安装MongoDB软件包
[root@mongodb001 ~]# sudo yum install -y mongodb-org-4.2.6 mongodb-org-server-4.2.6 mongodb-org-shell-4.2.6 mongodb-org-mongos-4.2.6 mongodb-org-tools-4.2.6
(3)安装完毕后,会自动创建/var/lib/mongo和/var/log/mongodb两个目录,如果没创建需要手动自己创建下,并且要将所属用户和所属组都设置为mongod
(4)查看SELinux模式,如果是enforcing模式则需要额外配置自定义策略,本机处于关闭模式不需要额外配置,自定义策略见官网配置。
[ ]
SELinux status: disabled
(5)启动mongodb
[ ]
(6)查看状态
[ ]
(7)mongodb002,mongodb003也同样进行安装,安装完毕后先将mongodb数据库停止
(8)配置/etc/hosts,确保主机都能访问
[ ]
172.26.16.34 mongodb001 mongodb001
172.26.16.33 mongodb002 mongodb002
172.26.16.35 mongodb003 mongodb003
[ ]
[ ]
~]# vim /etc/mongod.conf
net:
port: 27017
bindIp: 0.0.0.0
replication:
replSetName: "rs0"
~]# vim /etc/mongod.conf
net:
port: 27017
bindIp: 0.0.0.0
replication:
replSetName: "rs0"
~]# vim /etc/mongod.conf
net:
port: 27017
bindIp: 0.0.0.0
replication:
replSetName: "rs0"
[ ]
[ ]
[ ]
[ ]
> rs.initiate( {
_id : "rs0",
members: [
{ _id: 0, host: "mongodb001" },
{ _id: 1, host: "mongodb002" },
{ _id: 2, host: "mongodb003" }
]
})
(11)绑定成功,退出。mongod -f 为启动数据库命令
rs0:SECONDARY> exit;
03
PART
创建表
(1)连接shell
[ ]
(2)查看数据库
rs0:PRIMARY> show dbs
admin 0.000GB
config 0.000GB
local 0.000GB
(3)查看当前正在使用的库
rs0:PRIMARY> db
test
(4)创建collection
rs0:PRIMARY> db.createCollection("testCollection")
{
"ok" : 1,
"$clusterTime" : {
"clusterTime" : Timestamp(1594274891, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
},
"operationTime" : Timestamp(1594274891, 1)
}
(5)查看已有集合
rs0:PRIMARY> show collections
testCollection
(6)插入测试数据
rs0:PRIMARY> db.testCollection.insert({name:"张三","age":22,sex:"男"})
WriteResult({ "nInserted" : 1 })
(7)查询表数据
rs0:PRIMARY> db.testCollection.find()
{ "_id" : ObjectId("5f06b4cb41c332467846251d"), "name" : "张三", "age" : 22, "sex" : "男" }
(8)修改表数据,将张三姓名修改成李四
rs0:PRIMARY> db.testCollection.update({name:"张三"},{$set:{name:"李四"}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
rs0:PRIMARY> db.testCollection.find()
{ "_id" : ObjectId("5f06b4cb41c332467846251d"), "name" : "李四", "age" : 22, "sex" : "男" }
(9)添加用户
rs0:PRIMARY> db.createUser({user:'admin',pwd:'admin',roles:[{role:'readWrite',db:'admin,config,local,test'}]})
Successfully added user: {
"user" : "admin",
"roles" : [
{
"role" : "readWrite",
"db" : "admin,config,local,test"
}
]
}
完