一、Canal技术介绍:
1.1 calnal发展介绍:
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
1.2 calnal工作原理:
1.2.1 MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
1.2.2 canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
1.2.3 canal架构说明:
说明:server代表一个canal运行实例,对应于一个jvm
- instance对应于一个数据队列 (1个server对应1..n个instance)
instance模块:
- eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
- eventStore (数据存储)
- metaManager (增量订阅&消费信息管理器)
二、Linux环境安装Canal:
2.1 环境修改
2.1.1 进入容器,修改my.cnf内容如下(mysql支持binlog):
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
提示: 如果mysql安装在docker下,可以执行如下命令,修改上面的配置文件: ① docker exec -it mysql /bin/bash ② vi /etc/mysql/my.cnf ③ 创建mysql容器时,可以使用如下命令: 【备注1:提前创建好相关的目录】
mkdir -p /mydata/mysql3/data /mydata/mysql3/conf /mydata/mysql3/logs
docker run -p 3308:3306 --restart=always -v /mydata/mysql3/conf:/etc/mysql/conf.d -v /mydata/mysql3/logs:/var/log/mysql -v /mydata/mysql3/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123 --name mysql -d mysql:5.7
【备注2:在/mydata/mysql3/conf中创建my.cnf文件内容如下:】
[mysql]
default-character-set=utf8
[mysqld]
character-set-server=utf8
log-bin=mysql-bin
binlog-format=ROW
server_id=1
不要忘记重启mysql:
docker restart mysql3
【备注3:进行mysql内部容器,查看是否binlog日志添加成功】
docker exec -it mysql3 /bin/bash
mysql -uroot -p123
show variables like '%bin%'
【备注4:进行mysql内部容器,查看是否binlog日志添加成功】
因为starter-canal中使用的client-canal的版本为1.0.25,所以,最好下载canal的版本为1.0.25以下的版酊,
否则,会失败
2.1.2 创建用户并授权:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
说明: 如果不创建用户,也可以使用root用户,密码就是你的mysql密码
2.1.3 下载canal安装包:
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-{版本号}.tar.gz
说明: 下载地址
2.1.4 解压缩并修改配置文件:
修改canal/conf/canal.properties文件:
修改canal/conf/example/instance.properties文件:
mysql 数据解析关注的表,Perl正则表达式. 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)
常见例子:
- 所有表:. or .\..
2. canal schema下所有表: canal\..
3. canal下的以canal打头的表:canal\.canal.*
4. canal schema下的一张表:canal\.test1- 多个规则组合使用:canal\..,mysql.test1,mysql.test2 (逗号分隔) [*详细配置参见](https://github.com/alibaba/canal/wiki/AdminGuide)
2.2 启动运行:
2.2.1 执行canal/bin/startup.sh命令:
cd /usr/local/canal
bin/startup.sh
2.2.2 查看日志
① canal/logs/canal/canal.log
② canal/logs/example/example.log
备注:下载的canal版本不能高于1.0.19版本
三、【实战】在项目中将广告数据库同步到redis中
3.1 从github下载第三方的canal的spring-boot-starter:
3.2 新建模块canal-demo
3.2.1 在模块中添加依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--1.引入canal的依赖-->
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
3.2.2 添加application.yml配置文件,
server:
port: 7701
canal:
client:
instances:
example:
host: 192.168.56.10
port: 11111
spring:
application:
name: canal
3.2.3 添加监听器:
/**
* ------------------------------
* 功能:
* 作者:WF
* 微信:hbxfwf13590332912
* 创建时间:2021/8/9-20:46
* ------------------------------
*/
@CanalEventListener
public class CanalEventDataListener {
/**
* 功能: 监听插入事件
* 参数:
* 返回值: void
* 时间: 2021/8/10 13:07
*/
@InsertListenPoint
public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
System.out.println("onEventInsert");
System.out.println("------------------------------------------------------------------------------------");
rowData.getAfterColumnsList().stream().forEach(c-> System.out.println(c.getName() + "->" + c.getValue()));
}
/**
* 功能: 监听修改事件
* 参数:
* 返回值: void
* 时间: 2021/8/10 13:07
*/
@UpdateListenPoint
public void onEventUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
System.out.println("onEventUpdate");
System.out.println("------------------------------------------------------------------------------------");
rowData.getAfterColumnsList().stream().forEach(c-> System.out.println(c.getName() + "->" + c.getValue()));
}
/**
* 功能: 监听删除事件
* 参数:
* 返回值: void
* 时间: 2021/8/10 13:07
*/
@DeleteListenPoint
public void onEventDelete(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
System.out.println("onEventDelete");
System.out.println("------------------------------------------------------------------------------------");
rowData.getBeforeColumnsList().stream().forEach(c-> System.out.println( c.getName() + "->" + c.getValue()));
}
/**
* 功能: 自定义监听
* 参数:
* 返回值: void
* 时间: 2021/8/10 13:06
*/
@ListenPoint(destination = "example",schema = "zeyigoudb",table = {"tb_item"},
eventType = {
CanalEntry.EventType.UPDATE,
CanalEntry.EventType.INSERT,
CanalEntry.EventType.DELETE}
)
public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
System.out.println("onEventCustomUpdate");
System.out.println("------------------------------------------------------------------------------------");
rowData.getAfterColumnsList().forEach((c) -> System.out.println( c.getName() + "->" + c.getValue()));
}
}
3.2.4 添加启动类:
/**
* ------------------------------
* 功能:
* 作者:WF
* 微信:hbxfwf13590332912
* 创建时间:2021/8/9-20:55
* ------------------------------
*/
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@EnableCanalClient //启用canal
public class CancalDataApplication {
public static void main(String[] args) {
SpringApplication.run(CancalDataApplication.class);
}
}
3.3 实现当后台修改、添加、删除广告时,会自动同步到redis中
3.3.1 将原来content-service中的关于增、删、改广告时,删除redis的代码注掉:
/**
* 功能: 添加广告
* 参数:
* 返回值: void
* 时间: 2021/8/2 15:00
*/
@Override
public void add(ContentEntity content) {
//1. 删除redis中的缓存
//redisTemplate.delete("contents");
//2. 添加到数据库中
this.save(content);
}
@Override
public void update(ContentEntity content) {
//1. 删除redis中的缓存
//redisTemplate.delete("contents");
//2. 调用 修改方法
this.updateById(content);
}
@Override
public void delete(List<Long> ids) {
//1. 删除redis中的缓存
//redisTemplate.delete("contents");
//2. 根据id删除广告
this.removeByIds(ids);
}
3.3.2 在canal-demo模块中,添加自定义的事件处理:
第一步:修改依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--1.引入canal的依赖-->
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!--2.引入redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
第二步:修改application.yml文件:
spring:
redis:
host: 192.168.56.10
port: 6379
第三步:添加redis的序列化配置:
@Configuration
public class RedisTemplateSerializerConfig {
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory factory){
RedisTemplate redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(factory);
//1. 设置key的序列化器
redisTemplate.setKeySerializer(new StringRedisSerializer());
//2. 设置value的序列化器
// redisTemplate.setValueSerializer(new GenericFastJsonRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
return redisTemplate;
}
}
第四步:定义事件:
/**
* 功能: 自定义监听(测试)
* 参数:
* 返回值: void
* 时间: 2021/8/10 13:06
*/
@ListenPoint(destination = "example",schema = "zeyigoudb",table = {"tb_content"},
eventType = {
CanalEntry.EventType.UPDATE,
CanalEntry.EventType.INSERT,
CanalEntry.EventType.DELETE}
)
public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
//1. 根据事件得到不同的数据
//1.1 得到添加或修改事件时的数据列表
//List<CanalEntry.Column> columnsListAddOrUpdate = rowData.getAfterColumnsList();
//1.2 得到删除时的数据列表
//List<CanalEntry.Column> columnsListDelete = rowData.getBeforeColumnsList();
System.out.println("正在同步广告列表...");
//直接删除redis中的数据
redisTemplate.delete("contents");
}
第五步:运行效果(略)