- Avro 介绍
- Avro 序列化
- Confluent Schema Registry 的架构
- 安装社区版 Confluent Platform(包括 Schema Registry)
- 配置 Confluent Platform(单机不需要配置)
- server.
= : : - The address the socket server listens on.
- FORMAT:
- listeners = listener_name://host_name:port
- EXAMPLE:
- listeners = PLAINTEXT://your.host.name:9092
- Use this setting to specify the bootstrap servers for your Kafka cluster and it
- will be used both for selecting the leader schema registry instance and for storing the data for
- registered schemas.
- 启动 Confluent Platform
- 使用 Debezium 同步 MySQL 数据到 Kafka
- 遇到的问题
Avro 介绍
Apache Avro™ 是一个数据序列化系统。
Avro 提供:
- 丰富的数据结构。
- 一种紧凑、快速的二进制数据格式。
- 一个容器文件,用于存储持久数据。
- 远程过程调用 (RPC)。
- 与动态语言的简单集成。不需要生成代码来读取或写入数据文件,也不需要使用或实现 RPC 协议。代码生成是一种可选的优化,静态类型语言才值得实现。
schema
Avro 依赖于schema
。读取 Avro 数据时,写入时使用的schema
始终存在。这允许不需要为每个值都写入schema
的情况下写入每个数据,从而使序列化既快速又小。这也便于动态脚本语言的使用,因为数据及其schema
是完全自描述的。
当 Avro 数据存储在文件中时,它的 schema
也随之存储,以便以后任何程序都可以处理文件。如果读取数据的程序需要不同的模式,这很容易解决,因为这两种模式都存在。
在 RPC 中使用 Avro 时,客户端和服务器在连接握手中交换 schema
。 (可以对此进行优化,使得对于大多数调用,实际上不传输 schema
。)由于客户端和服务器都具有对方的完整模式,因此相同命名字段之间的对应关系,缺少字段,额外字段等都可以轻松地解决.
Avro schema
是用 JSON 定义的。这有助于在已经具有 JSON 库的语言中实现。
与其他系统的比较
Avro 提供类似于 Thrift、Protocol Buffers 等系统的功能。Avro 在以下基本方面与这些系统不同。
- 动态类型:Avro 不需要生成代码。数据总是伴随着一个模式,该模式允许在不生成代码、静态数据类型等的情况下对数据进行全面处理。这有助于构建通用的数据处理系统和语言。
- 无标记数据: 由于在读取数据时存在模式,因此需要对数据进行编码的类型信息要少得多,从而减小了序列化大小。
- 没有手动分配的字段 ID:当
schema
更改时,处理数据时始终存在旧schema
和新schema
,因此可以使用字段名以符号方式解决差异。Avro 序列化
Debezium 连接器在 Kafka Connect 框架中工作,通过生成更改事件记录来捕获数据库中的每个行级更改。 对于每个更改事件记录,Debezium 连接器完成以下操作:
- 应用配置的转换
- 使用配置的 Kafka Connect 转换器将记录的键和值序列化为二进制形式
- 将记录写入正确的 Kafka 主题
Kafka Connect 默认使用 JSON 转换器, 默认包含记录的 schema
,这使得每条记录都非常冗长(每条记录都要存储 schema
),导致占用大量的空间。 可以将以下连接器配置属性设置为 false
,这样会从每条记录中排除详细 schema
信息:
key.converter.schemas.enable
value.converter.schemas.enable
还有一种选择是使用 Apache Avro 转换器序列化记录的键和值。
Avro 模式将表的结构信息存储在 schema registry 中,所以在 kafka 每个记录只需要包含一个很小的 schema
标识符,这使得每条记录更小。
Avro 二进制格式紧凑且高效。 Avro schema 可以确保每个记录具有正确的结构。 Avro schema 的进化机制使 schema
能够进化。 这对于 Debezium 连接器至关重要,它会动态生成每个记录的 schema
,以匹配更改的数据库表的结构。 随着时间的推移,写入相同 Kafka 主题的更改事件记录可能具有相同 schema
的不同版本。 Avro 序列化使更改事件记录的使用者更容易适应不断变化的记录 schema
。
要使用 Apache Avro 序列化,你必须部署一个管理 Avro 消息 schema
及其版本的 schema registry
。 可以使用 Apicurio Registry 或者 Confluent Schema Registry。 我们在这里使用 Confluent Schema Registry。
Confluent Schema Registry 的架构
Confluent Schema Registry 用于存储和检索 schema
Schema Registry 是一个分布式的模式存储层,它使用 Kafka 作为其底层存储机制。 一些关键的设计决策:
- 为每个注册的 schema 分配全局唯一 ID。 分配的 ID 保证单调递增且唯一,但不一定是连续的。
- Kafka 提供持久的后端,并用作 Schema Registry 状态及其包含的模式的预写变更日志。
- Schema Registry 设计为分布式,单主架构,ZooKeeper/Kafka 协调主选(基于配置)。
安装社区版 Confluent Platform(包括 Schema Registry)
安装 Confluent 公钥。该公钥用于在 APT 存储库中对包进行签名。
wget -qO - https://packages.confluent.io/deb/7.1/archive.key | sudo apt-key add -
通过运行以下命令将仓库添加到
/etc/apt/sources.list
:sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/7.1 stable main"
sudo add-apt-repository "deb https://packages.confluent.io/clients/deb $(lsb_release -cs) main"
更新 apt-get 并安装整个 Confluent Platform 平台(Scala 2.13)。
sudo apt-get update && sudo apt-get install confluent-community-2.13
:::info 安装包名称以构建 Kafka 的 Scala 版本结尾。 例如,confluent-platform-2.13 包基于 Scala 2.13。
Confluent Platform 5.5.* 才支持 Scala 2.12 :::配置 Confluent Platform(单机不需要配置)
ZooKeeper
- 编辑 ZooKeeper 配置文件 ```bash vim /etc/kafka/zookeeper.properties
tickTime=2000 dataDir=/var/lib/zookeeper/ clientPort=2181 initLimit=5 syncLimit=2
server.=::
server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 autopurge.snapRetainCount=3 autopurge.purgeInterval=24
:::info
此配置适用于三节点集成。 这个配置文件应该在整体中的所有节点上都是相同的。
- `tickTime`、`dataDir` 和 `clientPort` 与典单个服务器配置一样。
- `initLimit` 和 `syncLimit` 控制 ZooKeeper 服务器与当前 leader 初始化所需的时间以及它们与 leader不同步的时间。
`initLimit` 和 `syncLimit` 的值表示为 tickTime 的多少倍,所以需要乘以 tickTime。在此配置中,follower 可能需要 10000 毫秒来初始化,最多可能会出现 4000 毫秒的不同步。
- `myid` 是服务器标识号。 有三台服务器,每台服务器都有一个不同的 myid,分别为 1、2 和 3。
myid 是通过在 dataDir 中创建一个名为 myid 的文件来设置的,此值必须与配置文件中的 myid 值之一匹配。
- `leaderport` 用于 follower 连接到活动的 leader,这个端口应该在所有 ZooKeeper 集合成员之间打开。
- `electionport` 用于在所有的成员之间执行 leader 选举。 这个端口应该在所有 ZooKeeper 集合成员之间打开。
- `autopurge.snapRetainCount` 和 `autopurge.purgeInterval` 设置为每 24 小时清除三个快照以外的所有快照。
:::
- 在 dataDir 目录(/var/lib/zookeeper/)并创建一个名为 myid 的文件。 myid 文件由一行组成,其中包含格式为 <machine-id> 的机器 ID。 当 ZooKeeper 服务器启动时,它通过引用 myid 文件知道它是哪个服务器。 例如,server 1 的 myid 值为 1。
<a name="iATtP"></a>
## Kafka
- 编辑 Kafka 配置文件
```bash
vim /etc/kafka/server.properties
zookeeper.connect=zookeeper:2181
############################# Server Basics #############################
# The ID of the broker. This must be set to a unique integer for each broker.
#broker.id=0
broker.id.generation.enable=true
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
:::info
zookeeper.connect
将所有节点中的 zookeeper.connect 设置为相同的值,连接到同一个 ZooKeeper 集合。
使用以下方法之一为集群中的每个节点配置代理 ID
broker.id.generation.enable
添加 broker.id.generation.enable=true
并注释掉 broker.id
,为集群中的每个节点动态生成代理 ID。
如果要手动设置代理 ID,需要在每个节点上为 broker.id
设置唯一值。
listeners
或者advertised.listeners
配置其他 broker
和客户端如何使用listeners
与 broker
进行通信
:::
Schema Registry
- 编辑 Schema Registry 配置文件 ```bash vim /etc/schema-registry/schema-registry.properties
The address the socket server listens on.
FORMAT:
listeners = listener_name://host_name:port
EXAMPLE:
listeners = PLAINTEXT://your.host.name:9092
listeners=http://0.0.0.0:8081
Use this setting to specify the bootstrap servers for your Kafka cluster and it
will be used both for selecting the leader schema registry instance and for storing the data for
registered schemas.
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
:::info
- listeners
Schema Registry 监听地址
- kafkastore.bootstrap.servers
Kafka bootstrap servers 地址
:::
<a name="akHQe"></a>
## Kafka Connect
- 编辑 Kafka Connect 配置文件
```bash
vim /etc/kafka/connect-distributed.properties
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java
:::info
- bootstrap.servers
该参数列出了将要与 Connect 协同工作的 broker 服务器,连接器将会向这些 broker 写入数据或者从它们那里读取数据。你不需要指定集群所有的broker,不过建议至少指定 3 个
- group.id
具有相同 group id 的 worker 属于同一个 Connect 集群。集群的连接器和它们的任务可以运行在任意一个 worker 上。
- key.converter 和 value.converter
Connect 可以处理存储在 Kafka 里的不同格式的数据。这两个参数分别指定了消息的键和值所使用的转换器。默认使用 Kafka 提供的 JSONConverter,当然也可以配置成 Confluent Schema Registry 提供的 AvroConverter。
- key.converter.schema.enable 和 value.converter.schema.enable
设置成 true 或者 false 来指定 JSON 消息是否可以包含 schema。Avro 消息也包含了 schema,不过需要通过 key.converter.schema.registry.url 和value.converter.schema.registry.url 来 指定 Schema Registry 的位置。
- plugin.path
配置插件位置,所有的插件需要放到这个目录下面 :::
:::success 可以通过以下两种方式之一配置使用 Avro 消息
Debezium 连接器配置 ::: :::success
编辑 Kafka Connect 配置文件 :::
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
:::success
配置连接信息 :::
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
:::success
Kafka Connect Worker 配置
编辑 Kafka Connect 配置文件 :::
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter.schema.registry.url=http://schema-registry:8081
:::danger
注意⚠️:我们使用 Debezium 连接器配置,所以不需要修改 Kafka Connect 配置文件,使用下面的配置
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
:::
启动 Confluent Platform
启动 ZooKeeper
sudo systemctl start confluent-zookeeper
启动 Kafka
sudo systemctl start confluent-kafka
:::info Confluent Platform 使用 confluent-server
Confluent Platform 社区版使用 confluent-kafka :::启动 Schema Registry
sudo systemctl start confluent-schema-registry
:::info 启动成功后可以直接访问 8081 端口
curl localhost:8081
{} :::启动 Kafka Connect
sudo systemctl start confluent-kafka-connect
使用 Debezium 同步 MySQL 数据到 Kafka
:::info MySQL: 商品表 products 和 订单表 orders 将存储在该数据库中 :::
安装并启动 MySQL
- 安装并启动 MySQL(Server version: 8.0.28-0ubuntu0.20.04.3 )
sudo apt update
sudo apt install mysql-server
sudo systemctl enable mysql
sudo systemctl status mysql
:::info 注意⚠️:需要开启 binlog 以及 binlog_format 必须是 row 格式,默认配置即可,不需要做额外操作
log-bin=/var/lib/mysql/mysql-bin.log # 指定 binlog 日志存储位置
binlog_format=ROW # 这里一定是 row 格式
expire-logs-days = 14 # 日志保留时间
max-binlog-size = 500M # 日志滚动大小
:::
在 MySQL 数据库中准备数据
使用 root 用户登录 MySQL
mysql
创建数据库和表 products,orders,并插入数据 ```bash — MySQL CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512) );
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products VALUES (default,”scooter”,”Small 2-wheel scooter”), (default,”car battery”,”12V car battery”), (default,”12-pack drill bits”,”12-pack of drill bits with sizes ranging from #40 to #3”), (default,”hammer”,”12oz carpenter’s hammer”), (default,”hammer”,”14oz carpenter’s hammer”), (default,”hammer”,”16oz carpenter’s hammer”), (default,”rocks”,”box of assorted rocks”), (default,”jacket”,”water resistent black wind breaker”), (default,”spare tire”,”24 inch spare tire”);
CREATE TABLE orders ( order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, order_date DATETIME NOT NULL, customer_name VARCHAR(255) NOT NULL, price DECIMAL(10, 5) NOT NULL, product_id INTEGER NOT NULL, order_status BOOLEAN NOT NULL — Whether order has been placed ) AUTO_INCREMENT = 10001;
INSERT INTO orders VALUES (default, ‘2020-07-30 10:08:22’, ‘Jark’, 50.50, 102, false), (default, ‘2020-07-30 10:11:09’, ‘Sally’, 15.00, 105, false), (default, ‘2020-07-30 12:00:30’, ‘Edward’, 25.25, 106, false);
<a name="UtZQo"></a>
## 创建 Debezium 用于同步数据的用户并授权
- 使用 root 用户登录 MySQL
```bash
mysql
- 创建 Debezium 用于同步数据的用户并授权 ```bash CREATE USER ‘debezium’@’%’ IDENTIFIED BY ‘123456’;
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘debezium’;
FLUSH PRIVILEGES;
<a name="DKlJp"></a>
## 下载 Kafka 连接器插件并解压
- 将 debezium-connector-mysql 到 connect-distributed.properties 中 plugin.path 配置的目录下(这里是 /usr/share/java)
```bash
cd /usr/share/java
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.8.1.Final/debezium-connector-mysql-1.8.1.Final-plugin.tar.gz
tar xfz debezium-connector-mysql-1.8.1.Final-plugin.tar.gz && rm debezium-connector-mysql-1.8.1.Final-plugin.tar.gz
下载 Confluent 的 Avro 序列化相关的包并解压
cd /usr/share/java
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.1.0/confluentinc-kafka-connect-avro-converter-7.1.0.zip
unzip confluentinc-kafka-connect-avro-converter-7.1.0.zip && rm confluentinc-kafka-connect-avro-converter-7.1.0.zip
:::info 如果是通过 Confluent Platform 方式安装 confluentinc-kafka-connect-avro-converter-7.1.0 已经在 /usr/share/java 中,无需再次下载 :::
重启 Kafka Connect
sudo systemctl restart confluent-kafka-connect
配置 MySQL 连接信息
将 MySQL 连接信息保存到 dbz-mysql-connector.json 文件中
cat <<EOF > dbz-mysql-avro-connector.json
{
"name":"dbz-mysql-avro-connector",
"config":{
"connector.class":"io.debezium.connector.mysql.MySqlConnector",
"tasks.max":"1",
"database.hostname":"localhost",
"database.port":"3306",
"database.user":"debezium",
"database.password":"123456",
"database.server.id":"132179",
"database.server.name":"mysql-avro",
"database.include.list":"mydb",
"database.history.kafka.bootstrap.servers":"localhost:9092",
"database.history.kafka.topic":"dbhistory.mysql-avro",
"database.allowPublicKeyRetrieval":"true",
"decimal.handling.mode":"double",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081"
}
}
EOF
:::info
name:标识连接器的名称
- connector.class:对应数据库类
- tasks.max:默认1
- database.hostname:数据库 ip
- database.port:数据库端口
- database.user:数据库登录名
- database.password:数据库密码
- database.server.id:数据库 id,标识当前库,不重复就行
- database.server.name:给数据库取别名
- database.include.list:类似白名单,里面的库可以监控到,不在里面监控不到,多库逗号分隔,支持正则匹配
- database.history.kafka.bootstrap.servers:保存表 DDL 相关信息的 kafka 地址
- database.history.kafka.topic:表 DDL 相关信息会保存在这个 topic 里面
- decimal.handling.mode:当处理 decimal和 Int 类型时,默认是二进制显示,我们改为 double 显示
- snapshot.mode:快照模式,这个需要具体情况,具体分析,因为我只需要实时数据,不需要历史数据,所以设置为 schema_only
- tombstones.on.delete:默认是 True,当我们删除记录的时候,会产生两天数据,第二条为NULL,但是我们不希望出现NULL,所以设置为 False
- table.include.list:类似白名单,里面的表可以监控到,不在里面监控不到,多表逗号分隔,支持正则匹配 :::
:::danger
MySQL v8 报错 Unable to connect: Public Key Retrieval is not allowed,需要添加配置项
“database.allowPublicKeyRetrieval”:”true”,
https://rmoff.net/2019/10/23/debezium-mysql-v8-public-key-retrieval-is-not-allowed/
:::
:::warning Decimal 格式数据未完全转换过来
插入数据 :::
INSERT INTO debeziumdb.data_type_test
(id, varchar_data, bigint_data, float_data, double_date, decimal_data, date_data, time_data, datetime_data, timestamp_data)
VALUES(1, 'www', 1, 1.10000002, 1.11, 5000.36, '2020-02-02', '12:12:10', '2020-02-02 12:12:10.0', NULL);
:::warning
使用 Confluent 的 Avro 序列化消费结果为 :::
{"before": null, "after": {"id": 1, "varchar_data": "www", "bigint_data": 1, "float_data": 1.100000023841858, "double_date": 1.11, "decimal_data": "\u0007¡D", "date_data": 18294, "time_data": 43930000000, "datetime_data": 1580645530000, "timestamp_data": "1970-01-01T00:00:00Z"}, "source": {"version": "1.1.0.Final", "connector": "mysql", "name": "dbz.mysql", "ts_ms": 0, "snapshot": "last", "db": "debeziumdb", "table": "data_type_test", "server_id": 0, "gtid": null, "file": "MS-SJNKFHHAUOQV-bin.000003", "pos": 16244421, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1588060753486, "transaction": null}
:::warning 需要添加配置项
“decimal.handling.mode”: “string”
或者
“decimal.handling.mode”: “double”
其中 double 类型保存两位精度 :::启动 schema-registry 和 kafka-connect
sudo systemctl start confluent-schema-registry
sudo systemctl start confluent-kafka-connect
查看 kafka-connect 日志
journalctl -f -u confluent-kafka-connect
配置 MySQL 连接信息
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @dbz-mysql-avro-connector.json
查看 Kafka 的 Topic 列表
kafka-topics --bootstrap-server localhost:9092 --list
__consumer_offsets
_schemas
connect-configs
connect-offsets
connect-status
dbhistory.mysql-avro
mysql-avro
mysql-avro.mydb.orders
mysql-avro.mydb.products
Kafka 主题名称使用
. . 这样的格式。 查看 Kafka 的 Topic 信息,需要使用 kafka-avro-console-consumer,否则看到的是二进制格式
kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic mysql-avro.mydb.orders | jq
{
"before": null,
"after": {
"mysql_avro.mydb.orders.Value": {
"order_id": 10001,
"order_date": 1596103702000,
"customer_name": "Jark",
"price": 50.5,
"product_id": 102,
"order_status": 0
}
},
"source": {
"version": "1.8.1.Final",
"connector": "mysql",
"name": "mysql-avro",
"ts_ms": 1649338430358,
"snapshot": {
"string": "true"
},
"db": "mydb",
"sequence": null,
"table": {
"string": "orders"
},
"server_id": 0,
"gtid": null,
"file": "binlog.000010",
"pos": 157,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": {
"long": 1649338430363
},
"transaction": null
}
{
"before": null,
"after": {
"mysql_avro.mydb.orders.Value": {
"order_id": 10002,
"order_date": 1596103869000,
"customer_name": "Sally",
"price": 15,
"product_id": 105,
"order_status": 0
}
},
"source": {
"version": "1.8.1.Final",
"connector": "mysql",
"name": "mysql-avro",
"ts_ms": 1649338430366,
"snapshot": {
"string": "true"
},
"db": "mydb",
"sequence": null,
"table": {
"string": "orders"
},
"server_id": 0,
"gtid": null,
"file": "binlog.000010",
"pos": 157,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": {
"long": 1649338430366
},
"transaction": null
}
{
"before": null,
"after": {
"mysql_avro.mydb.orders.Value": {
"order_id": 10003,
"order_date": 1596110430000,
"customer_name": "Edward",
"price": 25.25,
"product_id": 106,
"order_status": 0
}
},
"source": {
"version": "1.8.1.Final",
"connector": "mysql",
"name": "mysql-avro",
"ts_ms": 1649338430367,
"snapshot": {
"string": "true"
},
"db": "mydb",
"sequence": null,
"table": {
"string": "orders"
},
"server_id": 0,
"gtid": null,
"file": "binlog.000010",
"pos": 157,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": {
"long": 1649338430367
},
"transaction": null
}
查看 Kafka 中保存的
schema
信息,其中可以看到 key 和 value 的数据类型信息kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic _schemas | jq
null
null
{
"subject": "mysql-avro-key",
"version": 1,
"id": 1,
"schema": "{\"type\":\"record\",\"name\":\"SchemaChangeKey\",\"namespace\":\"io.debezium.connector.mysql\",\"fields\":[{\"name\":\"databaseName\",\"type\":\"string\"}],\"connect.name\":\"io.debezium.connector.mysql.SchemaChangeKey\"}",
"deleted": false
}
{
"subject": "mysql-avro-value",
"version": 1,
"id": 2,
"schema": "{\"type\":\"record\",\"name\":\"SchemaChangeValue\",\"namespace\":\"io.debezium.connector.mysql\",\"fields\":[{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"table\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"server_id\",\"type\":\"long\"},{\"name\":\"gtid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file\",\"type\":\"string\"},{\"name\":\"pos\",\"type\":\"long\"},{\"name\":\"row\",\"type\":\"int\"},{\"name\":\"thread\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"query\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.mysql.Source\"}},{\"name\":\"databaseName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"schemaName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ddl\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tableChanges\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Change\",\"namespace\":\"io.debezium.connector.schema\",\"fields\":[{\"name\":\"type\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"table\",\"type\":{\"type\":\"record\",\"name\":\"Table\",\"fields\":[{\"name\":\"defaultCharsetName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"primaryKeyColumnNames\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"columns\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Column\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"jdbcType\",\"type\":\"int\"},{\"name\":\"nativeType\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"typeName\",\"type\":\"string\"},{\"name\":\"typeExpression\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"charsetName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"length\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"scale\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"position\",\"type\":\"int\"},{\"name\":\"optional\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"autoIncremented\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"generated\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.schema.Column\"}}},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.schema.Table\"}}],\"connect.name\":\"io.debezium.connector.schema.Change\"}}}],\"connect.name\":\"io.debezium.connector.mysql.SchemaChangeValue\"}",
"deleted": false
}
{
"subject": "mysql-avro.mydb.orders-key",
"version": 1,
"id": 3,
"schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"mysql_avro.mydb.orders\",\"fields\":[{\"name\":\"order_id\",\"type\":\"int\"}],\"connect.name\":\"mysql_avro.mydb.orders.Key\"}",
"deleted": false
}
{
"subject": "mysql-avro.mydb.orders-value",
"version": 1,
"id": 4,
"schema": "{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"mysql_avro.mydb.orders\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"order_id\",\"type\":\"int\"},{\"name\":\"order_date\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"io.debezium.time.Timestamp\"}},{\"name\":\"customer_name\",\"type\":\"string\"},{\"name\":\"price\",\"type\":\"double\"},{\"name\":\"product_id\",\"type\":\"int\"},{\"name\":\"order_status\",\"type\":{\"type\":\"int\",\"connect.type\":\"int16\"}}],\"connect.name\":\"mysql_avro.mydb.orders.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.mysql\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"table\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"server_id\",\"type\":\"long\"},{\"name\":\"gtid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file\",\"type\":\"string\"},{\"name\":\"pos\",\"type\":\"long\"},{\"name\":\"row\",\"type\":\"int\"},{\"name\":\"thread\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"query\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.mysql.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"transaction\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"total_order\",\"type\":\"long\"},{\"name\":\"data_collection_order\",\"type\":\"long\"}]}],\"default\":null}],\"connect.name\":\"mysql_avro.mydb.orders.Envelope\"}",
"deleted": false
}
{
"subject": "mysql-avro.mydb.products-key",
"version": 1,
"id": 5,
"schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"mysql_avro.mydb.products\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}],\"connect.name\":\"mysql_avro.mydb.products.Key\"}",
"deleted": false
}
{
"subject": "mysql-avro.mydb.products-value",
"version": 1,
"id": 6,
"schema": "{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"mysql_avro.mydb.products\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"description\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"mysql_avro.mydb.products.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.mysql\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"table\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"server_id\",\"type\":\"long\"},{\"name\":\"gtid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file\",\"type\":\"string\"},{\"name\":\"pos\",\"type\":\"long\"},{\"name\":\"row\",\"type\":\"int\"},{\"name\":\"thread\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"query\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.mysql.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"transaction\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"total_order\",\"type\":\"long\"},{\"name\":\"data_collection_order\",\"type\":\"long\"}]}],\"default\":null}],\"connect.name\":\"mysql_avro.mydb.products.Envelope\"}",
"deleted": false
}
清除环境
删除 connector
curl -X DELETE http://localhost:8083/connectors/dbz-mysql-avro-connector
停止 Kafka Connect
sudo systemctl stop confluent-schema-registry
sudo systemctl stop confluent-kafka-connect
删除 topic ```bash kafka-topics —bootstrap-server localhost:9092 —delete —topic dbhistory.mysql-avro kafka-topics —bootstrap-server localhost:9092 —delete —topic mysql-avro kafka-topics —bootstrap-server localhost:9092 —delete —topic mysql-avro.mydb.orders kafka-topics —bootstrap-server localhost:9092 —delete —topic mysql-avro.mydb.products kafka-topics —bootstrap-server localhost:9092 —delete —topic _schemas
kafka-topics —bootstrap-server localhost:9092 —delete —topic connect-configs kafka-topics —bootstrap-server localhost:9092 —delete —topic connect-offsets kafka-topics —bootstrap-server localhost:9092 —delete —topic connect-status
kafka-topics —bootstrap-server localhost:9092 —list
<a name="RgOY0"></a>
# 使用 Flink SQL
<a name="aKv0u"></a>
## 安装并启动 Flink
- 下载 [Flink 1.13.6](https://dlcdn.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz) (使用的是 scala_2.12 版本)并将其解压至目录 flink-1.13.6
```bash
wget https://dlcdn.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz
tar -xvf flink-1.13.6-bin-scala_2.12.tgz
- 下载下面列出的依赖包,并将它们放到目录 flink-1.13.6/lib/ 下 ```json cd flink-1.13.6/lib/
- [flink-sql-connector-kafka_2.12-1.13.6.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.6/flink-sql-connector-kafka_2.12-1.13.6.jar)
- [flink-sql-avro-confluent-registry-1.13.6.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/1.13.6/flink-sql-avro-confluent-registry-1.13.6.jar)
- 由于schema-registry 已经使用了 8081 ,所以需要修改 Flink 配置,将 rest 端口改为 18081
```bash
vim ~/flink-1.13.6/conf/flink-conf.yaml
rest.port: 18081
- 启动 Flink 集群 ```json cd ~/flink-1.13.6
bin/start-cluster.sh
<a name="XPA3w"></a>
## 在 Flink SQL CLI 中使用 Flink DDL 创建 Source 表
- 启动 sql-client
```json
bin/sql-client.sh
- Flink SQL 创建 Source 表 ```sql CREATE TABLE products ( id INT, name STRING, description STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( ‘connector’ = ‘kafka’, ‘topic’ = ‘mysql-avro.mydb.products’, ‘properties.bootstrap.servers’ = ‘localhost:9092’, ‘scan.startup.mode’ = ‘earliest-offset’, ‘debezium-avro-confluent.schema-registry.url’ = ‘http://localhost:8081‘, ‘format’ = ‘debezium-avro-confluent’ );
CREATE TABLE orders ( order_id INT, order_date TIMESTAMP(0), — debezium MySQL 插件会将 DATETIME 转换成 long customer_name STRING, price DOUBLE, — debezium MySQL 插件会将 DECIMAL(10, 5) 转换成 double product_id INT, order_status INT, — debezium MySQL 插件会将 BOOLEAN 转换成 int PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( ‘connector’ = ‘kafka’, ‘topic’ = ‘mysql-avro.mydb.orders’, ‘properties.bootstrap.servers’ = ‘localhost:9092’, ‘scan.startup.mode’ = ‘earliest-offset’, ‘debezium-avro-confluent.schema-registry.url’ = ‘http://localhost:8081‘, ‘format’ = ‘debezium-avro-confluent’ );
:::info
使用 Avro 编码消息,需要如下配置
'format' = 'debezium-avro-confluent'<br />'debezium-avro-confluent.schema-registry.url' = 'http://localhost:8081',
:::
:::warning
Avro 中 DATETIME 转换成 long<br />Avro 中 DECIMAL(10, 5) 转换成 double<br />Avro 中 BOOLEAN 转换成 int
:::
- Flink SQL 提交任务
```sql
select * from products;
select * from orders;
数据类型映射
目前,Avro schema 通常是从 table schema 中推导而来。尚不支持显式定义 Avro schema
Flink SQL 类型 | Avro 类型 | Avro 逻辑类型 |
---|---|---|
CHAR / VARCHAR / STRING | string | |
BOOLEAN | boolean | |
BINARY / VARBINARY | bytes | |
DECIMAL | fixed | decimal |
TINYINT | int | |
SMALLINT | int | |
INT | int | |
BIGINT | long | |
FLOAT | float | |
DOUBLE | double | |
DATE | int | date |
TIME | int | time-millis |
TIMESTAMP | long | timestamp-millis |
ARRAY | array | |
MAP (key 必须是 string/char/varchar 类型) |
map | |
MULTISET (元素必须是 string/char/varchar 类型) |
map | |
ROW | record |
遇到的问题
ERROR The retention policy of the schema topic _schemas is incorrect
问题现象
[2018-01-11 14:37:49,682] INFO Validating schemas topic _schemas (io.confluent.kafka.schemaregistry.storage.KafkaStore)
[2018-01-11 14:37:49,773] ERROR The retention policy of the schema topic _schemas is incorrect. You must configure the topic to 'compact' cleanup policy to avoid Kafka deleting your schemas after a week. Refer to Kafka documentation for more details on cleanup policies (io.confluent.kafka.schemaregistry.storage.KafkaStore)
[2018-01-11 14:37:49,780] INFO Shutting down schema registry (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry)
[2018-01-11 14:37:49,783] ERROR Server died unexpectedly: (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain)
java.lang.NullPointerException
at io.confluent.kafka.schemaregistry.storage.KafkaStore.close(KafkaStore.java:366)
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.close(KafkaSchemaRegistry.java:720)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.onShutdown(SchemaRegistryRestApplication.java:111)
解决方案
在重新启动 kafka 集群或 schema-registry 经常重启后有时会发生。执行下面命令手动修复
kafka-topics --alter --config cleanup.policy=compact --topic _schemas --bootstrap-server
参考文档 Manual Install using Systemd on Ubuntu and Debian ERROR The retention policy of the schema topic _schemas is incorrect
[
](https://blog.csdn.net/OldDirverHelpMe/article/details/107881170)