Introduction to Debezium


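Debezium is a distributed platform that turns your existing databases into event streams. Applications consume these streams, so they see every row-level change in the database and can respond to it immediately.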

Debezium is built on top of Apache Kafka and provides Kafka Connect connectors, each of which monitors a specific database.

Introduction to Kafka Connect

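Normally, Kafka data is read and written with the Consumer and Producer APIs, but doing this yourself means handling many additional concerns, such as schema management, fault tolerance, parallelism, data latency, and monitoring.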

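Kafka Connect, introduced in Kafka 0.9.0.0, takes much of this work off developers' hands. Its features include:

  • A unified, general-purpose framework for connectors
  • Both distributed and standalone modes
  • A REST interface for viewing and managing Kafka connectors
  • Automatic offset management, so developers do not have to handle this error-prone part themselves
  • Distributed, scalable operation
  • Integration of streaming and batch processing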

Kafka Connect has two core concepts, Source and Sink, and both are called connectors: a Source imports data into Kafka, and a Sink exports data from Kafka.

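In the figure below, the Sources on the left read data from source systems (RDBMS, files, and so on) into Kafka, and the Sinks on the right consume data from Kafka into other systems.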
[Figure: Kafka Connect, with Sources on the left and Sinks on the right]

Debezium Architecture and How It Works

Debezium can capture changed data in three ways:

As a plugin deployed on Kafka Connect

[Figure: Debezium connectors deployed on Kafka Connect]
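In the figure above, the Kafka broker sits in the middle and Kafka Connect runs as a separate service. To use Debezium this way, download the debezium-connector-mysql connector, extract it to a directory on the server, and set plugin.path in connect-distributed.properties to the root directory that contains the connector.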
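Setting plugin.path can be scripted, for example when provisioning several workers. The Python sketch below is only an illustration: the helper name and the /opt/connectors directory are hypothetical, it recognises only simple key=value lines, and it assumes plugin.path points at the parent directory into which debezium-connector-mysql was extracted (Kafka Connect treats each subdirectory of a plugin.path entry as a separate plugin).

```python
def set_plugin_path(properties_text: str, plugin_dir: str) -> str:
    """Return worker properties text with plugin.path set to plugin_dir.

    An existing, uncommented plugin.path entry is replaced; otherwise a new
    entry is appended. Only simple "key=value" lines are recognised.
    """
    lines_out = []
    found = False
    for line in properties_text.splitlines():
        stripped = line.strip()
        # Lines starting with '#' or '!' are comments in Java .properties files.
        is_comment = stripped.startswith(("#", "!"))
        key = stripped.split("=", 1)[0].strip()
        if not is_comment and key == "plugin.path":
            lines_out.append(f"plugin.path={plugin_dir}")
            found = True
        else:
            lines_out.append(line)
    if not found:
        lines_out.append(f"plugin.path={plugin_dir}")
    return "\n".join(lines_out) + "\n"
```

In practice you would read config/connect-distributed.properties with `pathlib.Path.read_text()`, pass it through `set_plugin_path(text, "/opt/connectors")`, write it back with `write_text()`, and restart the worker, since plugins are scanned at startup.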

Debezium Server

[Figure: Debezium Server architecture]
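In this mode, you configure connectors that capture changes at the source, serialize them into a chosen format, and send them to a chosen target system.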

Embedded in an application

Embedded mode depends on neither Kafka nor Debezium Server: the application uses the Debezium API directly, processes the captured changes itself, and syncs them to other systems.

Syncing MySQL to Kafka with Debezium

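docker run options used in this section:

  • -it: run the container interactively, with a terminal attached
  • --rm: remove the container automatically when it stops
  • -d: run the container in the background
  • --name: set the container name
  • -p: publish a container port on the host
  • -e: set an environment variable
  • --link: link to another container, so this container can use that container's IP address, environment variables, and so on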

Start ZooKeeper

```bash
docker run -d -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.8
```

Start Kafka

```bash
docker run -d -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.8
```

Start the MySQL database

```bash
docker run -d -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.8
```

Start the MySQL command-line client

  • Start the MySQL command-line client container

```bash
docker run -it --rm --name mysqlterm --link mysql mysql:8.0 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
```
  • Switch to the inventory database

```
mysql> use inventory;
```
  • List the tables in the database

```
mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
6 rows in set (0.00 sec)
```
  • View the data in the customers table

```
mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
```

Start Kafka Connect

  • Start the Kafka Connect container

```bash
docker run -d -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.8
```
  • Use the Kafka Connect REST API to check the status of Kafka Connect

```bash
curl -H "Accept:application/json" localhost:8083/
{"version":"3.0.0","commit":"8cb0a5e9d3441962","kafka_cluster_id":"EKcq3XCAT_umWw7ZS0qx_w"}
```
  • List the connectors

```bash
curl -H "Accept:application/json" localhost:8083/connectors/
[]
```
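The same REST calls can be made from a script. Because `docker run -d` returns as soon as the container starts, a script usually has to wait until the worker's REST endpoint answers. A minimal sketch using only Python's standard library; the helper names are hypothetical, and the default base URL assumes the `-p 8083:8083` mapping used above.

```python
import json
import time
import urllib.request

# Assumption: the Connect REST API is published on localhost:8083 (-p 8083:8083).
CONNECT_URL = "http://localhost:8083"


def get_json(url: str, timeout: float = 5.0):
    """GET a Kafka Connect REST endpoint and decode the JSON response."""
    request = urllib.request.Request(url, headers={"Accept": "application/json"})
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def wait_for_connect(base_url: str = CONNECT_URL, timeout: float = 60.0, interval: float = 2.0):
    """Poll GET / until the worker answers; return its version info or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return get_json(base_url.rstrip("/") + "/")
        except OSError:  # URLError, refused or reset connections, socket timeouts
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Kafka Connect not reachable at {base_url}")
            time.sleep(interval)


def list_connectors(base_url: str = CONNECT_URL) -> list:
    """Same request as: curl -H "Accept:application/json" localhost:8083/connectors/"""
    return get_json(base_url.rstrip("/") + "/connectors/")
```

`wait_for_connect()` returns the same version document as the first curl call, and `list_connectors()` returns an empty list until a connector is registered.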

Register the MySQL connector

  • Send a POST request to the Kafka Connect REST API with the following body:

```json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}
```

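    Field notes:

      • name: the name of the connector.
      • config: the connector's configuration.
      • tasks.max: the maximum number of tasks. Set it to 1: the MySQL connector reads the MySQL binlog, and only a single task can preserve the order of the events.
      • database.hostname: the database host. The value mysql here is the host name of the linked Docker container; outside Docker, use an IP address or a resolvable domain name.
      • database.server.id: a numeric ID, unique within the MySQL cluster, that the connector uses when it connects to MySQL as a replica to read the binlog.
      • database.server.name: a logical name for the MySQL server, used as the prefix of the Kafka topic names.
      • database.include.list: the databases to monitor.
      • database.history.kafka.topic: the topic that stores the full history of schema changes.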

  • Register the MySQL connector with Kafka Connect

```bash
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
```
  • List the connectors again

```bash
curl -H "Accept:application/json" localhost:8083/connectors/
["inventory-connector"]
```
  • View the connector details

```bash
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

HTTP/1.1 200 OK
Date: Wed, 16 Mar 2022 10:00:19 GMT
Content-Type: application/json
Content-Length: 534
Server: Jetty(9.4.43.v20210629)

{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.user":"debezium","database.server.id":"184054","tasks.max":"1","database.hostname":"mysql","database.password":"dbz","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"dbhistory.inventory","name":"inventory-connector","database.server.name":"dbserver1","database.port":"3306","database.include.list":"inventory"},"tasks":[{"connector":"inventory-connector","task":0}],"type":"source"}
```
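The registration step can also be scripted. The Python sketch below builds the same request body as the JSON above from a few parameters and POSTs it with the standard library's urllib.request. The helper names and their parameters are hypothetical; the config keys are exactly the ones used in this section, and the default base URL assumes the `-p 8083:8083` mapping.

```python
import json
import urllib.request


def mysql_connector_payload(name, hostname, port, user, password, server_id,
                            server_name, databases, history_servers, history_topic):
    """Build the POST /connectors body for the Debezium MySQL connector."""
    if not isinstance(server_id, int) or server_id <= 0:
        raise ValueError("database.server.id must be a positive integer")
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",  # a single task reads the binlog in order
            "database.hostname": hostname,
            "database.port": str(port),
            "database.user": user,
            "database.password": password,
            "database.server.id": str(server_id),  # replica ID, unique in the MySQL cluster
            "database.server.name": server_name,  # prefix of every topic name
            "database.include.list": ",".join(databases),
            "database.history.kafka.bootstrap.servers": history_servers,
            "database.history.kafka.topic": history_topic,
        },
    }


def register_mysql_connector(payload, base_url="http://localhost:8083"):
    """POST the payload to Kafka Connect and return the decoded JSON response."""
    request = urllib.request.Request(
        base_url.rstrip("/") + "/connectors/",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the tutorial's values, `register_mysql_connector(mysql_connector_payload("inventory-connector", "mysql", 3306, "debezium", "dbz", 184054, "dbserver1", ["inventory"], "kafka:9092", "dbhistory.inventory"))` should send the same request as the curl command above. Kafka Connect rejects a connector name that already exists with HTTP 409, which urlopen raises as urllib.error.HTTPError.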

Run watch-topic to view the events Debezium sends

The connector writes to topics including:

  • dbserver1
  • dbserver1.inventory.products
  • dbserver1.inventory.products_on_hand
  • dbserver1.inventory.customers
  • dbserver1.inventory.orders

Here we watch the customers topic:

```bash
docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:1.8 watch-topic -a -k dbserver1.inventory.customers
```
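The topic names above follow from database.server.name: each captured table gets a topic named <server name>.<database>.<table>, and schema changes go to a topic named after the server itself. A small Python sketch of that naming rule, with hypothetical helper names:

```python
def table_topic(server_name: str, database: str, table: str) -> str:
    """Topic that receives change events for one table."""
    return f"{server_name}.{database}.{table}"


def expected_topics(server_name: str, database: str, tables: list) -> list:
    """The schema-change topic (named after the server) plus one topic per captured table."""
    return [server_name] + [table_topic(server_name, database, t) for t in tables]
```

For example, `table_topic("dbserver1", "inventory", "customers")` gives the topic passed to watch-topic above.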

Below is the key of the last event (formatted for readability):

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": false, "field": "id" }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Key"
  },
  "payload": {
    "id": 1004
  }
}
```
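The key carries only the primary-key columns, and its schema lists them in order. A tiny Python sketch (hypothetical helper name) that turns such a key into a column-to-value mapping, assuming the JSON-with-schema output shown here:

```python
import json


def primary_key(key_event):
    """Return {column: value} for the row's primary key, in key-schema order.

    Accepts either the raw JSON text printed by watch-topic or an already-parsed dict.
    """
    if isinstance(key_event, str):
        key_event = json.loads(key_event)
    columns = [field["field"] for field in key_event["schema"]["fields"]]
    return {column: key_event["payload"][column] for column in columns}
```

For the key above this yields `{"id": 1004}`; for a table with a composite primary key, the mapping keeps the column order given by the key schema.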

Below is the value of the last event (formatted for readability):

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          { "type": "string", "optional": false, "field": "first_name" },
          { "type": "string", "optional": false, "field": "last_name" },
          { "type": "string", "optional": false, "field": "email" }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          { "type": "string", "optional": false, "field": "first_name" },
          { "type": "string", "optional": false, "field": "last_name" },
          { "type": "string", "optional": false, "field": "email" }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": true, "field": "version" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "server_id" },
          { "type": "int64", "optional": false, "field": "ts_sec" },
          { "type": "string", "optional": true, "field": "gtid" },
          { "type": "string", "optional": false, "field": "file" },
          { "type": "int64", "optional": false, "field": "pos" },
          { "type": "int32", "optional": false, "field": "row" },
          { "type": "boolean", "optional": true, "field": "snapshot" },
          { "type": "int64", "optional": true, "field": "thread" },
          { "type": "string", "optional": true, "field": "db" },
          { "type": "string", "optional": true, "field": "table" }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "1.8.1.Final",
      "name": "dbserver1",
      "server_id": 0,
      "ts_sec": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": "inventory",
      "table": "customers"
    },
    "op": "r",
    "ts_ms": 1486500577691
  }
}
```

The value contains a schema and a payload. The schema describes a structure named dbserver1.inventory.customers.Envelope (version 1), which can contain five fields:

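  • op is a mandatory field containing a string that describes the type of operation. For the MySQL connector, the values are c for create (insert), u for update, d for delete, and r for read (in the case of a snapshot).
  • before is an optional field that, if present, contains the state of the row before the event occurred. Its structure is described by the dbserver1.inventory.customers.Value schema, which the dbserver1 connector uses for all rows in the inventory.customers table.
  • after is an optional field that, if present, contains the state of the row after the event occurred. Its structure is described by the same dbserver1.inventory.customers.Value schema used for before.
  • source is a mandatory field containing a structure that describes the source metadata of the event. For MySQL it contains several fields: the connector name, the name of the binlog file where the event was recorded, the position in that binlog file where the event appeared, the row within the event (if there is more than one), the names of the affected database and table, the ID of the MySQL thread that made the change, whether the event was part of a snapshot, the MySQL server ID (if available), and a timestamp in seconds.
  • ts_ms is optional and, if present, contains the time at which the connector processed the event, according to the system clock of the JVM running the Kafka Connect task.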
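A consumer usually needs only op plus before or after. The Python sketch below condenses a value event into those parts; the function name and the returned dictionary layout are hypothetical. It accepts the event with the schema wrapper shown above or, as an assumption, a bare payload (which is what the JSON converter emits when schemas are disabled).

```python
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read"}


def summarize_change(event: dict) -> dict:
    """Condense a Debezium MySQL value event into the parts most consumers need."""
    # Accept {"schema": ..., "payload": ...} as well as a bare payload.
    payload = event.get("payload", event)
    op = payload["op"]
    source = payload["source"]
    return {
        "operation": OPERATIONS[op],
        "table": f'{source["db"]}.{source["table"]}',
        # A delete has no "after" state, so report the row as it was before.
        "row": payload["before"] if op == "d" else payload["after"],
        # The output above uses a boolean; some Debezium versions emit "true"/"last"/"false".
        "snapshot": source.get("snapshot") in (True, "true", "last"),
        "binlog": (source["file"], source["pos"]),
    }
```

For the snapshot event above, this reports a read of inventory.customers at mysql-bin.000003 position 154, with the row for id 1004.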

Update data and view the update event

  • Update the data

```
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
Query OK, 1 row affected (0.05 sec)
Rows matched: 1 Changed: 1 Warnings: 0

mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne Marie | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
```

  • This is the key of the update event (formatted for readability):

```json
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [
      { "field": "id", "type": "int32", "optional": false }
    ]
  },
  "payload": {
    "id": 1004
  }
}
```
  • This is the value of the update event (formatted for readability):

```json
{
  "schema": {...},
  "payload": {
    "before": {
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": {
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "1.8.1.Final",
      "name": "dbserver1",
      "server_id": 223344,
      "ts_sec": 1486501486,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 364,
      "row": 0,
      "snapshot": null,
      "thread": 3,
      "db": "inventory",
      "table": "customers"
    },
    "op": "u",
    "ts_ms": 1486501486308
  }
}
```
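  • The before field now contains the state of the row with the values it had before the database commit.
  • The after field now contains the updated state of the row; here the first_name value is now Anne Marie.
  • The source field structure has many of the same values as before, except that the ts_sec and pos fields have changed (the file might also change in other circumstances).
  • The op field value is now u, indicating that the row was updated.
  • The ts_ms field shows the timestamp at which Debezium processed this event.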
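Because an update event carries both before and after, a consumer can work out exactly which columns changed. A minimal sketch, assuming the payload layout shown above (the helper name is hypothetical):

```python
def changed_columns(payload: dict) -> dict:
    """Map each column whose value differs between before and after to (old, new)."""
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    columns = sorted(set(before) | set(after))
    return {c: (before.get(c), after.get(c)) for c in columns if before.get(c) != after.get(c)}
```

For the update event above, this yields `{"first_name": ("Anne", "Anne Marie")}`.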

Delete data and view the delete event

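  • Delete the data. The addresses table references customers through a foreign key, so the customer's addresses are deleted first:

```
mysql> DELETE FROM addresses WHERE customer_id=1004;

mysql> DELETE FROM customers WHERE id=1004;
Query OK, 1 row affected (0.00 sec)
```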

  • This is the key of the delete event (formatted for readability):

```json
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [
      { "field": "id", "type": "int32", "optional": false }
    ]
  },
  "payload": {
    "id": 1004
  }
}
```
  • This is the value of the delete event (formatted for readability):

```json
{
  "schema": {...},
  "payload": {
    "before": {
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": null,
    "source": {
      "version": "1.8.1.Final",
      "name": "dbserver1",
      "server_id": 223344,
      "ts_sec": 1486501558,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 725,
      "row": 0,
      "snapshot": null,
      "thread": 3,
      "db": "inventory",
      "table": "customers"
    },
    "op": "d",
    "ts_ms": 1486501558315
  }
}
```
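  • The before field contains the state of the row before it was deleted.
  • The after field is null, because the row no longer exists.
  • The source field structure has many of the same values as before, except that the ts_sec and pos fields have changed (the file might also change in other circumstances).
  • The op field value is now d, indicating that the row was deleted.
  • The ts_ms field shows the timestamp at which Debezium processed this event.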
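Taken together, the read, update, and delete events are enough to keep a copy of the table up to date. As an illustration only, the Python sketch below applies them to an in-memory dict instead of a real sink; the names are hypothetical, and it assumes events for a given key arrive in order.

```python
from typing import Optional


def apply_change(replica: dict, key_event: dict, value_event: Optional[dict]) -> None:
    """Apply one change event to an in-memory copy of a table keyed by primary key."""
    pk = tuple(sorted(key_event["payload"].items()))
    if value_event is None:
        # A record with a null value carries no row state, so there is nothing to apply.
        return
    payload = value_event["payload"]
    if payload["op"] == "d":
        replica.pop(pk, None)  # the row no longer exists
    else:
        replica[pk] = payload["after"]  # c, u and r all carry the new row in "after"
```

Replaying the snapshot, update, and delete events for id 1004 leaves the replica without that row, matching the database.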
Restart the Kafka Connect service

  • Stop the container running the Kafka Connect service

```bash
docker stop connect
```
  • After the service stops, switch to the terminal running the MySQL command-line client and add a few records:

```
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
mysql> INSERT INTO customers VALUES (default, "Kenneth", "Anderson", "kander@acme.com");
```

    The records are added to the database. However, because Kafka Connect is not running, watch-topic does not show any new events.

  • Restart the container running the Kafka Connect service

```bash
docker run -d -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.8
```
  • Switch to the terminal running watch-topic to see the events for the two new records created while Kafka Connect was offline:

```
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"Sarah","last_name":"Thompson","email":"kitt@acme.com"},"source":{"version":"1.8.1.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635153,"gtid":null,"file":"mysql-bin.000003","pos":1046,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181455}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1006}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1006,"first_name":"Kenneth","last_name":"Anderson","email":"kander@acme.com"},"source":{"version":"1.8.1.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635160,"gtid":null,"file":"mysql-bin.000003","pos":1356,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181456}}
```
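These two events appear because Kafka Connect stores the connector's source offsets (for MySQL, the binlog position) in the my_connect_offsets topic configured with OFFSET_STORAGE_TOPIC, and the restarted connector resumes reading the binlog from there. On the consumer side, source.file and source.pos make this visible. The Python sketch below (hypothetical helper names; it assumes GTIDs are off, as the null gtid values above suggest, and binlog files named like mysql-bin.NNNNNN) checks that events received after the restart lie beyond the last event seen before it.

```python
def binlog_coordinates(value_event: dict) -> tuple:
    """(file sequence number, position, row) of a MySQL change event, for ordering comparisons."""
    source = value_event["payload"]["source"]
    # Compare the numeric suffix of e.g. mysql-bin.000003 rather than the file name string.
    sequence = int(source["file"].rsplit(".", 1)[1])
    return (sequence, source["pos"], source.get("row", 0))


def resumed_after(last_seen: tuple, value_events: list) -> bool:
    """True if every event lies strictly later in the binlog than last_seen."""
    return all(binlog_coordinates(e) > last_seen for e in value_events)
```

With the delete event (position 725) as the last event seen before the restart, the two new events at positions 1046 and 1356 both pass the check.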

References: Debezium Tutorial; Flink+Debezium 实现 CDC 原理及代码实战 (Flink + Debezium CDC: principles and hands-on code)