原理

SpringBoot canal kafka mysql 缓存一致性问题解决方案 - 图1
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议 MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ) canal 解析 binary log 对象(原始为 byte 流)

安装mysql

docker安装

切记,这里的canal使用的是1.1.4版本,mysql需要是5.7版本 直接使用docker-compose安装 docker-compose.yml

  1. version: '3'
  2. services:
  3. mysql:
  4. image: registry.cn-hangzhou.aliyuncs.com/zhengqing/mysql:5.7 # 原镜像`mysql:5.7`
  5. container_name: mysql_3307 # 容器名为'mysql_3306'
  6. restart: unless-stopped # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程>启动时就已经停止了的容器
  7. volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
  8. - "./my.cnf:/etc/mysql/my.cnf"
  9. - "./data:/var/lib/mysql"
  10. environment: # 设置环境变量,相当于docker run命令中的-e
  11. TZ: Asia/Shanghai
  12. LANG: en_US.UTF-8
  13. MYSQL_ROOT_PASSWORD: root # 设置root用户密码
  14. MYSQL_DATABASE: testcanal # 初始化的数据库名称
  15. ports: # 映射端口
  16. - "3306:3306"

创建配置文件:my.cnf

  1. [mysqld]
  2. user=mysql # MySQL启动用户
  3. default-storage-engine=INNODB # 创建新表时将使用的默认存储引擎
  4. character-set-server=utf8mb4 # 设置mysql服务端默认字符集
  5. pid-file = /var/run/mysqld/mysqld.pid # pid文件所在目录
  6. socket = /var/run/mysqld/mysqld.sock # 用于本地连接的socket套接字
  7. datadir = /var/lib/mysql # 数据文件存放的目录
  8. #log-error = /var/log/mysql/error.log
  9. #bind-address = 127.0.0.1 # MySQL绑定IP
  10. # Disabling symbolic-links is recommended to prevent assorted security risks
  11. symbolic-links=0
  12. sql_mode=STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION # 定义mysql应该支持的sql语法,数据校验等!
  13. # 允许最大连接数
  14. max_connections=200
  15. # ================= ↓↓↓ mysql主从同步配置start ↓↓↓ =================
  16. # 同一局域网内注意要唯一
  17. server-id=3306
  18. # 开启二进制日志功能 & 日志位置存放位置`/var/lib/mysql`
  19. #log-bin=mysql-bin
  20. log-bin=/var/lib/mysql/mysql-bin
  21. # binlog格式
  22. # 1. STATEMENT:基于SQL语句的模式,binlog 数据量小,但是某些语句和函数在复制过程可能导致数据不一致甚至出错;
  23. # 2. MIXED:混合模式,根据语句来选用是 STATEMENT 还是 ROW 模式;
  24. # 3. ROW:基于行的模式,记录的是行的完整变化。安全,但 binlog 会比其他两种模式大很多;
  25. binlog_format=ROW
  26. # FULL:binlog记录每一行的完整变更 MINIMAL:只记录影响后的行
  27. binlog_row_image=FULL
  28. # 日志文件大小
  29. # max_binlog_size=1G
  30. max_binlog_size=100M
  31. # 定义清除过期日志的时间(这里设置为7天)
  32. expire_logs_days=7
  33. # ================= ↑↑↑ mysql主从同步配置end ↑↑↑ =================
  34. [mysql]
  35. default-character-set=utf8mb4
  36. [client]
  37. default-character-set=utf8mb4 # 设置mysql客户端默认字符集
  38. root@ubuntu:~/app/docker-compose#

启动mysql

  1. docker-compose . up -d

安装kafka

docker安装

