一、常用的数据同步方案
1、数据库迁移场景
以Mysql数据库迁移为例,数据库常用迁移方案有停机迁移和平滑迁移。
平滑迁移又分为双写和CDC(数据变更抓取)。
双写:即所有写入操作同时写入旧表和新表,这种方式可以完全控制应用代码如何写数据库,听上去简单明了。但它会引入复杂的分布式一致性问题:要保证新旧库中两张表数据一致,双写操作就必须在一个分布式事务中完成,而分布式事务的代价太高了。CDC:通过数据源的事务日志抓取数据源变更解决数据同步问题
2、数据同步场景
微服务开发环境下,为了提高搜索效率,以及搜索的精准度,会大量使用Redis、MongoBD等NoSQL数据库,也会使用大量的Solr、Elasticsearch等全文检索服务。那么,这个时候,就会有一个问题需要我们来思考和解决:那就是数据同步的问题!如何将实时变化的数据库中的数据同步到Redis/MongoBD或者Solr/Elasticsearch中呢?
(1)、应用代码中同步
在增加、修改、删除之后,执行操作ES的逻辑代码。例如下面的代码片段。
public ResponseResult updateStatus(Long[] ids, String status){
try{
taskService.updateStatus(ids, status);
if(“status_success”.equals(status)){
List
//数据写入es
esClient.importList(itemList);
//数据写入redis
// redisTemplate.save(itemList);
return new ResponseResult(true, “修改状态成功”)
}
}catch(Exception e){
return new ResponseResult(false, “修改状态失败”);
}
}
优点:
实施起来比较简单,简单服务里面常用的方式。
缺点:
代码耦合度高。
和业务逻辑同步执行,效率变低。
Q:这里有一个问题想和大家讨论一下,对于一个方法里既有数据库的操作又有同步调用http/rpc接口的方法,如何保证一致性?
比如下面这个场景:
一个售后工单的处理,首先需要经过【客诉系统】,然后需要转到【工单系统】生成一个工单,方法逻辑大概如下:
@Transactional
public void handleKeSU(Integer orderId) {
//调用http接口插入工单
httpClient.saveGongDan(orderId);
//修改客诉单状态为【已转工单】
updateKeSuStatus(orderId);
}
因为流程问题,客诉单状态修改和工单系统生成工单需要一致,即工单生成成功,则客诉单状态修改成功,工单生成失败,则客诉单修改失败。
解决方案:将http调用放到本地数据库修改后面,依据事物回滚。
这样还有什么问题?当http调用响应时间超时,其实调用方工单已经生成成功,但是本地调用响应超时抛出异常导致回滚。
(2)、定时任务同步
在数据库中执行完增加、修改、删除操作后,通过定时任务定时的将数据库的数据同步到ES索引库中。
定时任务技术有:SpringTask,Quartz,XXLJOB。
这里执行定时任务时,需要注意的一个技巧是:第一次执行定时任务时,从MySQL数据库中以时间字段进行倒序排列查询相应的数据,并记录当前查询数据的时间字段的最大值,以后每次执行定时任务查询数据的时候,只要按时间字段倒序查询数据表中的时间字段大于上次记录的时间值的数据,并且记录本次任务查询出的时间字段的最大值即可,从而不需要再次查询数据表中的所有数据。
注意:这里所说的时间字段指的是标识数据更新的时间字段,也就是说,使用定时任务同步数据时,为了避免每次执行任务都会进行全表扫描,最好是在数据表中增加一个更新记录的时间字段。
优点:
同步ES索引库的操作与业务代码完全解耦。
缺点:
数据的实时性并不高。
(3)、通过MQ实现同步
在数据库中执行完增加、修改、删除操作后,向MQ中发送一条消息,此时,同步程序作为MQ中的消费者,从消息队列中获取消息,然后执行同步Solr索引库的逻辑。
我们可以使用下图来简单的标识通过MQ实现数据同步的过程。
我们可以使用如下代码实现这个过程。
public ResponseResult updateStatus(Long[] ids, String status){
try{
goodsService.updateStatus(ids, status);
if(“status_success”.equals(status)){
List
final String jsonString = JSON.toJSONString(itemList);
//发送消息
jmsTemplate.send(queueSolr, new MessageCreator(){
@Override
public Message createMessage(Session session) throws JMSException{
return session.createTextMessage(jsonString);
}
});
}
return new ResponseResult(true, “修改状态成功”);
}catch(Exception e){
return new ResponseResult(false, “修改状态失败”);
}
}
优点:
业务代码解耦,并且能够做到准实时。目前tk的ES同步用的就是这中方式吧
缺点:
需要在业务代码中加入发送消息到MQ的代码,数据调用接口耦合。
(4)、通过CDC实现实时同步
通过CDC来解析数据库的日志信息,来检测数据库中表结构和数据的变化,从而更新ES索引库。
使用CDC可以做到业务代码完全解耦,API完全解耦,可以做到准实时。
二、CDC(change data capture,数据变更抓取)
通过数据源的事务日志抓取数据源变更,这能解决一致性问题(只要下游能保证变更应用到新库上)。它的问题在于各种数据源的变更抓取没有统一的协议,如MySQL 用 Binlog,PostgreSQL 用 Logical decoding 机制,MongoDB 里则是 oplog。
- Canal,阿里开源的基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql。
- Databus,Linkedin 的分布式数据变更抓取系统。它的 MySQL 变更抓取模块很不成熟,官方支持的是 Oracle,MySQL 只是使用另一个开源组件 OpenReplicator 做了一个 demo。另一个不利因素 databus 使用了自己实现的一个 Relay 作为变更分发平台,相比于使用开源消息队列的方案,这对维护和外部集成都不友好。
- Mysql-Streamer,Yelp 的基于python的数据管道。
- Debezium,Redhat 开源的数据变更抓取组件。支持 MySQL、MongoDB、PostgreSQL 三种数据源的变更抓取。Snapshot Mode 可以将表中的现有数据全部导入 Kafka,并且全量数据与增量数据形式一致,可以统一处理,很适合数据
总结
本文主要讨论数据同步方案,并对canal做了简单介绍。同时也对binlog的解析和mysql协议简单介绍希望能了解这种CDC的基本原理。