Avro 介绍

Apache Avro™ 是一个数据序列化系统。

Avro 提供:

  • 丰富的数据结构。
  • 一种紧凑、快速的二进制数据格式。
  • 一个容器文件,用于存储持久数据。
  • 远程过程调用 (RPC)。
  • 与动态语言的简单集成。不需要生成代码来读取或写入数据文件,也不需要使用或实现 RPC 协议。代码生成是一种可选的优化,静态类型语言才值得实现。

    schema

    Avro 依赖于 schema。读取 Avro 数据时,写入时使用的 schema 始终存在。这允许不需要为每个值都写入 schema 的情况下写入每个数据,从而使序列化既快速又小。这也便于动态脚本语言的使用,因为数据及其 schema 是完全自描述的。

当 Avro 数据存储在文件中时,它的 schema 也随之存储,以便以后任何程序都可以处理文件。如果读取数据的程序需要不同的模式,这很容易解决,因为这两种模式都存在。

在 RPC 中使用 Avro 时,客户端和服务器在连接握手中交换 schema。 (可以对此进行优化,使得对于大多数调用,实际上不传输 schema。)由于客户端和服务器都具有对方的完整模式,因此相同命名字段之间的对应关系,缺少字段,额外字段等都可以轻松地解决.

Avro schema是用 JSON 定义的。这有助于在已经具有 JSON 库的语言中实现。

与其他系统的比较

Avro 提供类似于 Thrift、Protocol Buffers 等系统的功能。Avro 在以下基本方面与这些系统不同。

  • 动态类型:Avro 不需要生成代码。数据总是伴随着一个模式,该模式允许在不生成代码、静态数据类型等的情况下对数据进行全面处理。这有助于构建通用的数据处理系统和语言。
  • 无标记数据: 由于在读取数据时存在模式,因此需要对数据进行编码的类型信息要少得多,从而减小了序列化大小。
  • 没有手动分配的字段 ID:当 schema 更改时,处理数据时始终存在旧 schema 和新 schema,因此可以使用字段名以符号方式解决差异。

    Avro 序列化

    Debezium 连接器在 Kafka Connect 框架中工作,通过生成更改事件记录来捕获数据库中的每个行级更改。 对于每个更改事件记录,Debezium 连接器完成以下操作:
  1. 应用配置的转换
  2. 使用配置的 Kafka Connect 转换器将记录的键和值序列化为二进制形式
  3. 将记录写入正确的 Kafka 主题

Kafka Connect 默认使用 JSON 转换器, 默认包含记录的 schema,这使得每条记录都非常冗长(每条记录都要存储 schema),导致占用大量的空间。 可以将以下连接器配置属性设置为 false,这样会从每条记录中排除详细 schema 信息:

  • key.converter.schemas.enable
  • value.converter.schemas.enable

还有一种选择是使用 Apache Avro 转换器序列化记录的键和值。

Avro 模式将表的结构信息存储在 schema registry 中,所以在 kafka 每个记录只需要包含一个很小的 schema 标识符,这使得每条记录更小。

Avro 二进制格式紧凑且高效。 Avro schema 可以确保每个记录具有正确的结构。 Avro schema 的进化机制使 schema 能够进化。 这对于 Debezium 连接器至关重要,它会动态生成每个记录的 schema,以匹配更改的数据库表的结构。 随着时间的推移,写入相同 Kafka 主题的更改事件记录可能具有相同 schema 的不同版本。 Avro 序列化使更改事件记录的使用者更容易适应不断变化的记录 schema

要使用 Apache Avro 序列化,你必须部署一个管理 Avro 消息 schema 及其版本的 schema registry。 可以使用 Apicurio Registry 或者 Confluent Schema Registry。 我们在这里使用 Confluent Schema Registry。

Confluent Schema Registry 的架构

