教程将以 docker-compose 的方式进行演示,因此需要准备一台已经安装了 Docker以及 docker-compose 的 Linux 或者 MacOS 电脑。
创建 docker-compose-mysql.yaml 文件
使用下面的内容创建一个 docker-compose-mysql.yaml 文件
version: '2'services:zookeeper:image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}ports:- 2181:2181- 2888:2888- 3888:3888kafka:image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}ports:- 9092:9092links:- zookeeperenvironment:- ZOOKEEPER_CONNECT=zookeeper:2181mysql:image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}ports:- 3306:3306environment:- MYSQL_ROOT_PASSWORD=debezium- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpwconnect:image: quay.io/debezium/connect:${DEBEZIUM_VERSION}ports:- 8083:8083links:- kafka- mysqlenvironment:- BOOTSTRAP_SERVERS=kafka:9092- GROUP_ID=1- CONFIG_STORAGE_TOPIC=my_connect_configs- OFFSET_STORAGE_TOPIC=my_connect_offsets- STATUS_STORAGE_TOPIC=my_connect_statuses
该 Docker Compose 中包含的容器有:
- ZooKeeper
- Kafka
- MySQL
- connect
指定 Debezium 版本
使用DEBEZIUM_VERSION变量指定 Debezium 版本
export DEBEZIUM_VERSION=1.8
启动所有容器
在 docker-compose-mysql.yaml 所在目录下执行下面的命令来启动本教程需要的组件
docker-compose -f docker-compose-mysql.yaml up
启动 MySQL connector
使用下面的内容创建一个
register-mysql.json文件{"name": "inventory-connector","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","tasks.max": "1","database.hostname": "mysql","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","database.server.name": "dbserver1","database.include.list": "inventory","database.history.kafka.bootstrap.servers": "kafka:9092","database.history.kafka.topic": "schema-changes.inventory"}}
在 register-mysql.json 所在目录下执行下面的命令来启动 MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
查看 connector 列表
curl -H "Accept:application/json" localhost:8083/connectors/["inventory-connector"]
查看 connector 详情
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
消费 Debezium topic 的消息
监控 topic
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \--bootstrap-server kafka:9092 \--from-beginning \--property print.key=true \--topic dbserver1.inventory.customers
通过 MySQL 客户端修改数据库中的记录
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
插入数据
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
监控 topic 的终端可以看到新增的信息
更新数据
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
监控 topic 的终端可以看到更新的信息
删除数据
mysql> DELETE FROM addresses WHERE customer_id=1004;mysql> DELETE FROM customers WHERE id=1004;
监控 topic 的终端可以看到删除的信息
重新启动 Kafka Connect 服务
停止 Kafka Connect
docker-compose -f docker-compose-mysql.yaml stop connect
当服务关闭时,切换到 MySQL 命令行客户端的终端,并添加一些记录
mysql> INSERT INTO customers VALUES (default, "Kenneth", "Anderson", "kander@acme.com");
这些记录被添加到数据库中。但是,因为 Kafka Connect 没有运行,所以监控 topic 的终端不会记录任何更新
启动 Kafka Connect
docker-compose -f docker-compose-mysql.yaml start connect
环境清理
本教程结束后,在
docker-compose-mysql.yaml文件所在的目录下执行如下命令停止所有容器docker-compose -f docker-compose-mysql.yaml down
