资讯 小学 初中 高中 语言 会计职称 学历提升 法考 计算机考试 医护考试 建工考试 教育百科
栏目分类:
子分类:
返回
空麓网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
空麓网 > 计算机考试 > 系统运维 > 运维 > Linux

flink cdc 同步mongodb的测试

Linux 更新时间: 发布时间: 计算机考试归档 最新发布

flink cdc 同步mongodb的测试

flink cdc 同步mongodb的测试 1.前言 1.1 部署模式

MongDB一共有三种集群搭建方式
Replica Set(副本集)
Sharding (切片)
Mast-Salver(主从)已不推荐使用
三种模式当中,Sharding切片模式最为复杂。Replica Set副本集模式可以理解为是主从的一种升级版,双方互为主从。

1.2 同步方案

根据MongDB的版本同步方案也被分为两种,oplog和change stream,在MongDB 3.6之前都是采用oplog的方式实时同步数据,那么在MongDB 3.6之后都是使用change stream,它是oplog的一个升级版,并且只支持Replica Set副本集和Sharding切片的集群模式,不支持单机模式。
所以接下来就简单介绍下Replica Set副本集的搭建和change stream的使用。

2. mongodb集群部署 2.1 机器准备

选择Centos7.5系统的三台虚拟机进行安装
3台主机分别为mongodb001,mongodb002,mongodb003

2.2集群安装

(1)创建/etc/yum.repos.d/mongodb-org-4.2.repo文件,以便可以使用yum安装

[root@mongodb001 ~]# vim /etc/yum.repos.d/mongodb-org-4.2.repo
[mongodb-org-4.2]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/redhat/$releasever/mongodb-org/4.2/x86_64/
gpgcheck=1
enabled=1
gpgkey=https://www.mongodb.org/static/pgp/server-4.2.asc

(2)安装MongoDB软件包

[root@mongodb001 ~]# sudo yum install -y mongodb-org-4.2.6 mongodb-org-server-4.2.6 mongodb-org-shell-4.2.6 mongodb-org-mongos-4.2.6 mongodb-org-tools-4.2.6

(3)安装完毕后,会自动创建/var/lib/mogo和/var/logmonogb两个目录,如果没创建需要手动自己创建下,并且要将所属用户和所属组都设置为mongod

(4)查看SELinux模式,如果是enforcing模式则需要额外配置自定义策略,本机处于关闭模式不需要额外配置,自定义策略见官网配置。

链接: link

[root@mongodb001 lib]# sestatus
SELinux status:                 disabled

(5)启动mongodb

[root@mongodb001 lib]# systemctl start mongod

(6)查看状态

[root@mongodb001 lib]# systemctl status mongod

(7)mongodb002,mongodb003也同样进行安装,安装完毕后先将mongodb数据停止

[root@mongodb001 lib]# systemctl stop mongod

(8)配置/etc/hosts,确保主机都能访问

[root@mongodb001 lib]# vim /etc/hosts
172.26.16.34 mongodb001  mongodb001
172.26.16.33 mongodb002  mongodb002	
172.26.16.35 mongodb003  mongodb003
[root@mongodb001 lib]# scp /etc/hosts mongodb002:/etc/
[root@mongodb001 lib]# scp /etc/hosts mongodb003:/etc/

(9)修改各主机mongodb地址配置

[root@mongodb001 ~]# vim /etc/mongod.conf
net:
  port: 27017
  bindIp: 0.0.0.0

replication:
   replSetName: "rs0"
[root@mongodb002 ~]# vim /etc/mongod.conf 
net:
  port: 27017
  bindIp: 0.0.0.0

replication:
   replSetName: "rs0"
[root@mongodb003 ~]# vim /etc/mongod.conf 
net:
  port: 27017
  bindIp: 0.0.0.0

replication:
   replSetName: "rs0"

(10)集群指定副本集和ip绑定