image.png
Confluent Schema Registry 用于存储和检索 schema
Schema Registry 是一个分布式的模式存储层,它使用 Kafka 作为其底层存储机制。 一些关键的设计决策:

  • 为每个注册的 schema 分配全局唯一 ID。 分配的 ID 保证单调递增且唯一,但不一定是连续的。
  • Kafka 提供持久的后端,并用作 Schema Registry 状态及其包含的模式的预写变更日志。
  • Schema Registry 设计为分布式,单主架构,ZooKeeper/Kafka 协调主选(基于配置)。

    安装社区版 Confluent Platform(包括 Schema Registry)

  1. 安装 Confluent 公钥。该公钥用于在 APT 存储库中对包进行签名。

    1. wget -qO - https://packages.confluent.io/deb/7.1/archive.key | sudo apt-key add -
  2. 通过运行以下命令将仓库添加到 /etc/apt/sources.list

    1. sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/7.1 stable main"
    2. sudo add-apt-repository "deb https://packages.confluent.io/clients/deb $(lsb_release -cs) main"
  3. 更新 apt-get 并安装整个 Confluent Platform 平台(Scala 2.13)。

    1. sudo apt-get update && sudo apt-get install confluent-community-2.13

    :::info 安装包名称以构建 Kafka 的 Scala 版本结尾。 例如,confluent-platform-2.13 包基于 Scala 2.13。
    Confluent Platform 5.5.* 才支持 Scala 2.12 :::

    配置 Confluent Platform(单机不需要配置)

    ZooKeeper

  • 编辑 ZooKeeper 配置文件 ```bash vim /etc/kafka/zookeeper.properties

tickTime=2000 dataDir=/var/lib/zookeeper/ clientPort=2181 initLimit=5 syncLimit=2

server.=::

server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 autopurge.snapRetainCount=3 autopurge.purgeInterval=24

  1. :::info
  2. 此配置适用于三节点集成。 这个配置文件应该在整体中的所有节点上都是相同的。
  3. - `tickTime``dataDir` `clientPort` 与典单个服务器配置一样。
  4. - `initLimit` `syncLimit` 控制 ZooKeeper 服务器与当前 leader 初始化所需的时间以及它们与 leader不同步的时间。
  5. `initLimit` `syncLimit` 的值表示为 tickTime 的多少倍,所以需要乘以 tickTime。在此配置中,follower 可能需要 10000 毫秒来初始化,最多可能会出现 4000 毫秒的不同步。
  6. - `myid` 是服务器标识号。 有三台服务器,每台服务器都有一个不同的 myid,分别为 12 3
  7. myid 是通过在 dataDir 中创建一个名为 myid 的文件来设置的,此值必须与配置文件中的 myid 值之一匹配。
  8. - `leaderport` 用于 follower 连接到活动的 leader,这个端口应该在所有 ZooKeeper 集合成员之间打开。
  9. - `electionport` 用于在所有的成员之间执行 leader 选举。 这个端口应该在所有 ZooKeeper 集合成员之间打开。
  10. - `autopurge.snapRetainCount` `autopurge.purgeInterval` 设置为每 24 小时清除三个快照以外的所有快照。
  11. :::
  12. - dataDir 目录(/var/lib/zookeeper/)并创建一个名为 myid 的文件。 myid 文件由一行组成,其中包含格式为 <machine-id> 的机器 ID ZooKeeper 服务器启动时,它通过引用 myid 文件知道它是哪个服务器。 例如,server 1 myid 值为 1
  13. <a name="iATtP"></a>
  14. ## Kafka
  15. - 编辑 Kafka 配置文件
  16. ```bash
  17. vim /etc/kafka/server.properties
  18. zookeeper.connect=zookeeper:2181
  19. ############################# Server Basics #############################
  20. # The ID of the broker. This must be set to a unique integer for each broker.
  21. #broker.id=0
  22. broker.id.generation.enable=true
  23. # The address the socket server listens on. It will get the value returned from
  24. # java.net.InetAddress.getCanonicalHostName() if not configured.
  25. # FORMAT:
  26. # listeners = listener_name://host_name:port
  27. # EXAMPLE:
  28. # listeners = PLAINTEXT://your.host.name:9092
  29. #listeners=PLAINTEXT://:9092

:::info

  • zookeeper.connect

将所有节点中的 zookeeper.connect 设置为相同的值,连接到同一个 ZooKeeper 集合。
使用以下方法之一为集群中的每个节点配置代理 ID

  • broker.id.generation.enable

添加 broker.id.generation.enable=true 并注释掉 broker.id,为集群中的每个节点动态生成代理 ID。
如果要手动设置代理 ID,需要在每个节点上为 broker.id 设置唯一值。

  • listeners或者 advertised.listeners

配置其他 broker 和客户端如何使用listenersbroker 进行通信 :::

