开源工具MongoShake

mongoshake 开源工具地址:https://github.com/alibaba/MongoShake
MongoShake 文档地址:https://github.com/alibaba/MongoShake/wiki/oplog%E5%90%84%E5%AD%97%E6%AE%B5%E7%9A%84%E8%A7%A3%E6%9E%90
工具总结:
MongoShake 同步全量数据只支持 Mongodb 到Mongodb的数据同步
MongoShake 增量同步支持 kafka、rpc、tcp、file 四种模式。f
file : 写入到本地文件,写入的数据人不可读
tcp: 传入的是raw原始数据,不可读,需要自己开发tcp服务器[receiver](https://github.com/alibaba/MongoShake/wiki/FAQ#q-how-to-connect-to-different-tunnel-except-direct)工具来接收数据和转换成其它可读的数据格式
rpc:传入的是raw原始数据,不可读,需要自己开发rpc程序和 [receiver](https://github.com/alibaba/MongoShake/wiki/FAQ#q-how-to-connect-to-different-tunnel-except-direct)工具来接收数据和转换成其它可读的数据格式
kafka: 传入数据格式可以配置为 json 格式,会将数据发送到指定topic。
目前考虑比较好的方案是使用kafka

搭建kafka服务

参考地址:https://www.yuque.com/luoqiz/docker/kafka

MongoShake 发送增量数据到kafka

运行MongoShake服务

下载地址:https://github.com/alibaba/MongoShake/releases

解压软件

  1. tar -zxvf mongo-shake-v2.6.6.tar.gz -C /opt/software

修改配置

#配置源数据库,多个源以逗号分隔
mongo_urls = mongodb://username1:password1@primaryA,secondaryB,secondaryC;mongodb://username2:password2@primaryX,secondaryY,secondaryZ  
# 同步模式,all表示全量+增量同步,full表示全量同步,incr表示增量同步。
sync_mode = incr


# tunnel pipeline type. now we support rpc,file,kafka,mock,direct
# 通道模式。
tunnel = kafka
# tunnel target resource url
# for rpc. this is remote receiver socket address
# for tcp. this is remote receiver socket address
# for file. this is the file path, for instance "data"
# for kafka. this is the topic and brokers address which split by comma, for
# instance: topic@brokers1,brokers2, default topic is "mongoshake"
# for mock. this is uesless
# for direct. this is target mongodb address which format is the same as `mongo_urls`. If
# the target is sharding, this should be the mongos address.
# direct模式用于直接写入MongoDB,其余模式用于一些分析,或者远距离传输场景,
# 注意,如果是非direct模式,需要通过receiver进行解析,具体参考FAQ文档。
# 此处配置通道的地址,格式与mongo_urls对齐。
# tunnel.address = 192.168.88.2:10002

 tunnel.address = mongoshake@192.168.88.100:9092

# 通道数据的类型,只用于kafka和file通道类型。
# raw是默认的类型,其采用聚合的模式进行写入和
# 读取,但是由于携带了一些控制信息,所以需要专门用receiver进行解析。
# json以json的格式写入kafka,便于用户直接读取。
# bson以bson二进制的格式写入kafka。
tunnel.message = json

# oplog transmit worker concurrent
# if the source is sharding, worker number must equal to shard numbers.
# 内部发送的worker数目,如果机器性能足够,可以提高worker个数。
incr_sync.worker = 8

# how many writing threads will be used in one worker.
# 对于目的端是kafka等非direct tunnel,启用多少个序列化线程,必须为"incr_sync.worker"的倍数。
# 默认为"incr_sync.worker"的值。
incr_sync.tunnel.write_thread = 8

启动服务

./collector.linux -conf=collector.conf

# 监控日志
tail  -f collector.log  100

# 监控传输数据状态
./mongoshake-stat --port=9100

配置 hosts 文件

docker 部署kafka后出现 MongoShake连接不上服务提示找不到主机 no such host
原因: kafka发送的是主机名,需要在宿主机配置 host 解析

127.0.0.1  4c4d17168808  t