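Introduction to Debezium
Debezium is a distributed platform that turns existing databases into event streams. Applications consume these streams to see every row-level change in the database and can respond immediately.
Debezium is built on top of Apache Kafka and provides Kafka Connect connectors that monitor specific databases.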
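Introduction to Kafka Connect
Reading from and writing to Kafka is usually done with the Consumer and Producer APIs, but building this yourself means handling many extra concerns, such as schema management, fault tolerance, parallelism, data latency, and monitoring.
Kafka Connect, introduced in version 0.9.0.0, greatly reduces that work. It offers the following features: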
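- A unified, general-purpose framework
- Distributed and standalone modes
- A REST interface for viewing and managing Kafka connectors
- Automatic offset management, so developers do not have to worry about the impact of error handling
- Distributed and scalable
- Streaming/batch integration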
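Kafka Connect has two core concepts, Source and Sink. A Source imports data into Kafka and a Sink exports data from Kafka; both are called Connectors.
As the figure below shows, the Sources on the left read data from source systems (RDBMS, files, and so on) into Kafka, and the Sinks on the right consume from Kafka and write to other systems.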
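Debezium architecture and how it works
Deployed as a plugin on Kafka Connect
In the figure above, the middle part is the Kafka broker, while Kafka Connect runs as a separate service. Download the debezium-connector-mysql connector, extract it to a chosen directory on the server, and set the connector root path (plugin.path) in connect-distributed.properties to that directory. The connector is then ready to use.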
Debezium Server
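In this mode, you configure the appropriate connector to capture changes at the source, serialize them into a chosen format, and send them to a chosen destination system.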
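Embedded in an application
Embedded mode depends on neither Kafka nor Debezium Server. Your application uses the Debezium API directly to process the captured changes and sync them to other destinations.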
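Using Debezium to sync MySQL to Kafka
The docker run options used below:
- `-it`: run interactively
- `--rm`: remove the container, along with its local data, when it stops
- `-d`: run the container in the background
- `--name`: set the container name
- `-p`: publish a container port
- `-e`: set an environment variable
- `--link`: link to another container so that its IP address, environment variables, and other details can be used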
Start ZooKeeper
docker run -d -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.8
Start Kafka
docker run -d -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.8
Start the MySQL database
docker run -d -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.8
Start the MySQL command-line client
docker run -it --rm --name mysqlterm --link mysql mysql:8.0 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
Switch to the inventory database
mysql> use inventory;
List the tables in the database
mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses |
| customers |
| geom |
| orders |
| products |
| products_on_hand |
+---------------------+
6 rows in set (0.00 sec)
View the data in the customers table
mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
Start Kafka Connect
docker run -d -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.8
Check the status of Kafka Connect through the Kafka Connect REST API
curl -H "Accept:application/json" localhost:8083/
{"version":"3.0.0","commit":"8cb0a5e9d3441962","kafka_cluster_id":"EKcq3XCAT_umWw7ZS0qx_w"}
List the connectors
curl -H "Accept:application/json" localhost:8083/connectors/
[]
Register the MySQL connector
Send a POST request to the Kafka Connect REST API with the following body:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}
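- name: the connector's name
- config: the connector's configuration
- tasks.max: the maximum number of tasks. Set it to 1, because the MySQL connector reads the MySQL binlog and only a single task can keep the events in order
- database.hostname: set to mysql here, which is the host name of the MySQL container. Outside Docker, use an IP address or a resolvable domain name
- database.server.id: a unique numeric ID that identifies the connector as a client of the MySQL server
- database.server.name: the logical name of the MySQL server, used as the prefix of the Kafka topic names
- database.include.list: the databases to monitor
- database.history.kafka.topic: the topic that stores the history of schema changes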
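The same settings can also be assembled in code. The following is a minimal sketch rather than part of the original walkthrough; `build_mysql_connector_payload` is a hypothetical helper name, and the values are the ones used in this tutorial.

```python
import json


def build_mysql_connector_payload(name, server_name, database, history_topic):
    """Build the registration payload used in this tutorial (hypothetical helper)."""
    config = {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",  # a single task keeps binlog events in order
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": server_name,  # becomes the topic prefix
        "database.include.list": database,
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": history_topic,
    }
    return {"name": name, "config": config}


payload = build_mysql_connector_payload(
    "inventory-connector", "dbserver1", "inventory", "dbhistory.inventory"
)
print(json.dumps(payload, indent=2))
```

Building the payload as a dictionary and serializing it with `json.dumps` avoids hand-escaping quotes when the body is later passed to an HTTP client.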
Register the MySQL connector with Kafka Connect
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
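For readers who prefer to register the connector from a script, here is a rough Python equivalent of the curl call, using only the standard library. `build_register_request` is a hypothetical helper, and the empty `config` is a placeholder for the full configuration shown in the request body above. The request is only built here; sending it needs a running Kafka Connect instance.

```python
import json
import urllib.request


def build_register_request(payload, base_url="http://localhost:8083"):
    """Return a POST request for the Kafka Connect /connectors/ endpoint."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/",
        data=body,
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


# Placeholder payload: replace the empty config with the full connector config.
request = build_register_request({"name": "inventory-connector", "config": {}})
print(request.get_method(), request.full_url)

# To actually send it (requires Kafka Connect listening on port 8083):
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```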
List the connectors
curl -H "Accept:application/json" localhost:8083/connectors/
["inventory-connector"]
View the connector details
```bash
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
HTTP/1.1 200 OK
Date: Wed, 16 Mar 2022 10:00:19 GMT
Content-Type: application/json
Content-Length: 534
Server: Jetty(9.4.43.v20210629)

{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.user":"debezium","database.server.id":"184054","tasks.max":"1","database.hostname":"mysql","database.password":"dbz","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"dbhistory.inventory","name":"inventory-connector","database.server.name":"dbserver1","database.port":"3306","database.include.list":"inventory"},"tasks":[{"connector":"inventory-connector","task":0}],"type":"source"}
```
## Run watch-topic to view the events sent by Debezium
There are five topics in total:
- dbserver1
- dbserver1.inventory.products
- dbserver1.inventory.products_on_hand
- dbserver1.inventory.customers
- dbserver1.inventory.orders
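The table topics in this list follow the pattern server name, database, table, joined by dots, where the server name is the `database.server.name` value from the connector configuration. The topic named after the server alone (`dbserver1`) is, as far as the MySQL connector's defaults go, where schema change events are written. A small sketch of the naming rule, with `topic_for` as a hypothetical helper:

```python
def topic_for(server_name, database, table):
    """Compose the topic name for one captured table."""
    return f"{server_name}.{database}.{table}"


tables = ["products", "products_on_hand", "customers", "orders"]
topics = ["dbserver1"] + [topic_for("dbserver1", "inventory", t) for t in tables]
for topic in topics:
    print(topic)
```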
Here we watch the customers topic:
```bash
docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:1.8 watch-topic -a -k dbserver1.inventory.customers
```
Details of the key of the last event (formatted for readability):
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"dbserver1.inventory.customers.Key"
},
"payload":{
"id":1004
}
}
Details of the value of the last event (formatted for readability):
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "int64",
"optional": false,
"field": "ts_sec"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "boolean",
"optional": true,
"field": "snapshot"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "1.8.1.Final",
"name": "dbserver1",
"server_id": 0,
"ts_sec": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"row": 0,
"snapshot": true,
"thread": null,
"db": "inventory",
"table": "customers"
},
"op": "r",
"ts_ms": 1486500577691
}
}
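The event value contains a `schema` and a `payload`. The `schema` contains a schema named `dbserver1.inventory.customers.Envelope` (version 1), which can contain five fields:
- `op` is a required field that holds a string describing the type of operation. For the MySQL connector the values are `c` for create (or insert), `u` for update, `d` for delete, and `r` for read (in the case of a snapshot)
- `before` is an optional field that, if present, contains the state of the row before the event occurred. Its structure is described by the `dbserver1.inventory.customers.Value` schema, which the `dbserver1` connector uses for all rows in the `inventory.customers` table
- `after` is an optional field that, if present, contains the state of the row after the event occurred. Its structure is described by the same `dbserver1.inventory.customers.Value` schema used for `before`
- `source` is a required field containing a structure that describes the event's source metadata. For MySQL it holds several fields: the connector name, the name of the binlog file where the event was recorded, the position of the event in that binlog file, the row within the event (there may be more than one), the names of the affected database and table, the ID of the MySQL thread that made the change, whether the event is part of a snapshot, the MySQL server ID (if available), and a timestamp in seconds
- `ts_ms` is optional and, if present, contains the time at which the connector processed the event (using the system clock of the JVM running the Kafka Connect task)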
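To make the envelope concrete, here is a minimal sketch of how a consumer might decode an event value and read these fields. It assumes the JSON-with-schema format shown above; `describe_event` and `OPERATIONS` are hypothetical names, and the sample is a trimmed copy of the snapshot event for customer 1004.

```python
import json

# Meaning of each op code emitted by the MySQL connector.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read"}


def describe_event(raw_value):
    """Summarize one change event value given as a JSON string."""
    payload = json.loads(raw_value)["payload"]
    return {
        "operation": OPERATIONS.get(payload["op"], "unknown"),
        "table": payload["source"]["table"],
        "before": payload.get("before"),
        "after": payload.get("after"),
    }


sample = json.dumps({
    "schema": {},  # schema omitted for brevity
    "payload": {
        "before": None,
        "after": {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar",
                  "email": "annek@noanswer.org"},
        "source": {"name": "dbserver1", "db": "inventory", "table": "customers"},
        "op": "r",
        "ts_ms": 1486500577691,
    },
})

summary = describe_event(sample)
print(summary["operation"], summary["table"], summary["after"]["id"])
```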
Update data and view the update event
- Update the data
```sql
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
Query OK, 1 row affected (0.05 sec)
Rows matched: 1 Changed: 1 Warnings: 0

mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne Marie | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
```
- Details of the update event's key (formatted for readability)
```json
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}
```
- Details of the update event's value (formatted for readability)
{
  "schema": {...},
  "payload": {
    "before": {
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": {
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "1.8.1.Final",
      "name": "dbserver1",
      "server_id": 223344,
      "ts_sec": 1486501486,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 364,
      "row": 0,
      "snapshot": null,
      "thread": 3,
      "db": "inventory",
      "table": "customers"
    },
    "op": "u",
    "ts_ms": 1486501486308
  }
}
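- The `op` field is now `u`, indicating that this row changed because of an update
- The `before` field now holds the state of the row with the values before the database commit
- The `after` field now holds the updated state of the row; here the `first_name` value is now `Anne Marie`
- The `source` field has many of the same values as before, except that the `ts_sec` and `pos` fields have changed (in other circumstances the file might have changed too)
- The `ts_ms` field shows the timestamp at which Debezium processed this event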
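Because an update event carries both row states, a consumer can work out exactly which columns changed. A minimal sketch, assuming both `before` and `after` are present (as they are for `u` events); `changed_fields` is a hypothetical helper name:

```python
def changed_fields(before, after):
    """Map each changed column to its (old, new) pair, looking at columns in `after`."""
    return {
        field: (before.get(field), after.get(field))
        for field in after
        if before.get(field) != after.get(field)
    }


before = {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar",
          "email": "annek@noanswer.org"}
after = {"id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar",
         "email": "annek@noanswer.org"}

print(changed_fields(before, after))  # {'first_name': ('Anne', 'Anne Marie')}
```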
Delete data and view the delete event
- Delete the data
```sql
mysql> DELETE FROM addresses WHERE customer_id=1004;
mysql> DELETE FROM customers WHERE id=1004;
Query OK, 1 row affected (0.00 sec)
```
- Details of the delete event's key (formatted for readability)
```json
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}
```
- Details of the delete event's value (formatted for readability)
```json
{
  "schema": {...},
  "payload": {
    "before": {
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": null,
    "source": {
      "version": "1.8.1.Final",
      "name": "dbserver1",
      "server_id": 223344,
      "ts_sec": 1486501558,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 725,
      "row": 0,
      "snapshot": null,
      "thread": 3,
      "db": "inventory",
      "table": "customers"
    },
    "op": "d",
    "ts_ms": 1486501558315
  }
}
```
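- The `before` field holds the state of the row before it was deleted
- The `after` field is null because the row no longer exists
- The `source` field structure has many of the same values as before, except that the `ts_sec` and `pos` fields have changed (in other circumstances the file might have changed too)
- The `op` field value is now `d`, indicating that the row was deleted
- The `ts_ms` field shows the timestamp at which Debezium processed this event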
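Taken together, the read, update, and delete events are enough to keep a copy of the table in sync. The sketch below applies the three events seen for customer 1004 to an in-memory dictionary keyed by `id`. It is a simplified illustration, not how any particular sink connector works; `apply_event` is a hypothetical name, and the row contents are abbreviated.

```python
def apply_event(replica, key, payload):
    """Apply one change event to an in-memory copy of the table."""
    op = payload["op"]
    row_id = key["id"]
    if op in ("c", "r", "u"):
        # Creates, snapshot reads, and updates all carry the new row in `after`.
        replica[row_id] = payload["after"]
    elif op == "d":
        # Deletes carry only `before`; drop the row if we have it.
        replica.pop(row_id, None)
    else:
        raise ValueError(f"unexpected op: {op!r}")
    return replica


anne = {"id": 1004, "first_name": "Anne"}
anne_marie = {"id": 1004, "first_name": "Anne Marie"}
events = [
    ({"id": 1004}, {"op": "r", "before": None, "after": anne}),
    ({"id": 1004}, {"op": "u", "before": anne, "after": anne_marie}),
    ({"id": 1004}, {"op": "d", "before": anne_marie, "after": None}),
]

replica = {}
history = []
for key, payload in events:
    apply_event(replica, key, payload)
    history.append(dict(replica))  # snapshot of the replica after each event

print(history)
```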
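## Restart the Kafka Connect service
- Stop the container running the Kafka Connect service
```bash
docker stop connect
```
- After the service has stopped, switch to the terminal running the MySQL command-line client and add a few records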
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
mysql> INSERT INTO customers VALUES (default, "Kenneth", "Anderson", "kander@acme.com");
The records are added to the database. However, because Kafka Connect is not running, watch-topic does not show any updates
- Restart the container running the Kafka Connect service
docker run -d -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.8
- Switch to the terminal running watch-topic to see the events for the two new records created while Kafka Connect was offline
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"Sarah","last_name":"Thompson","email":"kitt@acme.com"},"source":{"version":"1.8.1.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635153,"gtid":null,"file":"mysql-bin.000003","pos":1046,"row":0,"snapshot":null
,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181455}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1006}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1006,"first_name":"Kenneth","last_name":"Anderson","email":"kander@acme.com"},"source":{"version":"1.8.1.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635160,"gtid":null,"file":"mysql-bin.000003","pos":1356,"row":0,"snapshot":
null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181456}}
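The two inserts were not lost because Kafka Connect persists the connector's source offsets (in this setup, in the topic given by `OFFSET_STORAGE_TOPIC`), and the connector resumes reading the binlog from the stored position after a restart. The following toy simulation illustrates the idea only. The function name and the list-based binlog are hypothetical, and a real offset also records the binlog file name, not just the position.

```python
def read_new_events(binlog, committed_position):
    """Return events recorded after the committed position, plus the new position."""
    pending = [event for event in binlog if event["pos"] > committed_position]
    new_position = pending[-1]["pos"] if pending else committed_position
    return pending, new_position


# Positions taken from the events shown in this walkthrough.
binlog = [
    {"pos": 725, "op": "d", "id": 1004},   # processed before the shutdown
    {"pos": 1046, "op": "c", "id": 1005},  # written while Connect was down
    {"pos": 1356, "op": "c", "id": 1006},  # written while Connect was down
]

committed = 725  # last position stored before `docker stop connect`
pending, committed = read_new_events(binlog, committed)
print([event["id"] for event in pending], committed)  # [1005, 1006] 1356
```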