Schema Registry

  • 编辑 Schema Registry 配置文件 ```bash vim /etc/schema-registry/schema-registry.properties

The address the socket server listens on.

FORMAT:

listeners = listener_name://host_name:port

EXAMPLE:

listeners = PLAINTEXT://your.host.name:9092

listeners=http://0.0.0.0:8081

Use this setting to specify the bootstrap servers for your Kafka cluster and it

will be used both for selecting the leader schema registry instance and for storing the data for

registered schemas.

kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092

  1. :::info
  2. - listeners
  3. Schema Registry 监听地址
  4. - kafkastore.bootstrap.servers
  5. Kafka bootstrap servers 地址
  6. :::
  7. <a name="akHQe"></a>
  8. ## Kafka Connect
  9. - 编辑 Kafka Connect 配置文件
  10. ```bash
  11. vim /etc/kafka/connect-distributed.properties
  12. # A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
  13. bootstrap.servers=localhost:9092
  14. # unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
  15. group.id=connect-cluster
  16. # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
  17. # need to configure these based on the format they want their data in when loaded from or stored into Kafka
  18. key.converter=org.apache.kafka.connect.json.JsonConverter
  19. value.converter=org.apache.kafka.connect.json.JsonConverter
  20. # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
  21. # it to
  22. key.converter.schemas.enable=true
  23. value.converter.schemas.enable=true
  24. # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
  25. # (connectors, converters, transformations). The list should consist of top level directories that include
  26. # any combination of:
  27. # a) directories immediately containing jars with plugins and their dependencies
  28. # b) uber-jars with plugins and their dependencies
  29. # c) directories immediately containing the package directory structure of classes of plugins and their dependencies
  30. # Examples:
  31. # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
  32. plugin.path=/usr/share/java

:::info

  • bootstrap.servers

该参数列出了将要与 Connect 协同工作的 broker 服务器,连接器将会向这些 broker 写入数据或者从它们那里读取数据。你不需要指定集群所有的broker,不过建议至少指定 3 个

  • group.id

具有相同 group id 的 worker 属于同一个 Connect 集群。集群的连接器和它们的任务可以运行在任意一个 worker 上。

  • key.converter 和 value.converter

Connect 可以处理存储在 Kafka 里的不同格式的数据。这两个参数分别指定了消息的键和值所使用的转换器。默认使用 Kafka 提供的 JSONConverter,当然也可以配置成 Confluent Schema Registry 提供的 AvroConverter。

  • key.converter.schema.enable 和 value.converter.schema.enable

设置成 true 或者 false 来指定 JSON 消息是否可以包含 schema。Avro 消息也包含了 schema,不过需要通过 key.converter.schema.registry.url 和value.converter.schema.registry.url 来 指定 Schema Registry 的位置。

  • plugin.path

配置插件位置,所有的插件需要放到这个目录下面 :::

:::success 可以通过以下两种方式之一配置使用 Avro 消息

  • Debezium 连接器配置 ::: :::success

    • 编辑 Kafka Connect 配置文件 :::

      1. key.converter=org.apache.kafka.connect.json.JsonConverter
      2. value.converter=org.apache.kafka.connect.json.JsonConverter

      :::success

    • 配置连接信息 :::

      1. "key.converter": "io.confluent.connect.avro.AvroConverter",
      2. "value.converter": "io.confluent.connect.avro.AvroConverter",
      3. "key.converter.schema.registry.url": "http://schema-registry:8081",
      4. "value.converter.schema.registry.url": "http://schema-registry:8081"

      :::success

  • Kafka Connect Worker 配置

编辑 Kafka Connect 配置文件 :::

  1. key.converter=io.confluent.connect.avro.AvroConverter
  2. value.converter=io.confluent.connect.avro.AvroConverter
  3. key.converter.schema.registry.url=http://schema-registry:8081
  4. value.converter.schema.registry.url=http://schema-registry:8081

:::danger 注意⚠️:我们使用 Debezium 连接器配置,所以不需要修改 Kafka Connect 配置文件,使用下面的配置
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter :::

启动 Confluent Platform

  1. 启动 ZooKeeper

    1. sudo systemctl start confluent-zookeeper
  2. 启动 Kafka

    1. sudo systemctl start confluent-kafka

    :::info Confluent Platform 使用 confluent-server
    Confluent Platform 社区版使用 confluent-kafka :::

  3. 启动 Schema Registry

    1. sudo systemctl start confluent-schema-registry

    :::info 启动成功后可以直接访问 8081 端口
    curl localhost:8081
    {} :::

  4. 启动 Kafka Connect

    1. sudo systemctl start confluent-kafka-connect

    使用 Debezium 同步 MySQL 数据到 Kafka

    :::info MySQL: 商品表 products 和 订单表 orders 将存储在该数据库中 :::

    安装并启动 MySQL

  • 安装并启动 MySQL(Server version: 8.0.28-0ubuntu0.20.04.3 )
    1. sudo apt update
    2. sudo apt install mysql-server
    3. sudo systemctl enable mysql
    4. sudo systemctl status mysql

    参考 如何在 Ubuntu 20.04 上安装 MySQL

:::info 注意⚠️:需要开启 binlog 以及 binlog_format 必须是 row 格式,默认配置即可,不需要做额外操作