直接使用docker-compose安装「192.168.64.2 为你自己的主机IP」docker-compose-kafka.yml

  1. version: '3'
  2. services:
  3. zookepper:
  4. image: wurstmeister/zookeeper # 原镜像`wurstmeister/zookeeper`
  5. container_name: zookeeper # 容器名为'zookeeper'
  6. restart: unless-stopped # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
  7. volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
  8. - "/etc/localtime:/etc/localtime"
  9. ports: # 映射端口
  10. - "2181:2181"
  11. kafka:
  12. image: wurstmeister/kafka # 原镜像`wurstmeister/kafka`
  13. container_name: kafka # 容器名为'kafka'
  14. restart: unless-stopped # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
  15. volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
  16. - "/etc/localtime:/etc/localtime"
  17. environment: # 设置环境变量,相当于docker run命令中的-e
  18. KAFKA_ADVERTISED_HOST_NAME: 192.168.64.2 # TODO 本机IP
  19. KAFKA_ADVERTISED_PORT: 9092 # 端口
  20. KAFKA_BROKER_ID: 0 # 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
  21. KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.64.2:9092 # TODO 将kafka的地址端口注册给zookeeper
  22. KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 # 配置kafka的监听端口
  23. KAFKA_ZOOKEEPER_CONNECT: 192.168.64.2:2181 # TODO zookeeper地址
  24. KAFKA_CREATE_TOPICS: "hello_world"
  25. ports: # 映射端口
  26. - "9092:9092"
  27. depends_on: # 解决容器依赖启动先后问题
  28. - zookepper
  29. kafka-manager:
  30. image: sheepkiller/kafka-manager # 原镜像`sheepkiller/kafka-manager`
  31. container_name: kafka-manager # 容器名为'kafka-manager'
  32. restart: unless-stopped # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
  33. environment: # 设置环境变量,相当于docker run命令中的-e
  34. ZK_HOSTS: 192.168.64.2:2181 # TODO zookeeper地址
  35. APPLICATION_SECRET: zhengqing
  36. KAFKA_MANAGER_AUTH_ENABLED: "true" # 开启kafka-manager权限校验
  37. KAFKA_MANAGER_USERNAME: admin # 登陆账户
  38. KAFKA_MANAGER_PASSWORD: 123456 # 登陆密码
  39. ports: # 映射端口
  40. - "9000:9000"
  41. depends_on: # 解决容器依赖启动先后问题
  42. - kafka

启动kafka

  1. docker-compose -f docker-compose-kafka.yml up -d

安装canal

下载canal

  1. wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

修改配置文件

conf/example/instance.properties

  1. # position info
  2. # 配置mysql连接端口
  3. canal.instance.master.address=127.0.0.1:3306
  4. # 配置slaveId 和mysql的 id 不一样就行
  5. canal.instance.mysql.slaveId=2
  6. # 配置数据库 账号和密码
  7. canal.instance.dbUsername=canal
  8. canal.instance.dbPassword=canal

conf/canal.properties

  1. # tcp, kafka, RocketMQ
  2. # 修改为kafka
  3. canal.serverMode = kafka
  4. # 修改成自己的kafka地址
  5. canal.mq.servers = 127.0.0.1:9092

启动canal

  1. ./bin/startup.sh

创建项目

创建一个springboot项目,过程省略

引入kafka依赖

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. </dependency>

添加配置文件

  1. spring:
  2. kafka:
  3. bootstrap-servers: 192.168.64.2:9092
  4. producer:
  5. retries: 0
  6. acks: 1
  7. batch-size: 16384
  8. buffer-memory: 33554432
  9. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  10. # value-serializer: com.itheima.demo.config.MySerializer
  11. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  12. consumer:
  13. group-id: javagroup
  14. enable-auto-commit: true
  15. auto-commit-interval: 100
  16. auto-offset-reset: latest
  17. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  18. # value-deserializer: com.itheima.demo.config.MyDeserializer
  19. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

添加消费者

  1. import org.apache.kafka.clients.consumer.ConsumerRecord;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.kafka.annotation.KafkaListener;
  5. import org.springframework.stereotype.Component;
  6. import java.util.Optional;
  7. @Component
  8. public class KafkaConsumer {
  9. private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
  10. //不指定group,默认取yml里配置的
  11. @KafkaListener(topics = {"example"})
  12. public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
  13. Optional<?> optional = Optional.ofNullable(consumerRecord.value());
  14. if (optional.isPresent()) {
  15. Object msg = optional.get();
  16. logger.info("message:{}", msg);
  17. }
  18. }
  19. }

测试

修改数据库记录,查看打印信息SpringBoot canal kafka mysql 缓存一致性问题解决方案 - 图2SpringBoot canal kafka mysql 缓存一致性问题解决方案 - 图3搞定!!!