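Introduction to Debezium
Debezium is a distributed platform that turns your existing databases into event streams. Applications that consume these streams see every row-level change in the database and can respond to it immediately.
Debezium is built on top of Apache Kafka and provides Kafka Connect connectors that monitor specific databases.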
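Introduction to Kafka Connect
Kafka data is normally read and written with the Consumer and Producer APIs. Implementing this yourself, however, means handling many extra concerns, such as schema management, fault tolerance, parallelism, data latency, and monitoring.
Kafka Connect, introduced in Kafka 0.9.0.0, greatly reduces this work. Its features include:
- A common framework for Kafka connectors
- Distributed and standalone modes
- A REST interface for viewing and managing Kafka connectors
- Automatic offset management: Kafka Connect commits offsets on behalf of connectors, so developers do not have to implement this error-prone part themselves
- Distributed and scalable
- Streaming/batch integration
Kafka Connect has two core concepts: Source and Sink. A source imports data into Kafka, and a sink exports data from Kafka. Both are called connectors.
As the figure below shows, sources on the left read data from source systems (RDBMS, files, and so on) into Kafka. Sinks on the right consume data from Kafka into other systems.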
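Debezium architecture and how it works
Deployed as a plugin on Kafka Connect

In the figure above, the Kafka broker sits in the middle, and Kafka Connect runs as a separate service. To use Debezium, download the debezium-connector-mysql connector and extract it to a directory on the server. Then add the directory that contains it to the `plugin.path` setting in `connect-distributed.properties`, and the connector is ready to use.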
Debezium Server

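In this mode, you configure connectors that capture changes at the source, serialize them into a specified format, and send them to a specified target system.
Embedded in an application
Embedded mode depends on neither Kafka nor Debezium Server. The application uses Debezium's API directly to process the captured change data and sync it to other systems.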
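Whichever mode delivers the events, the consuming side ends up applying row-level changes to some target. Below is a minimal Python sketch of that step. It assumes each event has already been decoded into the `payload` shape shown later in this article (`op`, `before`, `after`), and that the primary-key column is named `id`, as in the `customers` table. It is not Debezium's embedded engine API, which is a Java API. `apply_change` and the dict-based replica are hypothetical.

```python
from typing import Any, Dict, Optional

# Hypothetical target: the rows of one table, keyed by primary key "id".
Replica = Dict[int, Dict[str, Any]]


def apply_change(replica: Replica, payload: Optional[Dict[str, Any]]) -> None:
    """Apply one Debezium-style change payload (op/before/after) to the replica."""
    if payload is None:
        # Tombstone (null value) that follows a delete: nothing to apply.
        return
    op = payload["op"]
    if op in ("c", "r", "u"):
        # Create, snapshot read, or update: the row now looks like "after".
        row = payload["after"]
        replica[row["id"]] = dict(row)
    elif op == "d":
        # Delete: "after" is null, so the key comes from "before".
        replica.pop(payload["before"]["id"], None)
    else:
        # Other operation types (e.g. truncate) are out of scope for this sketch.
        raise ValueError(f"unsupported op: {op!r}")


if __name__ == "__main__":
    table: Replica = {}
    events = [
        {"op": "r", "before": None, "after": {"id": 1004, "first_name": "Anne"}},
        {"op": "u", "before": {"id": 1004, "first_name": "Anne"},
         "after": {"id": 1004, "first_name": "Anne Marie"}},
        {"op": "d", "before": {"id": 1004, "first_name": "Anne Marie"}, "after": None},
    ]
    for event in events:
        apply_change(table, event)
        print(table)
```

Applying the events in order matters. This is why the MySQL connector keeps per-table events ordered, as described in the connector configuration below.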
使用 Debezium 同步 MySQL 到 Kafka
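docker run options used below:
- `-it`: run interactively with a terminal
- `--rm`: remove the container when it stops
- `-d`: run the container in the background
- `--name`: set the container name
- `-p`: publish container ports to the host
- `-e`: set an environment variable
- `--link`: link to another container so this one can use its IP address, environment variables, and so on (a legacy Docker feature)
Start ZooKeeper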
docker run -d -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.8
Start Kafka
docker run -d -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.8
Start the MySQL database
docker run -d -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.8
Start the MySQL command-line client
docker run -it --rm --name mysqlterm --link mysql mysql:8.0 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
Switch to the inventory database
mysql> use inventory;
List the tables in the database
mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
6 rows in set (0.00 sec)
View the data in the customers table
mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
Start Kafka Connect
docker run -d -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.8
Use the Kafka Connect REST API to check the status of Kafka Connect
curl -H "Accept:application/json" localhost:8083/
{"version":"3.0.0","commit":"8cb0a5e9d3441962","kafka_cluster_id":"EKcq3XCAT_umWw7ZS0qx_w"}
List the connectors
curl -H "Accept:application/json" localhost:8083/connectors/
[]
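Register the MySQL connector
Send a POST request to the Kafka Connect REST API with the following body:
{"name": "inventory-connector","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","tasks.max": "1","database.hostname": "mysql","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","database.server.name": "dbserver1","database.include.list": "inventory","database.history.kafka.bootstrap.servers": "kafka:9092","database.history.kafka.topic": "dbhistory.inventory"}}
- `name`: the name of the connector
- `config`: the connector's configuration
- `tasks.max`: the maximum number of tasks. Keep it at 1. The MySQL connector reads the MySQL binlog, and only a single task can preserve the order of events (the MySQL connector always runs a single task).
- `database.hostname`: the MySQL host. Here it is `mysql`, the name made resolvable inside the Connect container by `--link mysql:mysql`. Outside Docker, use an IP address or a resolvable host name.
- `database.server.id`: a numeric ID, unique within the MySQL cluster, that the connector uses when it connects to MySQL as a replication client
- `database.server.name`: the logical name of the MySQL server; it is used as the prefix of the Kafka topics the connector writes to
- `database.include.list`: the databases to capture
- `database.history.kafka.bootstrap.servers` and `database.history.kafka.topic`: the Kafka brokers and the topic in which the connector stores the history of the database schema (DDL) changes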
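When the same connector is registered for several databases or environments, building the body in code keeps the keys consistent. Below is a minimal Python sketch that produces exactly the body used in this tutorial. `mysql_connector_config` is a hypothetical helper, and naming the history topic `dbhistory.<database>` is an assumption that follows this tutorial's choice. Every value is passed as a string, as in the sample body.

```python
import json
from typing import Any, Dict


def mysql_connector_config(name: str, host: str, port: int, user: str,
                           password: str, server_id: int, server_name: str,
                           database: str, bootstrap_servers: str) -> Dict[str, Any]:
    """Build a Debezium 1.8 MySQL connector registration body (hypothetical helper)."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            # The MySQL connector reads one binlog, so it only ever runs one task.
            "tasks.max": "1",
            "database.hostname": host,
            "database.port": str(port),
            "database.user": user,
            "database.password": password,
            # Replication client ID; must be unique among MySQL server/client IDs.
            "database.server.id": str(server_id),
            # Logical server name: prefix of every topic the connector writes to.
            "database.server.name": server_name,
            "database.include.list": database,
            "database.history.kafka.bootstrap.servers": bootstrap_servers,
            # Topic that stores the history of schema (DDL) changes.
            "database.history.kafka.topic": f"dbhistory.{database}",
        },
    }


if __name__ == "__main__":
    body = mysql_connector_config("inventory-connector", "mysql", 3306, "debezium",
                                  "dbz", 184054, "dbserver1", "inventory", "kafka:9092")
    print(json.dumps(body, indent=2))
```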
Register the MySQL connector with Kafka Connect
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
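The same registration can be sent from a script instead of curl. Below is a minimal Python sketch using only the standard library's `urllib`. `CONNECT_URL` assumes the `-p 8083:8083` mapping used in this tutorial, and `register_request` and `register` are hypothetical helper names. Kafka Connect answers a successful registration with `201 Created`. `urlopen` raises `urllib.error.HTTPError` for error statuses, for example when a connector with the same name already exists.

```python
import json
import urllib.request
from typing import Any, Dict

# Assumption: the Connect REST port is published on localhost as in this tutorial.
CONNECT_URL = "http://localhost:8083"


def register_request(connect_url: str, body: Dict[str, Any]) -> urllib.request.Request:
    """Build, without sending, the same POST request as the curl command above."""
    return urllib.request.Request(
        url=f"{connect_url}/connectors/",
        data=json.dumps(body).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def register(connect_url: str, body: Dict[str, Any]) -> Dict[str, Any]:
    """Send the registration and return the connector info Kafka Connect echoes back."""
    with urllib.request.urlopen(register_request(connect_url, body)) as resp:
        return json.loads(resp.read())
```

Splitting request construction from sending makes it possible to inspect the exact request before pointing it at a live Connect worker.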
List the connectors again
curl -H "Accept:application/json" localhost:8083/connectors/
["inventory-connector"]
View the connector details
```bash
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

HTTP/1.1 200 OK
Date: Wed, 16 Mar 2022 10:00:19 GMT
Content-Type: application/json
Content-Length: 534
Server: Jetty(9.4.43.v20210629)

{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.user":"debezium","database.server.id":"184054","tasks.max":"1","database.hostname":"mysql","database.password":"dbz","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"dbhistory.inventory","name":"inventory-connector","database.server.name":"dbserver1","database.port":"3306","database.include.list":"inventory"},"tasks":[{"connector":"inventory-connector","task":0}],"type":"source"}
```
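To check a registration from a script rather than by reading curl output, the details response can be reduced to the few fields worth asserting on. Below is a minimal Python sketch. `SAMPLE_DETAILS` is a trimmed copy of the response above (most config keys removed), and `summarize_connector` is a hypothetical helper. This endpoint shows the configuration and task assignment, not whether the tasks are running. Kafka Connect reports run state separately under `GET /connectors/<name>/status`.

```python
import json
from typing import Any, Dict

# Trimmed copy of the response body shown above (most config keys omitted).
SAMPLE_DETAILS = (
    '{"name":"inventory-connector",'
    '"config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1"},'
    '"tasks":[{"connector":"inventory-connector","task":0}],"type":"source"}'
)


def summarize_connector(details_json: str) -> Dict[str, Any]:
    """Extract name, type, class and task IDs from a GET /connectors/<name> body."""
    details = json.loads(details_json)
    return {
        "name": details["name"],
        "type": details["type"],  # "source" for Debezium connectors
        "class": details["config"]["connector.class"],
        "task_ids": [task["task"] for task in details["tasks"]],
    }


if __name__ == "__main__":
    print(summarize_connector(SAMPLE_DETAILS))
```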
Run watch-topic to view the events Debezium sends
Debezium writes the events to topics including:
- dbserver1
- dbserver1.inventory.products
- dbserver1.inventory.products_on_hand
- dbserver1.inventory.customers
- dbserver1.inventory.orders

Here we watch the customers topic (`-a` shows all events since the topic was created, `-k` includes each event's key):
```bash
docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:1.8 watch-topic -a -k dbserver1.inventory.customers
```
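The topic names follow a fixed pattern. The schema-change topic is named after `database.server.name`, and each captured table gets `<database.server.name>.<database>.<table>`. Below is a minimal Python sketch of that naming rule. `change_event_topic` and `expected_topics` are hypothetical helpers, and the table list has to come from your own schema.

```python
from typing import Iterable, List


def change_event_topic(server_name: str, database: str, table: str) -> str:
    """Topic for one table's change events: <server name>.<database>.<table>."""
    return f"{server_name}.{database}.{table}"


def expected_topics(server_name: str, database: str, tables: Iterable[str]) -> List[str]:
    """Schema-change topic (named after the server) plus one topic per captured table."""
    return [server_name] + [change_event_topic(server_name, database, t) for t in tables]


if __name__ == "__main__":
    for topic in expected_topics("dbserver1", "inventory", ["customers", "orders"]):
        print(topic)
```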
Here are the details of the key of the last event (formatted for readability):
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}}
Here are the details of the value of the last event (formatted for readability):
{"schema": {"type": "struct","fields": [{"type": "struct","fields": [{"type": "int32","optional": false,"field": "id"},{"type": "string","optional": false,"field": "first_name"},{"type": "string","optional": false,"field": "last_name"},{"type": "string","optional": false,"field": "email"}],"optional": true,"name": "dbserver1.inventory.customers.Value","field": "before"},{"type": "struct","fields": [{"type": "int32","optional": false,"field": "id"},{"type": "string","optional": false,"field": "first_name"},{"type": "string","optional": false,"field": "last_name"},{"type": "string","optional": false,"field": "email"}],"optional": true,"name": "dbserver1.inventory.customers.Value","field": "after"},{"type": "struct","fields": [{"type": "string","optional": true,"field": "version"},{"type": "string","optional": false,"field": "name"},{"type": "int64","optional": false,"field": "server_id"},{"type": "int64","optional": false,"field": "ts_sec"},{"type": "string","optional": true,"field": "gtid"},{"type": "string","optional": false,"field": "file"},{"type": "int64","optional": false,"field": "pos"},{"type": "int32","optional": false,"field": "row"},{"type": "boolean","optional": true,"field": "snapshot"},{"type": "int64","optional": true,"field": "thread"},{"type": "string","optional": true,"field": "db"},{"type": "string","optional": true,"field": "table"}],"optional": false,"name": "io.debezium.connector.mysql.Source","field": "source"},{"type": "string","optional": false,"field": "op"},{"type": "int64","optional": true,"field": "ts_ms"}],"optional": false,"name": "dbserver1.inventory.customers.Envelope","version": 1},"payload": {"before": null,"after": {"id": 1004,"first_name": "Anne","last_name": "Kretchmar","email": "annek@noanswer.org"},"source": {"version": "1.8.1.Final","name": "dbserver1","server_id": 0,"ts_sec": 0,"gtid": null,"file": "mysql-bin.000003","pos": 154,"row": 0,"snapshot": true,"thread": null,"db": "inventory","table": "customers"},"op": "r","ts_ms": 1486500577691}}
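The event contains a `schema` and a `payload`. The `schema` describes a schema named `dbserver1.inventory.customers.Envelope` (version 1), which can contain five fields:
- `op`: a required field containing a string that describes the type of operation. For the MySQL connector, the values are `c` for create (or insert), `u` for update, `d` for delete, and `r` for read (rows read during a snapshot, like the event above).
- `before`: an optional field that, if present, contains the state of the row before the event occurred. Its structure is described by the `dbserver1.inventory.customers.Value` schema, which the `dbserver1` connector uses for all rows in the `inventory.customers` table.
- `after`: an optional field that, if present, contains the state of the row after the event occurred. Its structure is described by the same `dbserver1.inventory.customers.Value` schema used for `before`.
- `source`: a required field containing a structure that describes the source metadata of the event. For MySQL, it includes the connector name, the name of the binlog file in which the event was recorded, the position in that file at which the event appeared, the row within the event (an event can contain several rows), the names of the affected database and table, the ID of the MySQL thread that made the change, whether the event was part of a snapshot, the MySQL server ID (if available), and a timestamp in seconds.
- `ts_ms`: an optional field that, if present, contains the time at which the connector processed the event, according to the system clock of the JVM running the Kafka Connect task.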
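To inspect events programmatically instead of reading watch-topic output, below is a minimal Python sketch that summarizes one event value using only the fields described above. It assumes the value is JSON text with `schema` and `payload`, as printed above, and that the table's key column is `id`. `describe` and `OPS` are hypothetical names. A null value is treated as a tombstone, because with default settings Debezium follows each delete event with a record whose value is null.

```python
import json
from typing import Optional

# Operation codes emitted by the MySQL connector.
OPS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def describe(value_json: Optional[str]) -> str:
    """One-line summary of a Debezium change-event value (schema + payload envelope)."""
    if value_json is None:
        # Tombstone record that follows a delete.
        return "tombstone"
    payload = json.loads(value_json)["payload"]
    src = payload["source"]
    # Deletes have no "after"; fall back to "before" to find the row's key.
    row = payload["after"] if payload["after"] is not None else payload["before"]
    op = OPS.get(payload["op"], payload["op"])
    return f"{op} {src['db']}.{src['table']} id={row['id']} at {src['file']}:{src['pos']}"


if __name__ == "__main__":
    snapshot_event = json.dumps({
        "schema": {},  # schema omitted; describe() only reads the payload
        "payload": {"before": None, "after": {"id": 1004},
                    "source": {"db": "inventory", "table": "customers",
                               "file": "mysql-bin.000003", "pos": 154},
                    "op": "r", "ts_ms": 1486500577691},
    })
    print(describe(snapshot_event))
```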
Update data and view the update event
- Update the data:
```sql
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
Query OK, 1 row affected (0.05 sec)
Rows matched: 1 Changed: 1 Warnings: 0

mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne Marie | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
```
- Here are the details of the update event's key (formatted for readability):
```json
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [{"field": "id", "type": "int32", "optional": false}]
  },
  "payload": {"id": 1004}
}
```
Here are the details of the update event's value (formatted for readability):
{"schema": {...},"payload": {"before": {"id": 1004,"first_name": "Anne","last_name": "Kretchmar","email": "annek@noanswer.org"},"after": {"id": 1004,"first_name": "Anne Marie","last_name": "Kretchmar","email": "annek@noanswer.org"},"source": {"version": "1.8.1.Final","name": "dbserver1","server_id": 223344,"ts_sec": 1486501486,"gtid": null,"file": "mysql-bin.000003","pos": 364,"row": 0,"snapshot": null,"thread": 3,"db": "inventory","table": "customers"},"op": "u","ts_ms": 1486501486308}}
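- `before`: now contains the state of the row with the values it had before the database commit
- `after`: now contains the updated state of the row; here `first_name` is now `Anne Marie`
- `source`: has many of the same values as before, except that the `ts_sec` and `pos` fields have changed (in other circumstances the `file` may change as well)
- `op`: now `u`, meaning the row was updated
- `ts_ms`: the timestamp at which Debezium processed this event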
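Because an update event carries both row images, a consumer can compute exactly which columns changed. Below is a minimal Python sketch, assuming the decoded `payload` of an update event as shown above. `changed_columns` is a hypothetical helper, and it relies on `before` containing the full row, as it does in this example.

```python
from typing import Any, Dict, Tuple


def changed_columns(payload: Dict[str, Any]) -> Dict[str, Tuple[Any, Any]]:
    """Map each column whose value differs to its (before, after) pair."""
    if payload["op"] != "u":
        # Creates/reads have no "before" and deletes have no "after".
        raise ValueError("only update events carry both before and after")
    before, after = payload["before"], payload["after"]
    return {col: (before.get(col), value)
            for col, value in after.items()
            if before.get(col) != value}


if __name__ == "__main__":
    update = {
        "op": "u",
        "before": {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar"},
        "after": {"id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar"},
    }
    print(changed_columns(update))
```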
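Delete data and view the delete event
- Delete the data. The `addresses` table references `customers` through a foreign key, so first delete the address rows of customer 1004:
```sql
mysql> DELETE FROM addresses WHERE customer_id=1004;

mysql> DELETE FROM customers WHERE id=1004;
Query OK, 1 row affected (0.00 sec)
```
- Here are the details of the delete event's key (formatted for readability):
```json
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [{"field": "id", "type": "int32", "optional": false}]
  },
  "payload": {"id": 1004}
}
```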
- Here are the details of the delete event's value (formatted for readability):
```json
{
  "schema": {...},
  "payload": {
    "before": {
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": null,
    "source": {
      "version": "1.8.1.Final",
      "name": "dbserver1",
      "server_id": 223344,
      "ts_sec": 1486501558,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 725,
      "row": 0,
      "snapshot": null,
      "thread": 3,
      "db": "inventory",
      "table": "customers"
    },
    "op": "d",
    "ts_ms": 1486501558315
  }
}
```
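- `before`: the state of the row before it was deleted
- `after`: null, because the row no longer exists
- `source`: has many of the same values as before, except that the `ts_sec` and `pos` fields have changed (in other circumstances the `file` may change as well)
- `op`: now `d`, meaning the row was deleted
- `ts_ms`: the timestamp at which Debezium processed this event

Restart the Kafka Connect service
- Stop the container running the Kafka Connect service:
```bash
docker stop connect
```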
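After the service stops, switch to the terminal running the MySQL command-line client and add a few records:
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
mysql> INSERT INTO customers VALUES (default, "Kenneth", "Anderson", "kander@acme.com");
The records are added to the database. However, because Kafka Connect is not running, watch-topic does not show any new events.
Start the Kafka Connect container again. Because it was started with `--rm`, `docker stop` removed it, so run the same command as before. The connector configuration and offsets are kept in the Kafka topics `my_connect_configs` and `my_connect_offsets`, so the connector resumes where it left off.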
docker run -d -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.8
Switch to the terminal running watch-topic to view the events for the two records created while Kafka Connect was offline:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1005}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"Sarah","last_name":"Thompson","email":"kitt@acme.com"},"source":{"version":"1.8.1.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635153,"gtid":null,"file":"mysql-bin.000003","pos":1046,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181455}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1006}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1006,"first_name":"Kenneth","last_name":"Anderson","email":"kander@acme.com"},"source":{"version":"1.8.1.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635160,"gtid":null,"file":"mysql-bin.000003","pos":1356,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181456}}
