1. 下载 Debezium Kafka Connector
Debezium 作为 Kafka Connect 的插件运行。根据您的数据库类型(如 MySQL、PostgreSQL 等),选择相应的 Debezium Connector。- 访问 Debezium Releases 页面。
- 选择与您的 Kafka 版本兼容的 Debezium Connector 版本,并下载相应的 Connector 压缩文件。
--这里我们先在plugins下创建文件夹再解压
mkdir -p /home/ubuntu/kafka_2.13-3.6.0/plugins/debezium
--进入创建的文件夹中
cd kafka_2.13-3.6.0/plugins/debezium/
--下载包
wgert https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.5.4.Final/debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
--解压
tar -xzf debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
2. 创建 mysql-connector.properties 文件
在 Kafka 安装目录下创建配置文件:将以下基本配置信息粘贴到 mysql-connector.properties 文件中,替换为您自己的数据库详情
cd /home/ubuntu/kafka_2.13-3.6.0
nano config/mysql-connector.properties
name=debezium-mysql-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=your_database_host
database.port=3306
database.user=your_database_user
database.password=your_database_password
database.server.id=184054
#这个配置影响的是自动创建的kafka topic名称 如果是table模式,自动创建的topic是app.库名.表名
database.server.name=app
#检测整个库,感觉不会用到
#database.include.list=your_database_name
#检测表的集合table.include.list=your_database_name.table1,your_database_name.table2
#table.include.list=your_database_name
database.history.kafka.bootstrap.servers=localhost:9092
#历史的topic 手动创建或者让kafka自动创建
database.history.kafka.topic=dbhistory.fullfillment
3. 允许Kafka自动创建主题
检查 Kafka 服务器配置:在 Kafka 服务器的配置文件(通常是 server.properties)中,查找 auto.create.topics.enable 配置项。确保它被设置为 true,这样 Kafka Connect 就可以自动创建缺失的主题。
auto.create.topics.enable=true
4. 创建 Kafka Connect 的 systemd 服务文件
创建服务文件: 打开一个新的服务文件用于编辑:
sudo nano /etc/systemd/system/kafka-connect.service
编写服务文件内容: 将以下内容粘贴到文件中,并根据您的 Kafka 安装路径和配置进行调整:
[Unit]
Description=Kafka Connect
Requires=kafka.service
After=kafka.service
[Service]
Type=simple
User=ubuntu
ExecStart=/home/ubuntu/kafka_2.13-3.6.0/bin/connect-standalone.sh /home/ubuntu/kafka_2.13-3.6.0/config/connect-standalone.properties /home/ubuntu/kafka_2.13-3.6.0/config/mysql-connector.properties
Restart=on-failure
[Install]
WantedBy=multi-user.target
- 确保 ExecStart 中的路径与您的实际 Kafka Connect 和配置文件路径相匹配。
- User=ubuntu 行指定了运行服务的用户,您可能需要将其更改为合适的用户名
启用和启动服务
重新加载 systemd 管理器配置:
sudo systemctl daemon-reload
--开启自启
sudo systemctl enable kafka-connect
--关闭自启
sudo systemctl disable kafka-connect
--启动服务器
sudo systemctl start kafka-connect
--查看状态
sudo systemctl status kafka-connect
5. 验证CDC是否生效
修改表里的数据就可以看到kafka发送的消息同时写道了log文件里
--进到kafka目录
cd kafka_2.13-3.6.0/
--启动消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic app.flink_demo.tb_account_name > kafka_output.log