1. 下载 Debezium Kafka Connector

Debezium 作为 Kafka Connect 的插件运行。根据您的数据库类型(如 MySQL、PostgreSQL 等),选择相应的 Debezium Connector。
  1. 访问 Debezium Releases 页面
  2. 选择与您的 Kafka 版本兼容的 Debezium Connector 版本,并下载相应的 Connector 压缩文件。

使用 Debezium Connector - 图1

使用 Debezium Connector - 图2

  1. --这里我们先在plugins下创建文件夹再解压
  2. mkdir -p /home/ubuntu/kafka_2.13-3.6.0/plugins/debezium
  3. --进入创建的文件夹中
  4. cd kafka_2.13-3.6.0/plugins/debezium/
  5. --下载包
  6. wgert https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.5.4.Final/debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
  7. --解压
  8. tar -xzf debezium-connector-mysql-1.5.4.Final-plugin.tar.gz

2. 创建 mysql-connector.properties 文件

在 Kafka 安装目录下创建配置文件:
  1. cd /home/ubuntu/kafka_2.13-3.6.0
  2. nano config/mysql-connector.properties
将以下基本配置信息粘贴到 mysql-connector.properties 文件中,替换为您自己的数据库详情
  1. name=debezium-mysql-connector
  2. connector.class=io.debezium.connector.mysql.MySqlConnector
  3. tasks.max=1
  4. database.hostname=your_database_host
  5. database.port=3306
  6. database.user=your_database_user
  7. database.password=your_database_password
  8. database.server.id=184054
  9. #这个配置影响的是自动创建的kafka topic名称 如果是table模式,自动创建的topic是app.库名.表名
  10. database.server.name=app
  11. #检测整个库,感觉不会用到
  12. #database.include.list=your_database_name
  13. #检测表的集合table.include.list=your_database_name.table1,your_database_name.table2
  14. #table.include.list=your_database_name
  15. database.history.kafka.bootstrap.servers=localhost:9092
  16. #历史的topic 手动创建或者让kafka自动创建
  17. database.history.kafka.topic=dbhistory.fullfillment

3. 允许Kafka自动创建主题

检查 Kafka 服务器配置:在 Kafka 服务器的配置文件(通常是 server.properties)中,查找 auto.create.topics.enable 配置项。确保它被设置为 true,这样 Kafka Connect 就可以自动创建缺失的主题。

  1. auto.create.topics.enable=true

4. 创建 Kafka Connect 的 systemd 服务文件

创建服务文件: 打开一个新的服务文件用于编辑:

  1. sudo nano /etc/systemd/system/kafka-connect.service

编写服务文件内容: 将以下内容粘贴到文件中,并根据您的 Kafka 安装路径和配置进行调整:

  1. [Unit]
  2. Description=Kafka Connect
  3. Requires=kafka.service
  4. After=kafka.service
  5. [Service]
  6. Type=simple
  7. User=ubuntu
  8. ExecStart=/home/ubuntu/kafka_2.13-3.6.0/bin/connect-standalone.sh /home/ubuntu/kafka_2.13-3.6.0/config/connect-standalone.properties /home/ubuntu/kafka_2.13-3.6.0/config/mysql-connector.properties
  9. Restart=on-failure
  10. [Install]
  11. WantedBy=multi-user.target
  • 确保 ExecStart 中的路径与您的实际 Kafka Connect 和配置文件路径相匹配。
  • User=ubuntu 行指定了运行服务的用户,您可能需要将其更改为合适的用户名

启用和启动服务

重新加载 systemd 管理器配置

  1. sudo systemctl daemon-reload
  1. --开启自启
  2. sudo systemctl enable kafka-connect
  3. --关闭自启
  4. sudo systemctl disable kafka-connect
  5. --启动服务器
  6. sudo systemctl start kafka-connect
  7. --查看状态
  8. sudo systemctl status kafka-connect

5. 验证CDC是否生效

  1. --进到kafka目录
  2. cd kafka_2.13-3.6.0/
  3. --启动消费者
  4. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic app.flink_demo.tb_account_name > kafka_output.log
修改表里的数据就可以看到kafka发送的消息同时写道了log文件里