[root@mongodb001 ~]# mongod -f /etc/mongod.conf
[root@mongodb002 ~]# mongod -f /etc/mongod.conf
[root@mongodb003 ~]# mongod -f /etc/mongod.conf
[root@mongodb001 ~]# mongo
> rs.initiate( {
    _id : "rs0",
  members: [
 { _id: 0, host: "mongodb001" },
  { _id: 1, host: "mongodb002" },
     { _id: 2, host: "mongodb003" }
  ]
 })


···
rs0:SECONDARY> exit;
···
绑定成功,集群模式 mongod -f /etc/mongod.conf为启动数据库命令

2.3创建表

(1 ) 连接shell

[root@mongodb001 ~]# mongo

(2) 查看数据库

rs0:PRIMARY> show dbs
admin   0.000GB
config  0.000GB
local   0.000GB

(3) 查看当前正在使用的库

rs0:PRIMARY> db
test

(4) 创建collection

rs0:PRIMARY> db.createCollection("testCollection")
{
	"ok" : 1,
	"$clusterTime" : {
		"clusterTime" : Timestamp(1594274891, 1),
		"signature" : {
			"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
			"keyId" : NumberLong(0)
		}
	},
	"operationTime" : Timestamp(1594274891, 1)
}

(5) 查看已有集合

rs0:PRIMARY> show collections
testCollection

(6) 插入测试数据

rs0:PRIMARY> db.testCollection.insert({name:"张三","age":22,sex:"男"})
WriteResult({ "nInserted" : 1 })

(7) 查询表数据

rs0:PRIMARY> db.testCollection.find()
{ "_id" : ObjectId("5f06b4cb41c332467846251d"), "name" : "张三", "age" : 22, "sex" : "男" }

(8) 修改表数据,将张三姓名修改成李四

rs0:PRIMARY> db.testCollection.update({name:"张三"},{$set:{name:"李四"}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
rs0:PRIMARY> db.testCollection.find()
{ "_id" : ObjectId("5f06b4cb41c332467846251d"), "name" : "李四", "age" : 22, "sex" : "男" }
2.4添加用户

提示:先进入admin库,然后创建角色

rs0:PRIMARY>use amdin;
rs0:PRIMARY> db.createUser({user:'admin',pwd:'admin',roles:[{role:'readWrite',db:'admin,config,local,test'}]})
Successfully added user: {
	"user" : "admin",
	"roles" : [
		{
			"role" : "readWrite",
			"db" : "admin,config,local,test"
		}
	]
}
3. 利用flink cdc编写程序同步数据
{
        _id : { // 存储元信息
            "_data" :  // resumeToken,用于断点恢复
        },
        "operationType" : "", // insert, delete, replace, update, drop, rename, dropDatabase, invalidate,部分仅支持4.0后的版本,详情见下
        "fullDocument" : {  }, // 修改后的数据,出现在insert, replace, delete, update的事件中
        "ns" : { // namespace
            "db" : "",
            "coll" : "",
            "coll" : " }, // 相当于o2字段。出现在insert, replace, delete, update事件中。正常只包含_id,对于sharded collection,还包括shard key
        "updateDescription" : { // 只在operationType为update的时候出现,相当于是增量的修改,而replace是替换
            "updatedFields" : {  }, // 更新的field的值
            "removedFields" : [ "", ... ] // 删除的field列表
        },
        "clusterTime" : , // 相当于ts字段
        "txnNumber" : , // 相当于oplog里面的txnNumber,只在事务里面出现。事务号在一个事务里面单调递增
        "lsid" : { // 相当于lsid字段,只在事务里面出现。logic session id,请求所在的session的id
            "id" : ,
            "uid" : 
        }
    }
转载请注明:文章转载自 http://www.konglu.com/
本文地址:http://www.konglu.com/it/939771.html
免责声明:

我们致力于保护作者版权,注重分享,被刊用文章【flink cdc 同步mongodb的测试】因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理,本文部分文字与图片资源来自于网络,转载此文是出于传递更多信息之目的,若有来源标注错误或侵犯了您的合法权益,请立即通知我们,情况属实,我们会第一时间予以删除,并同时向您表示歉意,谢谢!

我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2023 成都空麓科技有限公司

ICP备案号:蜀ICP备2023000828号-2