教程将以 docker-compose 的方式进行演示,因此需要准备一台已经安装了 Docker
以及 docker-compose 的 Linux 或者 MacOS 电脑。
创建 docker-compose-mysql.yaml
文件
使用下面的内容创建一个 docker-compose-mysql.yaml
文件
version: '2'
services:
zookeeper:
image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
mysql:
image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
ports:
- 3306:3306
environment:
- MYSQL_ROOT_PASSWORD=debezium
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
connect:
image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
ports:
- 8083:8083
links:
- kafka
- mysql
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
该 Docker Compose 中包含的容器有:
- ZooKeeper
- Kafka
- MySQL
- connect
指定 Debezium 版本
使用DEBEZIUM_VERSION
变量指定 Debezium 版本
export DEBEZIUM_VERSION=1.8
启动所有容器
在 docker-compose-mysql.yaml
所在目录下执行下面的命令来启动本教程需要的组件
docker-compose -f docker-compose-mysql.yaml up
启动 MySQL connector
使用下面的内容创建一个
register-mysql.json
文件{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}
在 register-mysql.json 所在目录下执行下面的命令来启动 MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
查看 connector 列表
curl -H "Accept:application/json" localhost:8083/connectors/
["inventory-connector"]
查看 connector 详情
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
消费 Debezium topic 的消息
监控 topic
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
通过 MySQL 客户端修改数据库中的记录
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
插入数据
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
监控 topic 的终端可以看到新增的信息
更新数据
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
监控 topic 的终端可以看到更新的信息
删除数据
mysql> DELETE FROM addresses WHERE customer_id=1004;
mysql> DELETE FROM customers WHERE id=1004;
监控 topic 的终端可以看到删除的信息
重新启动 Kafka Connect 服务
停止 Kafka Connect
docker-compose -f docker-compose-mysql.yaml stop connect
当服务关闭时,切换到 MySQL 命令行客户端的终端,并添加一些记录
mysql> INSERT INTO customers VALUES (default, "Kenneth", "Anderson", "kander@acme.com");
这些记录被添加到数据库中。但是,因为 Kafka Connect 没有运行,所以监控 topic 的终端不会记录任何更新
启动 Kafka Connect
docker-compose -f docker-compose-mysql.yaml start connect
环境清理
本教程结束后,在
docker-compose-mysql.yaml
文件所在的目录下执行如下命令停止所有容器docker-compose -f docker-compose-mysql.yaml down