log-bin=/var/lib/mysql/mysql-bin.log # 指定 binlog 日志存储位置
binlog_format=ROW # 这里一定是 row 格式
expire-logs-days = 14 # 日志保留时间
max-binlog-size = 500M # 日志滚动大小 :::

在 MySQL 数据库中准备数据

  • 使用 root 用户登录 MySQL

    1. mysql
  • 创建数据库和表 products,orders,并插入数据 ```bash — MySQL CREATE DATABASE mydb;

USE mydb;

CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512) );

ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products VALUES (default,”scooter”,”Small 2-wheel scooter”), (default,”car battery”,”12V car battery”), (default,”12-pack drill bits”,”12-pack of drill bits with sizes ranging from #40 to #3”), (default,”hammer”,”12oz carpenter’s hammer”), (default,”hammer”,”14oz carpenter’s hammer”), (default,”hammer”,”16oz carpenter’s hammer”), (default,”rocks”,”box of assorted rocks”), (default,”jacket”,”water resistent black wind breaker”), (default,”spare tire”,”24 inch spare tire”);

CREATE TABLE orders ( order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, order_date DATETIME NOT NULL, customer_name VARCHAR(255) NOT NULL, price DECIMAL(10, 5) NOT NULL, product_id INTEGER NOT NULL, order_status BOOLEAN NOT NULL — Whether order has been placed ) AUTO_INCREMENT = 10001;

INSERT INTO orders VALUES (default, ‘2020-07-30 10:08:22’, ‘Jark’, 50.50, 102, false), (default, ‘2020-07-30 10:11:09’, ‘Sally’, 15.00, 105, false), (default, ‘2020-07-30 12:00:30’, ‘Edward’, 25.25, 106, false);

  1. <a name="UtZQo"></a>
  2. ## 创建 Debezium 用于同步数据的用户并授权
  3. - 使用 root 用户登录 MySQL
  4. ```bash
  5. mysql
  • 创建 Debezium 用于同步数据的用户并授权 ```bash CREATE USER ‘debezium’@’%’ IDENTIFIED BY ‘123456’;

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘debezium’;

FLUSH PRIVILEGES;

  1. <a name="DKlJp"></a>
  2. ## 下载 Kafka 连接器插件并解压
  3. - 将 debezium-connector-mysql 到 connect-distributed.properties 中 plugin.path 配置的目录下(这里是 /usr/share/java)
  4. ```bash
  5. cd /usr/share/java
  6. wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.8.1.Final/debezium-connector-mysql-1.8.1.Final-plugin.tar.gz
  7. tar xfz debezium-connector-mysql-1.8.1.Final-plugin.tar.gz && rm debezium-connector-mysql-1.8.1.Final-plugin.tar.gz

下载 Confluent 的 Avro 序列化相关的包并解压

  1. cd /usr/share/java
  2. wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.1.0/confluentinc-kafka-connect-avro-converter-7.1.0.zip
  3. unzip confluentinc-kafka-connect-avro-converter-7.1.0.zip && rm confluentinc-kafka-connect-avro-converter-7.1.0.zip

:::info 如果是通过 Confluent Platform 方式安装 confluentinc-kafka-connect-avro-converter-7.1.0 已经在 /usr/share/java 中,无需再次下载 :::

重启 Kafka Connect

  1. sudo systemctl restart confluent-kafka-connect

配置 MySQL 连接信息

  • 将 MySQL 连接信息保存到 dbz-mysql-connector.json 文件中

    1. cat <<EOF > dbz-mysql-avro-connector.json
    2. {
    3. "name":"dbz-mysql-avro-connector",
    4. "config":{
    5. "connector.class":"io.debezium.connector.mysql.MySqlConnector",
    6. "tasks.max":"1",
    7. "database.hostname":"localhost",
    8. "database.port":"3306",
    9. "database.user":"debezium",
    10. "database.password":"123456",
    11. "database.server.id":"132179",
    12. "database.server.name":"mysql-avro",
    13. "database.include.list":"mydb",
    14. "database.history.kafka.bootstrap.servers":"localhost:9092",
    15. "database.history.kafka.topic":"dbhistory.mysql-avro",
    16. "database.allowPublicKeyRetrieval":"true",
    17. "decimal.handling.mode":"double",
    18. "key.converter": "io.confluent.connect.avro.AvroConverter",
    19. "value.converter": "io.confluent.connect.avro.AvroConverter",
    20. "key.converter.schema.registry.url": "http://localhost:8081",
    21. "value.converter.schema.registry.url": "http://localhost:8081"
    22. }
    23. }
    24. EOF

    :::info

  • name:标识连接器的名称

  • connector.class:对应数据库类
  • tasks.max:默认1
  • database.hostname:数据库 ip
  • database.port:数据库端口
  • database.user:数据库登录名
  • database.password:数据库密码
  • database.server.id:数据库 id,标识当前库,不重复就行
  • database.server.name:给数据库取别名
  • database.include.list:类似白名单,里面的库可以监控到,不在里面监控不到,多库逗号分隔,支持正则匹配
  • database.history.kafka.bootstrap.servers:保存表 DDL 相关信息的 kafka 地址
  • database.history.kafka.topic:表 DDL 相关信息会保存在这个 topic 里面
  • decimal.handling.mode:当处理 decimal和 Int 类型时,默认是二进制显示,我们改为 double 显示
  • snapshot.mode:快照模式,这个需要具体情况,具体分析,因为我只需要实时数据,不需要历史数据,所以设置为 schema_only
  • tombstones.on.delete:默认是 True,当我们删除记录的时候,会产生两天数据,第二条为NULL,但是我们不希望出现NULL,所以设置为 False
  • table.include.list:类似白名单,里面的表可以监控到,不在里面监控不到,多表逗号分隔,支持正则匹配 :::

