教程将以 docker-compose 的方式进行演示,因此需要准备一台已经安装了 Docker以及 docker-compose 的 Linux 或者 MacOS 电脑。

创建 docker-compose-mysql.yaml 文件

使用下面的内容创建一个 docker-compose-mysql.yaml 文件

  1. version: '2'
  2. services:
  3. zookeeper:
  4. image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
  5. ports:
  6. - 2181:2181
  7. - 2888:2888
  8. - 3888:3888
  9. kafka:
  10. image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
  11. ports:
  12. - 9092:9092
  13. links:
  14. - zookeeper
  15. environment:
  16. - ZOOKEEPER_CONNECT=zookeeper:2181
  17. mysql:
  18. image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
  19. ports:
  20. - 3306:3306
  21. environment:
  22. - MYSQL_ROOT_PASSWORD=debezium
  23. - MYSQL_USER=mysqluser
  24. - MYSQL_PASSWORD=mysqlpw
  25. connect:
  26. image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
  27. ports:
  28. - 8083:8083
  29. links:
  30. - kafka
  31. - mysql
  32. environment:
  33. - BOOTSTRAP_SERVERS=kafka:9092
  34. - GROUP_ID=1
  35. - CONFIG_STORAGE_TOPIC=my_connect_configs
  36. - OFFSET_STORAGE_TOPIC=my_connect_offsets
  37. - STATUS_STORAGE_TOPIC=my_connect_statuses

该 Docker Compose 中包含的容器有:

  • ZooKeeper
  • Kafka
  • MySQL
  • connect

指定 Debezium 版本

使用DEBEZIUM_VERSION变量指定 Debezium 版本

  1. export DEBEZIUM_VERSION=1.8

启动所有容器

docker-compose-mysql.yaml 所在目录下执行下面的命令来启动本教程需要的组件

  1. docker-compose -f docker-compose-mysql.yaml up

启动 MySQL connector

  • 使用下面的内容创建一个 register-mysql.json 文件

    1. {
    2. "name": "inventory-connector",
    3. "config": {
    4. "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    5. "tasks.max": "1",
    6. "database.hostname": "mysql",
    7. "database.port": "3306",
    8. "database.user": "debezium",
    9. "database.password": "dbz",
    10. "database.server.id": "184054",
    11. "database.server.name": "dbserver1",
    12. "database.include.list": "inventory",
    13. "database.history.kafka.bootstrap.servers": "kafka:9092",
    14. "database.history.kafka.topic": "schema-changes.inventory"
    15. }
    16. }
  • 在 register-mysql.json 所在目录下执行下面的命令来启动 MySQL connector

    1. curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
  • 查看 connector 列表

    1. curl -H "Accept:application/json" localhost:8083/connectors/
    2. ["inventory-connector"]
  • 查看 connector 详情

    1. curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

    消费 Debezium topic 的消息

  • 监控 topic

    1. docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    2. --bootstrap-server kafka:9092 \
    3. --from-beginning \
    4. --property print.key=true \
    5. --topic dbserver1.inventory.customers
  • 通过 MySQL 客户端修改数据库中的记录

    1. docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
  • 插入数据

    1. mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");

    监控 topic 的终端可以看到新增的信息

  • 更新数据

    1. mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;

    监控 topic 的终端可以看到更新的信息

  • 删除数据

    1. mysql> DELETE FROM addresses WHERE customer_id=1004;
    2. mysql> DELETE FROM customers WHERE id=1004;

    监控 topic 的终端可以看到删除的信息

重新启动 Kafka Connect 服务

  • 停止 Kafka Connect

    1. docker-compose -f docker-compose-mysql.yaml stop connect
  • 当服务关闭时,切换到 MySQL 命令行客户端的终端,并添加一些记录

    1. mysql> INSERT INTO customers VALUES (default, "Kenneth", "Anderson", "kander@acme.com");

    这些记录被添加到数据库中。但是,因为 Kafka Connect 没有运行,所以监控 topic 的终端不会记录任何更新

  • 启动 Kafka Connect

    1. docker-compose -f docker-compose-mysql.yaml start connect

    此时监控 topic 的终端会记录更新

    环境清理

    本教程结束后,在 docker-compose-mysql.yaml 文件所在的目录下执行如下命令停止所有容器

    1. docker-compose -f docker-compose-mysql.yaml down