:::danger MySQL v8 报错 Unable to connect: Public Key Retrieval is not allowed,需要添加配置项
“database.allowPublicKeyRetrieval”:”true”,
https://rmoff.net/2019/10/23/debezium-mysql-v8-public-key-retrieval-is-not-allowed/ :::

:::warning Decimal 格式数据未完全转换过来

  • 插入数据 :::

    1. INSERT INTO debeziumdb.data_type_test
    2. (id, varchar_data, bigint_data, float_data, double_date, decimal_data, date_data, time_data, datetime_data, timestamp_data)
    3. VALUES(1, 'www', 1, 1.10000002, 1.11, 5000.36, '2020-02-02', '12:12:10', '2020-02-02 12:12:10.0', NULL);

    :::warning

  • 使用 Confluent 的 Avro 序列化消费结果为 :::

    1. {"before": null, "after": {"id": 1, "varchar_data": "www", "bigint_data": 1, "float_data": 1.100000023841858, "double_date": 1.11, "decimal_data": "\u0007¡D", "date_data": 18294, "time_data": 43930000000, "datetime_data": 1580645530000, "timestamp_data": "1970-01-01T00:00:00Z"}, "source": {"version": "1.1.0.Final", "connector": "mysql", "name": "dbz.mysql", "ts_ms": 0, "snapshot": "last", "db": "debeziumdb", "table": "data_type_test", "server_id": 0, "gtid": null, "file": "MS-SJNKFHHAUOQV-bin.000003", "pos": 16244421, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1588060753486, "transaction": null}

    :::warning 需要添加配置项
    “decimal.handling.mode”: “string”
    或者
    “decimal.handling.mode”: “double”
    其中 double 类型保存两位精度 :::

  • 启动 schema-registry 和 kafka-connect

    1. sudo systemctl start confluent-schema-registry
    2. sudo systemctl start confluent-kafka-connect
  • 查看 kafka-connect 日志

    1. journalctl -f -u confluent-kafka-connect
  • 配置 MySQL 连接信息

    1. curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @dbz-mysql-avro-connector.json
  • 查看 Kafka 的 Topic 列表

    1. kafka-topics --bootstrap-server localhost:9092 --list
    2. __consumer_offsets
    3. _schemas
    4. connect-configs
    5. connect-offsets
    6. connect-status
    7. dbhistory.mysql-avro
    8. mysql-avro
    9. mysql-avro.mydb.orders
    10. mysql-avro.mydb.products

    Kafka 主题名称使用 .. 这样的格式。

  • 查看 Kafka 的 Topic 信息,需要使用 kafka-avro-console-consumer,否则看到的是二进制格式

    1. kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic mysql-avro.mydb.orders | jq
    2. {
    3. "before": null,
    4. "after": {
    5. "mysql_avro.mydb.orders.Value": {
    6. "order_id": 10001,
    7. "order_date": 1596103702000,
    8. "customer_name": "Jark",
    9. "price": 50.5,
    10. "product_id": 102,
    11. "order_status": 0
    12. }
    13. },
    14. "source": {
    15. "version": "1.8.1.Final",
    16. "connector": "mysql",
    17. "name": "mysql-avro",
    18. "ts_ms": 1649338430358,
    19. "snapshot": {
    20. "string": "true"
    21. },
    22. "db": "mydb",
    23. "sequence": null,
    24. "table": {
    25. "string": "orders"
    26. },
    27. "server_id": 0,
    28. "gtid": null,
    29. "file": "binlog.000010",
    30. "pos": 157,
    31. "row": 0,
    32. "thread": null,
    33. "query": null
    34. },
    35. "op": "r",
    36. "ts_ms": {
    37. "long": 1649338430363
    38. },
    39. "transaction": null
    40. }
    41. {
    42. "before": null,
    43. "after": {
    44. "mysql_avro.mydb.orders.Value": {
    45. "order_id": 10002,
    46. "order_date": 1596103869000,
    47. "customer_name": "Sally",
    48. "price": 15,
    49. "product_id": 105,
    50. "order_status": 0
    51. }
    52. },
    53. "source": {
    54. "version": "1.8.1.Final",
    55. "connector": "mysql",
    56. "name": "mysql-avro",
    57. "ts_ms": 1649338430366,
    58. "snapshot": {
    59. "string": "true"
    60. },
    61. "db": "mydb",
    62. "sequence": null,
    63. "table": {
    64. "string": "orders"
    65. },
    66. "server_id": 0,
    67. "gtid": null,
    68. "file": "binlog.000010",
    69. "pos": 157,
    70. "row": 0,
    71. "thread": null,
    72. "query": null
    73. },
    74. "op": "r",
    75. "ts_ms": {
    76. "long": 1649338430366
    77. },
    78. "transaction": null
    79. }
    80. {
    81. "before": null,
    82. "after": {
    83. "mysql_avro.mydb.orders.Value": {
    84. "order_id": 10003,
    85. "order_date": 1596110430000,
    86. "customer_name": "Edward",
    87. "price": 25.25,
    88. "product_id": 106,
    89. "order_status": 0
    90. }
    91. },
    92. "source": {
    93. "version": "1.8.1.Final",
    94. "connector": "mysql",
    95. "name": "mysql-avro",
    96. "ts_ms": 1649338430367,
    97. "snapshot": {
    98. "string": "true"
    99. },
    100. "db": "mydb",
    101. "sequence": null,
    102. "table": {
    103. "string": "orders"
    104. },
    105. "server_id": 0,
    106. "gtid": null,
    107. "file": "binlog.000010",
    108. "pos": 157,
    109. "row": 0,
    110. "thread": null,
    111. "query": null
    112. },
    113. "op": "r",
    114. "ts_ms": {
    115. "long": 1649338430367
    116. },
    117. "transaction": null
    118. }
  • 查看 Kafka 中保存的 schema 信息,其中可以看到 key 和 value 的数据类型信息

    1. kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic _schemas | jq
    2. null
    3. null
    4. {
    5. "subject": "mysql-avro-key",
    6. "version": 1,
    7. "id": 1,
    8. "schema": "{\"type\":\"record\",\"name\":\"SchemaChangeKey\",\"namespace\":\"io.debezium.connector.mysql\",\"fields\":[{\"name\":\"databaseName\",\"type\":\"string\"}],\"connect.name\":\"io.debezium.connector.mysql.SchemaChangeKey\"}",
    9. "deleted": false
    10. }
    11. {
    12. "subject": "mysql-avro-value",
    13. "version": 1,
    14. "id": 2,
    15. "schema": "{\"type\":\"record\",\"name\":\"SchemaChangeValue\",\"namespace\":\"io.debezium.connector.mysql\",\"fields\":[{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"table\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"server_id\",\"type\":\"long\"},{\"name\":\"gtid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file\",\"type\":\"string\"},{\"name\":\"pos\",\"type\":\"long\"},{\"name\":\"row\",\"type\":\"int\"},{\"name\":\"thread\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"query\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.mysql.Source\"}},{\"name\":\"databaseName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"schemaName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ddl\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tableChanges\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Change\",\"namespace\":\"io.debezium.connector.schema\",\"fields\":[{\"name\":\"type\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"table\",\"type\":{\"type\":\"record\",\"name\":\"Table\",\"fields\":[{\"name\":\"defaultCharsetName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"primaryKeyColumnNames\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"columns\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Column\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"jdbcType\",\"type\":\"int\"},{\"name\":\"nativeType\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"typeName\",\"type\":\"string\"},{\"name\":\"typeExpression\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"charsetName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"length\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"scale\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"position\",\"type\":\"int\"},{\"name\":\"optional\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"autoIncremented\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"generated\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.schema.Column\"}}},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.schema.Table\"}}],\"connect.name\":\"io.debezium.connector.schema.Change\"}}}],\"connect.name\":\"io.debezium.connector.mysql.SchemaChangeValue\"}",
    16. "deleted": false
    17. }
    18. {
    19. "subject": "mysql-avro.mydb.orders-key",
    20. "version": 1,
    21. "id": 3,
    22. "schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"mysql_avro.mydb.orders\",\"fields\":[{\"name\":\"order_id\",\"type\":\"int\"}],\"connect.name\":\"mysql_avro.mydb.orders.Key\"}",
    23. "deleted": false
    24. }
    25. {
    26. "subject": "mysql-avro.mydb.orders-value",
    27. "version": 1,
    28. "id": 4,
    29. "schema": "{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"mysql_avro.mydb.orders\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"order_id\",\"type\":\"int\"},{\"name\":\"order_date\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"io.debezium.time.Timestamp\"}},{\"name\":\"customer_name\",\"type\":\"string\"},{\"name\":\"price\",\"type\":\"double\"},{\"name\":\"product_id\",\"type\":\"int\"},{\"name\":\"order_status\",\"type\":{\"type\":\"int\",\"connect.type\":\"int16\"}}],\"connect.name\":\"mysql_avro.mydb.orders.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.mysql\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"table\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"server_id\",\"type\":\"long\"},{\"name\":\"gtid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file\",\"type\":\"string\"},{\"name\":\"pos\",\"type\":\"long\"},{\"name\":\"row\",\"type\":\"int\"},{\"name\":\"thread\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"query\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.mysql.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"transaction\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"total_order\",\"type\":\"long\"},{\"name\":\"data_collection_order\",\"type\":\"long\"}]}],\"default\":null}],\"connect.name\":\"mysql_avro.mydb.orders.Envelope\"}",
    30. "deleted": false
    31. }
    32. {
    33. "subject": "mysql-avro.mydb.products-key",
    34. "version": 1,
    35. "id": 5,
    36. "schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"mysql_avro.mydb.products\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}],\"connect.name\":\"mysql_avro.mydb.products.Key\"}",
    37. "deleted": false
    38. }
    39. {
    40. "subject": "mysql-avro.mydb.products-value",
    41. "version": 1,
    42. "id": 6,
    43. "schema": "{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"mysql_avro.mydb.products\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"description\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"mysql_avro.mydb.products.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.mysql\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"table\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"server_id\",\"type\":\"long\"},{\"name\":\"gtid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file\",\"type\":\"string\"},{\"name\":\"pos\",\"type\":\"long\"},{\"name\":\"row\",\"type\":\"int\"},{\"name\":\"thread\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"query\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.mysql.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"transaction\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"total_order\",\"type\":\"long\"},{\"name\":\"data_collection_order\",\"type\":\"long\"}]}],\"default\":null}],\"connect.name\":\"mysql_avro.mydb.products.Envelope\"}",
    44. "deleted": false
    45. }

    清除环境

  • 删除 connector

    1. curl -X DELETE http://localhost:8083/connectors/dbz-mysql-avro-connector
  • 停止 Kafka Connect

    1. sudo systemctl stop confluent-schema-registry
    2. sudo systemctl stop confluent-kafka-connect
  • 删除 topic ```bash kafka-topics —bootstrap-server localhost:9092 —delete —topic dbhistory.mysql-avro kafka-topics —bootstrap-server localhost:9092 —delete —topic mysql-avro kafka-topics —bootstrap-server localhost:9092 —delete —topic mysql-avro.mydb.orders kafka-topics —bootstrap-server localhost:9092 —delete —topic mysql-avro.mydb.products kafka-topics —bootstrap-server localhost:9092 —delete —topic _schemas

kafka-topics —bootstrap-server localhost:9092 —delete —topic connect-configs kafka-topics —bootstrap-server localhost:9092 —delete —topic connect-offsets kafka-topics —bootstrap-server localhost:9092 —delete —topic connect-status

kafka-topics —bootstrap-server localhost:9092 —list

  1. <a name="RgOY0"></a>
  2. # 使用 Flink SQL
  3. <a name="aKv0u"></a>
  4. ## 安装并启动 Flink
  5. - 下载 [Flink 1.13.6](https://dlcdn.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz) (使用的是 scala_2.12 版本)并将其解压至目录 flink-1.13.6
  6. ```bash
  7. wget https://dlcdn.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz
  8. tar -xvf flink-1.13.6-bin-scala_2.12.tgz
  • 下载下面列出的依赖包,并将它们放到目录 flink-1.13.6/lib/ 下 ```json cd flink-1.13.6/lib/

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.6/flink-sql-connector-kafka_2.12-1.13.6.jar

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/1.13.6/flink-sql-avro-confluent-registry-1.13.6.jar

  1. - [flink-sql-connector-kafka_2.12-1.13.6.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.6/flink-sql-connector-kafka_2.12-1.13.6.jar)
  2. - [flink-sql-avro-confluent-registry-1.13.6.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/1.13.6/flink-sql-avro-confluent-registry-1.13.6.jar)
  3. - 由于schema-registry 已经使用了 8081 ,所以需要修改 Flink 配置,将 rest 端口改为 18081
  4. ```bash
  5. vim ~/flink-1.13.6/conf/flink-conf.yaml
  6. rest.port: 18081
  • 启动 Flink 集群 ```json cd ~/flink-1.13.6

bin/start-cluster.sh

  1. <a name="XPA3w"></a>
  2. ## 在 Flink SQL CLI 中使用 Flink DDL 创建 Source 表
  3. - 启动 sql-client
  4. ```json
  5. bin/sql-client.sh
  • Flink SQL 创建 Source 表 ```sql CREATE TABLE products ( id INT, name STRING, description STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( ‘connector’ = ‘kafka’, ‘topic’ = ‘mysql-avro.mydb.products’, ‘properties.bootstrap.servers’ = ‘localhost:9092’, ‘scan.startup.mode’ = ‘earliest-offset’, ‘debezium-avro-confluent.schema-registry.url’ = ‘http://localhost:8081‘, ‘format’ = ‘debezium-avro-confluent’ );

CREATE TABLE orders ( order_id INT, order_date TIMESTAMP(0), — debezium MySQL 插件会将 DATETIME 转换成 long customer_name STRING, price DOUBLE, — debezium MySQL 插件会将 DECIMAL(10, 5) 转换成 double product_id INT, order_status INT, — debezium MySQL 插件会将 BOOLEAN 转换成 int PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( ‘connector’ = ‘kafka’, ‘topic’ = ‘mysql-avro.mydb.orders’, ‘properties.bootstrap.servers’ = ‘localhost:9092’, ‘scan.startup.mode’ = ‘earliest-offset’, ‘debezium-avro-confluent.schema-registry.url’ = ‘http://localhost:8081‘, ‘format’ = ‘debezium-avro-confluent’ );

  1. :::info
  2. 使用 Avro 编码消息,需要如下配置
  3. 'format' = 'debezium-avro-confluent'<br />'debezium-avro-confluent.schema-registry.url' = 'http://localhost:8081',
  4. :::
  5. :::warning
  6. Avro DATETIME 转换成 long<br />Avro DECIMAL(10, 5) 转换成 double<br />Avro BOOLEAN 转换成 int
  7. :::
  8. - Flink SQL 提交任务
  9. ```sql
  10. select * from products;
  1. select * from orders;

数据类型映射

目前,Avro schema 通常是从 table schema 中推导而来。尚不支持显式定义 Avro schema

Flink SQL 类型 Avro 类型 Avro 逻辑类型
CHAR / VARCHAR / STRING string
BOOLEAN boolean
BINARY / VARBINARY bytes
DECIMAL fixed decimal
TINYINT int
SMALLINT int
INT int
BIGINT long
FLOAT float
DOUBLE double
DATE int date
TIME int time-millis
TIMESTAMP long timestamp-millis
ARRAY array
MAP
(key 必须是 string/char/varchar 类型)
map
MULTISET
(元素必须是 string/char/varchar 类型)
map
ROW record

遇到的问题

ERROR The retention policy of the schema topic _schemas is incorrect

  • 问题现象

    1. [2018-01-11 14:37:49,682] INFO Validating schemas topic _schemas (io.confluent.kafka.schemaregistry.storage.KafkaStore)
    2. [2018-01-11 14:37:49,773] ERROR The retention policy of the schema topic _schemas is incorrect. You must configure the topic to 'compact' cleanup policy to avoid Kafka deleting your schemas after a week. Refer to Kafka documentation for more details on cleanup policies (io.confluent.kafka.schemaregistry.storage.KafkaStore)
    3. [2018-01-11 14:37:49,780] INFO Shutting down schema registry (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry)
    4. [2018-01-11 14:37:49,783] ERROR Server died unexpectedly: (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain)
    5. java.lang.NullPointerException
    6. at io.confluent.kafka.schemaregistry.storage.KafkaStore.close(KafkaStore.java:366)
    7. at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.close(KafkaSchemaRegistry.java:720)
    8. at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.onShutdown(SchemaRegistryRestApplication.java:111)
  • 解决方案

在重新启动 kafka 集群或 schema-registry 经常重启后有时会发生。执行下面命令手动修复

  1. kafka-topics --alter --config cleanup.policy=compact --topic _schemas --bootstrap-server

参考文档 Manual Install using Systemd on Ubuntu and Debian ERROR The retention policy of the schema topic _schemas is incorrect


[

](https://blog.csdn.net/OldDirverHelpMe/article/details/107881170)