安装并启动 MySQL、PostgreSQL

  • 安装并启动 MySQL(Server version: 8.0.28-0ubuntu0.20.04.3 )
    1. sudo apt update
    2. sudo apt install mysql-server
    3. sudo systemctl enable mysql
    4. sudo systemctl status mysql

    参考 如何在 Ubuntu 20.04 上安装 MySQL

:::warning 注意⚠️:需要开启 binlog 以及 binlog_format 必须是 row 格式,默认配置即可,不需要做额外操作

log-bin=/var/lib/mysql/mysql-bin.log # 指定 binlog 日志存储位置
binlog_format=ROW # 这里一定是 row 格式
expire-logs-days = 14 # 日志保留时间
max-binlog-size = 500M # 日志滚动大小 :::
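
  • (可选)登录 MySQL 后,可以通过以下 SQL 确认 binlog 已开启且为 ROW 格式(示例)

    1. SHOW VARIABLES LIKE 'log_bin';
    2. SHOW VARIABLES LIKE 'binlog_format';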

  • 安装并启动 PostgreSQL( PostgreSQL 12.9-0ubuntu0.20.04.1)
    1. sudo apt update
    2. sudo apt install postgresql postgresql-contrib
    3. sudo systemctl enable postgresql
    4. sudo systemctl status postgresql

    参考 如何在 Ubuntu 20.04 上安装 PostgreSQL

安装并启动 Elasticsearch 和 Kibana

  • 安装并启动 Elasticsearch 和 Kibana
    1. sudo apt update
    2. sudo apt install apt-transport-https ca-certificates
    3. wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
    4. sudo sh -c 'echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" > /etc/apt/sources.list.d/elastic-7.x.list'
    5. sudo apt update
    6. sudo apt install elasticsearch
    7. sudo apt-get install kibana
    8. sudo systemctl enable --now elasticsearch.service
    9. sudo systemctl enable --now kibana.service

(可选)配置 Elasticsearch 远程访问

  • 修改 elasticsearch.yml 配置文件

    1. cat <<EOF >> /etc/elasticsearch/elasticsearch.yml
    2. network.host: 0.0.0.0
    3. discovery.seed_hosts: ["127.0.0.1"] # 必须配置,否则报错
    4. EOF
  • 重启 Elasticsearch 服务,使得应用生效

    1. sudo systemctl daemon-reload
    2. sudo systemctl restart elasticsearch
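
  • 验证远程访问是否生效(示例,将 <服务器IP> 替换为实际地址;7.x 默认未开启安全认证时会直接返回集群信息)

    1. curl http://<服务器IP>:9200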

(可选)配置 Kibana 远程访问
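
  • 修改 kibana.yml 配置文件并重启 Kibana 服务(一个最小示例,假设使用 apt 安装、配置文件位于 /etc/kibana/kibana.yml)

    1. cat <<EOF >> /etc/kibana/kibana.yml
    2. server.host: "0.0.0.0"
    3. EOF
    4. sudo systemctl restart kibana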

下载 Debezium 连接器插件

  • 下载并解压 Debezium 的 MySQL 和 Postgres 连接器插件(建议在 Kafka Connect 的插件目录 /usr/local/kafka/kafka-connectors 下执行,目录以实际配置为准)

    1. wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.8.1.Final/debezium-connector-mysql-1.8.1.Final-plugin.tar.gz
    2. tar xfz debezium-connector-mysql-1.8.1.Final-plugin.tar.gz && rm debezium-connector-mysql-1.8.1.Final-plugin.tar.gz
    3. wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.8.1.Final/debezium-connector-postgres-1.8.1.Final-plugin.tar.gz
    4. tar xfz debezium-connector-postgres-1.8.1.Final-plugin.tar.gz && rm debezium-connector-postgres-1.8.1.Final-plugin.tar.gz

  • 修改 kafka connect 配置文件,配置插件位置,这里使用集群模式

    1. cat <<EOF >> /usr/local/kafka/config/connect-distributed.properties
    2. plugin.path=/usr/local/kafka/kafka-connectors
    3. EOF

:::info Connect 也支持单机模式
单机模式与集群模式类似,只是配置文件为 connect-standalone.properties,启动时使用 bin/connect-standalone.sh,启动的进程是 ConnectStandalone
单机模式使用起来更简单,特别是在开发和诊断问题的时候,或者是在需要让连接器和任务运行在某台特定机器上的时候 :::

:::info Connect 有以下几个重要的配置参数

  • bootstrap.servers:该参数列出了将要与Connect协同工作的broker服务器,连接器将会向这些broker写入数据或者从它们那里读取数据。你不需要指定集群所有的broker,不过建议至少指定3个
  • group.id:具有相同 group id 的 worker 属于同一个Connect集群。集群的连接器和它们的任务可以运行在任意一个worker上。
  • key.converter和value.converter:Connect可以处理存储在Kafka里的不同格式的数据。这两个参数分别指定了消息的键和值所使用的转换器。默认使用Kafka提供的JSONConverter,当然也可以配置成Confluent Schema Registry提供的AvroConverter
  • 有些转换器还包含了特定的配置参数。例如,通过将 key.converter.schemas.enable 设置成 true 或者 false 来指定 JSON 消息是否包含 schema。值转换器也有类似的配置,不过它的参数名是 value.converter.schemas.enable。Avro 消息也包含了 schema,不过需要通过 key.converter.schema.registry.url 和 value.converter.schema.registry.url 来指定 Schema Registry 的位置。 :::
  • (可选)可以将 key.converter.schemas.enable 和 value.converter.schemas.enable 设置成 false,这样 JSON 消息不会包含 schema,减少冗余存储(两种格式的消息示例见下方)

    1. key.converter.schemas.enable=false
    2. value.converter.schemas.enable=false
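
:::info 开启 schemas.enable 时,JSONConverter 会把每条消息包装成 schema + payload 两层结构;关闭后只保留数据本身。下面是一个示意(字段仅为举例,并非真实输出):
  • schemas.enable=true:{"schema": {"type": "struct", "fields": [...]}, "payload": {"id": 101, "name": "scooter"}}
  • schemas.enable=false:{"id": 101, "name": "scooter"} :::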
  • 启动连接器

    1. cd /usr/local/kafka
    2. bin/connect-distributed.sh -daemon config/connect-distributed.properties

:::info 可以将 /usr/local/kafka 拷贝到其他机器上,在其他机器上也启动连接器,这些机器组成一个集群 :::

  • 查看日志

    1. tail -f logs/connectDistributed.out
  • 查看是否正常启动,ConnectDistributed 就是启动的进程

    1. jps
    2. 8147 Jps
    3. 596 Kafka
    4. 680 QuorumPeerMain
    5. 8012 ConnectDistributed
  • 为了更方便查看 json 数据,我们需要安装 jq

    1. apt install jq
  • 通过 REST API 查看是否正常启动,返回当前 Connect 的版本号

    1. curl -H "Accept:application/json" localhost:8083 | jq
    2. {
    3. "version": "2.8.1",
    4. "commit": "839b886f9b732b15",
    5. "kafka_cluster_id": "-41s96u9Rz6XECIr-Pruzw"
    6. }
  • 查看 connectors

    1. curl -H "Accept:application/json" localhost:8083/connectors | jq
    2. []
  • 查看已经安装好的连接器插件
    1. curl -H "Accept:application/json" localhost:8083/connector-plugins | jq
    2. [
    3. {
    4. "class": "io.debezium.connector.mysql.MySqlConnector",
    5. "type": "source",
    6. "version": "1.8.1.Final"
    7. },
    8. {
    9. "class": "io.debezium.connector.postgresql.PostgresConnector",
    10. "type": "source",
    11. "version": "1.8.1.Final"
    12. },
    13. {
    14. "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    15. "type": "sink",
    16. "version": "2.8.1"
    17. },
    18. {
    19. "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    20. "type": "source",
    21. "version": "2.8.1"
    22. },
    23. {
    24. "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    25. "type": "source",
    26. "version": "1"
    27. },
    28. {
    29. "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    30. "type": "source",
    31. "version": "1"
    32. },
    33. {
    34. "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    35. "type": "source",
    36. "version": "1"
    37. }
    38. ]
    可以看到 Debezium 的 MySQL 和 Postgres 连接器已经安装好

使用 Debezium 同步 MySQL 数据到 Kafka

:::info MySQL: 商品表 products 和订单表 orders 将存储在该数据库中,这两张表将和 Postgres 数据库中的物流表 shipments 进行关联,得到一张包含更多信息的订单表 enriched_orders :::

在 MySQL 数据库中准备数据

  • 使用 root 用户登录 MySQL

    1. mysql
  • 创建数据库和表 products、orders,并插入数据

    1. -- MySQL
    2. CREATE DATABASE mydb;
    3. USE mydb;
    4. CREATE TABLE products (
    5.   id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
    6.   name VARCHAR(255) NOT NULL,
    7.   description VARCHAR(512)
    8. );
    9. ALTER TABLE products AUTO_INCREMENT = 101;
    10. INSERT INTO products VALUES
    11.   (default,"scooter","Small 2-wheel scooter"),
    12.   (default,"car battery","12V car battery"),
    13.   (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
    14.   (default,"hammer","12oz carpenter's hammer"),
    15.   (default,"hammer","14oz carpenter's hammer"),
    16.   (default,"hammer","16oz carpenter's hammer"),
    17.   (default,"rocks","box of assorted rocks"),
    18.   (default,"jacket","water resistent black wind breaker"),
    19.   (default,"spare tire","24 inch spare tire");
    20. CREATE TABLE orders (
    21.   order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
    22.   order_date DATETIME NOT NULL,
    23.   customer_name VARCHAR(255) NOT NULL,
    24.   price DECIMAL(10, 5) NOT NULL,
    25.   product_id INTEGER NOT NULL,
    26.   order_status BOOLEAN NOT NULL -- Whether order has been placed
    27. ) AUTO_INCREMENT = 10001;
    28. INSERT INTO orders VALUES
    29.   (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
    30.   (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
    31.   (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

  1. <a name="UtZQo"></a>
  2. ## 创建 Debezium 用于同步数据的用户并授权
  3. - 使用 root 用户登录 MySQL
  4. ```json
  5. mysql
  • 创建 Debezium 用于同步数据的用户并授权 ```bash CREATE USER ‘debezium’@’%’ IDENTIFIED BY ‘123456’;

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘debezium’;

FLUSH PRIVILEGES;

  1. <a name="B82G8"></a>
  2. ## 配置 MySQL 连接信息(Distributed 模式)
  3. - 切换到 kafka 安装目录
  4. ```json
  5. cd /usr/local/kafka
  • 将 MySQL 连接信息保存到 dbz-mysql-connector.json 文件中
    1. cat <<EOF > kafka-connectors/dbz-mysql-connector.json
    2. {
    3. "name":"dbz-mysql-connector",
    4. "config":{
    5. "connector.class":"io.debezium.connector.mysql.MySqlConnector",
    6. "tasks.max":"1",
    7. "database.hostname":"localhost",
    8. "database.port":"3306",
    9. "database.user":"debezium",
    10. "database.password":"123456",
    11. "database.server.id":"184054",
    12. "database.server.name":"mysql",
    13. "database.include.list":"mydb",
    14. "database.history.kafka.bootstrap.servers":"localhost:9092",
    15. "database.history.kafka.topic":"dbhistory.mysql",
    16. "database.allowPublicKeyRetrieval":"true",
    17. "decimal.handling.mode":"double",
    18. "tombstones.on.delete":"false"
    19. }
    20. }
    21. EOF

:::info

  • name:标识连接器的名称
  • connector.class:对应数据库类
  • tasks.max:默认1
  • database.hostname:数据库 ip
  • database.port:数据库端口
  • database.user:数据库登录名
  • database.password:数据库密码
  • database.server.id:连接器模拟 MySQL 从库时使用的 server id,在当前 MySQL 集群内不能与其他 server id 重复
  • database.server.name:给数据库取别名
  • database.include.list:类似白名单,里面的库可以监控到,不在里面监控不到,多库逗号分隔,支持正则匹配
  • database.history.kafka.bootstrap.servers:保存表 DDL 相关信息的 kafka 地址
  • database.history.kafka.topic:表 DDL 相关信息会保存在这个 topic 里面
  • decimal.handling.mode:处理 DECIMAL、NUMERIC 类型时,默认以二进制(precise)形式输出,我们改为 double 输出
  • snapshot.mode:快照模式,需要具体情况具体分析;如果只需要实时数据、不需要历史数据,可以设置为 schema_only(本文配置未设置,示例见下方)
  • tombstones.on.delete:默认是 true,删除记录时会产生两条消息,第二条为 NULL(墓碑消息),如果不希望出现 NULL,可以设置为 false
  • table.include.list:类似白名单,列表中的表才会被监控,多表用逗号分隔,支持正则匹配(本文配置未设置,示例见下方) :::
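
  • (可选)如需只同步增量数据或限定表范围,可以在 dbz-mysql-connector.json 的 config 中加入如下配置(示意片段,表名按实际情况调整)

    1. "snapshot.mode": "schema_only",
    2. "table.include.list": "mydb.products,mydb.orders"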

:::danger MySQL v8 报错 Unable to connect: Public Key Retrieval is not allowed,需要添加配置项
"database.allowPublicKeyRetrieval":"true",
https://rmoff.net/2019/10/23/debezium-mysql-v8-public-key-retrieval-is-not-allowed/ :::

  • 启动连接器

    1. bin/connect-distributed.sh -daemon config/connect-distributed.properties
  • 配置 MySQL 连接信息

    1. curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @kafka-connectors/dbz-mysql-connector.json
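
  • (可选)通过 REST API 查看连接器状态,确认 connector 和 task 都处于 RUNNING 状态(示例)

    1. curl -H "Accept:application/json" localhost:8083/connectors/dbz-mysql-connector/status | jq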
  • 查看 Kafka 的 Topic 列表

    1. bin/kafka-topics.sh --zookeeper localhost:2181 --list
    2. __consumer_offsets
    3. connect-configs
    4. connect-offsets
    5. connect-status
    6. dbhistory.mysql
    7. mysql
    8. mysql.mydb.orders
    9. mysql.mydb.products

    Kafka 主题名称使用 serverName.databaseName.tableName 这样的格式,例如 mysql.mydb.orders。

  • 查看 Kafka 的 Topic 信息

    1. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mysql.mydb.orders | jq
    2. {
    3. "before": null,
    4. "after": {
    5. "order_id": 10001,
    6. "order_date": 1596103702000,
    7. "customer_name": "Jark",
    8. "price": "50.50000",
    9. "product_id": 102,
    10. "order_status": 0
    11. },
    12. "source": {
    13. "version": "1.8.1.Final",
    14. "connector": "mysql",
    15. "name": "mysql",
    16. "ts_ms": 1648193001632,
    17. "snapshot": "true",
    18. "db": "mydb",
    19. "sequence": null,
    20. "table": "orders",
    21. "server_id": 0,
    22. "gtid": null,
    23. "file": "binlog.000005",
    24. "pos": 3100,
    25. "row": 0,
    26. "thread": null,
    27. "query": null
    28. },
    29. "op": "r",
    30. "ts_ms": 1648193001637,
    31. "transaction": null
    32. }
    33. {
    34. "before": null,
    35. "after": {
    36. "order_id": 10002,
    37. "order_date": 1596103869000,
    38. "customer_name": "Sally",
    39. "price": "15.00000",
    40. "product_id": 105,
    41. "order_status": 0
    42. },
    43. "source": {
    44. "version": "1.8.1.Final",
    45. "connector": "mysql",
    46. "name": "mysql",
    47. "ts_ms": 1648193001641,
    48. "snapshot": "true",
    49. "db": "mydb",
    50. "sequence": null,
    51. "table": "orders",
    52. "server_id": 0,
    53. "gtid": null,
    54. "file": "binlog.000005",
    55. "pos": 3100,
    56. "row": 0,
    57. "thread": null,
    58. "query": null
    59. },
    60. "op": "r",
    61. "ts_ms": 1648193001641,
    62. "transaction": null
    63. }
    64. {
    65. "before": null,
    66. "after": {
    67. "order_id": 10003,
    68. "order_date": 1596110430000,
    69. "customer_name": "Edward",
    70. "price": "25.25000",
    71. "product_id": 106,
    72. "order_status": 0
    73. },
    74. "source": {
    75. "version": "1.8.1.Final",
    76. "connector": "mysql",
    77. "name": "mysql",
    78. "ts_ms": 1648193001641,
    79. "snapshot": "true",
    80. "db": "mydb",
    81. "sequence": null,
    82. "table": "orders",
    83. "server_id": 0,
    84. "gtid": null,
    85. "file": "binlog.000005",
    86. "pos": 3100,
    87. "row": 0,
    88. "thread": null,
    89. "query": null
    90. },
    91. "op": "r",
    92. "ts_ms": 1648193001642,
    93. "transaction": null
    94. }

清除环境

  • 删除 connector

    1. curl -X DELETE http://localhost:8083/connectors/dbz-mysql-connector
  • 停止 Kafka Connect

    1. jps | grep ConnectDistributed | awk '{print $1}' | xargs kill -9
  • 删除 topic

    1. bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic mysql
    2. bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic dbhistory.mysql
    3. bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic mysql.mydb.products
    4. bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic mysql.mydb.orders
    5. bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic connect-configs
    6. bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic connect-offsets
    7. bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic connect-status
    8. bin/kafka-topics.sh --zookeeper localhost:2181 --list

:::info 注意⚠️:Standalone 模式在启动 Kafka Connect 时通过 connector.properties 文件进行配置 :::

  • 将 MySQL 连接信息保存到 dbz-mysql-connector.properties 文件中

    1. cat <<EOF > kafka-connectors/dbz-mysql-connector.properties
    2. name=dbz-mysql-connector
    3. connector.class=io.debezium.connector.mysql.MySqlConnector
    4. tasks.max=1
    5. database.hostname=localhost
    6. database.port=3306
    7. database.user=debezium
    8. database.password=123456
    9. database.server.id=184054
    10. database.server.name=mysql
    11. database.include.list=mydb
    12. database.history.kafka.bootstrap.servers=localhost:9092
    13. database.history.kafka.topic=dbhistory.mysql
    14. database.allowPublicKeyRetrieval=true
    15. decimal.handling.mode=double
    16. tombstones.on.delete=false
    17. EOF

  • 启动连接器

    1. /usr/local/kafka/bin/connect-standalone.sh config/connect-standalone.properties kafka-connectors/dbz-mysql-connector.properties

使用 Debezium 同步 Postgres 数据到 Kafka

:::info Postgres: 物流表 shipments 将存储在该数据库中 :::

配置 Postgres 数据库

  • 修改 postgresql.conf

    1. vim /etc/postgresql/12/main/postgresql.conf
    2. # 更改 wal 日志方式为 logical
    3. wal_level = logical # minimal, replica, or logical
    4. # 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认 60s)
    5. wal_sender_timeout = 180s # in milliseconds; 0 disables

  • 修改 pg_hba.conf

    1. vim /etc/postgresql/12/main/pg_hba.conf
    2. # TYPE  DATABASE     USER  ADDRESS       METHOD
    3. ############ REPLICATION ##############
    4. local   replication  all                 trust
    5. host    replication  all   127.0.0.1/32  trust
    6. host    replication  all   ::1/128       trust

:::info

  • METHOD 这里指定的是 trust,无条件地允许连接。这种方法允许任何可以与 PostgreSQL 数据库服务器连接的用户以他们期望的任意 PostgreSQL 数据库用户身份登入,而不需要口令或者其他任何认证。
  • USER 这里指定的是 all,如果已创建具有 REPLICATION 和 LOGIN 权限的其他用户,可以更改为其他用户。
  • ADDRESS 这里指定的是 127.0.0.1/32(IPv4 回环地址) 和 ::1/128(IPv6 回环地址),可以更改为 0.0.0.0/0 表示所有 IPv4 地址,::0/0表示所有 IPv6 地址 :::

hba 配置参考文章 http://www.postgres.cn/docs/9.6/auth-pg-hba-conf.html
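
  • (可选)如果不希望使用 trust,可以改为密码认证,例如只允许 debezium 用户通过 md5 方式建立复制连接(示例,网段按实际情况调整)

    1. host    replication  debezium  0.0.0.0/0     md5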

  • 需要重启 postgresql 服务才能生效,所以一般在业务低峰期进行更改

    1. sudo systemctl daemon-reload
    2. sudo systemctl restart postgresql

(可选)使用 wal2json 插件

可以使用 wal2json 插件(需要安装),也可以使用 pgsql 自带的 pgoutput(不需要安装)。

  • 安装 wal2json 插件

    1. sudo apt-get install postgresql-12-wal2json
  • 修改 postgresql.conf

    1. # 使用 wal2json 插件
    2. shared_preload_libraries = 'wal2json'
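
  • 重启 PostgreSQL 服务,使 shared_preload_libraries 的修改生效(沿用前文的 systemctl 方式)

    1. sudo systemctl restart postgresql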

测试 wal2json 插件

  • 创建名为 test 的数据库和名为 test_table 的表

    1. CREATE DATABASE test;
    2. \c test
    3. CREATE TABLE test_table (
    4.   id char(10) NOT NULL,
    5.   code char(10),
    6.   PRIMARY KEY (id)
    7. );
  • 为 test 数据库创建名为 test_slot 的槽,使用 wal2json 插件

    1. sudo su - postgres
    2. pg_recvlogical -d test --slot test_slot --create-slot -P wal2json
  • 开始从 test 数据库的逻辑复制槽 test_slot 流式传输更改

    1. pg_recvlogical -d test --slot test_slot --start -o pretty-print=1 -f -
  • 在 test_table 执行一些基本的 DML 操作以触发 INSERT/UPDATE/DELETE 更改事件

    1. INSERT INTO test_table (id, code) VALUES('id1', 'code1');
    2. update test_table set code='code2' where id='id1';
    3. delete from test_table where id='id1';

:::warning 注意⚠️:此时表 test_table 的 REPLICA IDENTITY 为默认值 DEFAULT :::

在发生 INSERT、UPDATE、DELETE 事件时,wal2json 插件会输出由 pg_recvlogical 捕获的表更改。

  • INSERT 事件的输出

    1. {
    2. "change": [
    3. {
    4. "kind": "insert",
    5. "schema": "public",
    6. "table": "test_table",
    7. "columnnames": ["id", "code"],
    8. "columntypes": ["character(10)", "character(10)"],
    9. "columnvalues": ["id1 ", "code1 "]
    10. }
    11. ]
    12. }
  • UPDATE 事件的输出

    1. {
    2. "change": [
    3. {
    4. "kind": "update",
    5. "schema": "public",
    6. "table": "test_table",
    7. "columnnames": ["id", "code"],
    8. "columntypes": ["character(10)", "character(10)"],
    9. "columnvalues": ["id1 ", "code2 "],
    10. "oldkeys": {
    11. "keynames": ["id"],
    12. "keytypes": ["character(10)"],
    13. "keyvalues": ["id1 "]
    14. }
    15. }
    16. ]
    17. }
  • DELETE 事件的输出

    1. {
    2. "change": [
    3. {
    4. "kind": "delete",
    5. "schema": "public",
    6. "table": "test_table",
    7. "oldkeys": {
    8. "keynames": ["id"],
    9. "keytypes": ["character(10)"],
    10. "keyvalues": ["id1 "]
    11. }
    12. }
    13. ]
    14. }
  • 测试完成后,可以通过以下命令删除 test 数据库的槽 test_slot

    1. pg_recvlogical -d test --slot test_slot --drop-slot
  • 删除 test 数据库

    1. \c postgres
    2. drop database test;

:::info
REPLICA IDENTITY 是一个 PostgreSQL 特有的表级设置,它决定在发生 UPDATE、DELETE 事件时,逻辑解码可以获得多少旧值信息。
REPLICA IDENTITY 有 4 个可能的值:

  • DEFAULT - UPDATE、DELETE 事件将仅包含表的主键列的旧值
  • NOTHING - UPDATE、DELETE 事件将不包含任何表列的旧值
  • FULL - UPDATE、DELETE 事件将包含表中所有列的旧值
  • INDEX 索引名称 - UPDATE、DELETE 事件将包含指定索引定义中所含列的旧值

可以使用以下命令修改和查看表的 REPLICA IDENTITY :::

    1. ALTER TABLE test_table REPLICA IDENTITY FULL;
    2. test=# \d+ test_table
    3.                 Table "public.test_table"
    4.  Column |     Type      | Modifiers | Storage  | Stats target | Description
    5. --------+---------------+-----------+----------+--------------+-------------
    6.  id     | character(10) | not null  | extended |              |
    7.  code   | character(10) |           | extended |              |
    8. Indexes:
    9.     "test_table_pkey" PRIMARY KEY, btree (id)
    10. Replica Identity: FULL

:::info 这是 wal2json 插件在 REPLICA IDENTITY 设置为 FULL 时 UPDATE 事件的输出,可以与 REPLICA IDENTITY 设置为 DEFAULT 时的相应输出进行比较 :::

  • UPDATE 事件的输出

    1. {
    2. "change": [
    3. {
    4. "kind": "update",
    5. "schema": "public",
    6. "table": "test_table",
    7. "columnnames": ["id", "code"],
    8. "columntypes": ["character(10)", "character(10)"],
    9. "columnvalues": ["id1 ", "code2 "],
    10. "oldkeys": {
    11. "keynames": ["id", "code"],
    12. "keytypes": ["character(10)", "character(10)"],
    13. "keyvalues": ["id1 ", "code1 "]
    14. }
    15. }
    16. ]
    17. }

在 Postgres 数据库中准备数据

  • 使用 postgres 用户登录 PostgreSQL

    1. sudo -u postgres psql
  • 创建数据库和表 shipments,并插入数据

    1. -- PG
    2. CREATE DATABASE mydb;
    3. \c mydb
    4. CREATE TABLE shipments (
    5.   shipment_id SERIAL NOT NULL PRIMARY KEY,
    6.   order_id SERIAL NOT NULL,
    7.   origin VARCHAR(255) NOT NULL,
    8.   destination VARCHAR(255) NOT NULL,
    9.   is_arrived BOOLEAN NOT NULL
    10. );
    11. ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
    12. -- 更改复制标识,使其包含更新和删除之前的值
    13. ALTER TABLE public.shipments REPLICA IDENTITY FULL;
    14. INSERT INTO shipments VALUES
    15.   (default,10001,'Beijing','Shanghai',false),
    16.   (default,10002,'Hangzhou','Shanghai',false),
    17.   (default,10003,'Shanghai','Hangzhou',false);

  1. <a name="fEW8Z"></a>
  2. ## 创建 Debezium 用于同步数据的用户并授权
  3. - 使用 postgres 用户登录 PostgreSQL
  4. ```json
  5. sudo -u postgres psql
  • 创建 Debezium 用于同步数据的用户并授权

使用 pgoutput

  1. -- 创建 debezium 用户并赋予 REPLICATION LOGIN 权限
  2. CREATE USER debezium WITH PASSWORD '123456' REPLICATION LOGIN;
  3. -- 创建一个 replication_group 用于共享所有权
  4. CREATE ROLE replication_group;
  5. -- 将现有表所有者和复制用户添加到该组
  6. GRANT replication_group TO postgres;
  7. GRANT replication_group TO debezium;
  8. -- 将表的所有权转交给 replication_group,这样 debezium 和 postgres 都拥有表的所有权
  9. ALTER TABLE public.shipments OWNER TO replication_group;
  10. -- 将在 mydb 数据库中创建模式(schema)和发布(publication)的权限赋给 debezium
  11. GRANT CREATE ON DATABASE mydb TO debezium;

:::info 如果使用 pgoutput,debezium 用户需要 REPLICATION、LOGIN、CREATE、SELECT 权限,并且必须是表的所有者;由于表所有者自动拥有表的 SELECT 权限,所以额外只需要授予 CREATE 权限。 :::
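
  • (可选)可以通过以下查询确认 shipments 表的所有者已变更为 replication_group(示例)

    1. SELECT tablename, tableowner FROM pg_tables WHERE tablename = 'shipments';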

(可选)使用 wal2json

  1. -- 创建 debezium 用户并赋予 REPLICATION LOGIN 权限
  2. CREATE USER debezium WITH PASSWORD '123456' REPLICATION LOGIN;
  3. -- shipments 表查询权限赋给用户
  4. GRANT SELECT ON public.shipments TO debezium;

:::info 如果使用 wal2json 插件需要 REPLICATION,LOGIN,SELECT 权限,也可以使用下面语句把 schema public 下所有表查询权限赋给用户
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium; :::

PostgreSQL 权限配置参考文章 PostgreSQL 角色管理 PostgreSQL 权限

配置 PostgreSQL 连接信息(Distributed 模式)

  • 切换到 kafka 安装目录

    1. cd /usr/local/kafka
  • 将 PostgreSQL 连接信息保存到 dbz-pg-connector.json 文件中

使用 pgoutput

  1. cat <<EOF > kafka-connectors/dbz-pg-connector.json
  2. {
  3. "name":"dbz-pg-connector",
  4. "config": {
  5. "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  6. "tasks.max": "1",
  7. "database.hostname": "localhost",
  8. "database.port": "5432",
  9. "database.user": "debezium",
  10. "database.password": "123456",
  11. "database.dbname" : "mydb",
  12. "database.server.id":"121041",
  13. "database.server.name": "pg",
  14. "decimal.handling.mode":"double",
  15. "tombstones.on.delete":"false",
  16. "heartbeat.interval.ms":"100",
  17. "publication.autocreate.mode":"filtered",
  18. "plugin.name":"pgoutput"
  19. }
  20. }
  21. EOF

:::info 在使用 pgoutput 插件时,建议将 publication.autocreate.mode 配置为 filtered。如果使用 all_tables(publication.autocreate.mode 的默认值)并且未找到该发布,连接器会尝试执行 CREATE PUBLICATION <publication_name> FOR ALL TABLES; 来创建,但会由于缺少权限而失败。 :::
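
  • (可选)也可以在 PostgreSQL 中提前手动创建发布,Debezium 默认使用的发布名是 dbz_publication(示例,表名按实际情况调整)

    1. CREATE PUBLICATION dbz_publication FOR TABLE public.shipments;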

(可选)使用 wal2json

  1. cat <<EOF > kafka-connectors/dbz-pg-connector.json
  2. {
  3. "name":"dbz-pg-connector",
  4. "config": {
  5. "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  6. "tasks.max": "1",
  7. "database.hostname": "localhost",
  8. "database.port": "5432",
  9. "database.user": "debezium",
  10. "database.password": "123456",
  11. "database.dbname" : "mydb",
  12. "database.server.id":"121041",
  13. "database.server.name": "pg",
  14. "decimal.handling.mode":"String",
  15. "tombstones.on.delete":"false",
  16. "heartbeat.interval.ms":"100",
  17. "plugin.name":"wal2json"
  18. }
  19. }
  20. EOF

:::info

  • name:标识连接器的名称
  • connector.class:对应数据库类
  • database.hostname:数据库ip
  • database.port:数据库端口
  • database.user:数据库登录名
  • database.password:数据库密码
  • database.dbname:数据库名称
  • database.server.name:给数据库取别名
  • schema.include.list:类似白名单,列表中的模式才会被监控,多个模式用逗号分隔,支持正则匹配(本文配置未设置,示例见下方)
  • slot.name:复制槽(slot)的名称
  • snapshot.mode:快照模式,需要具体情况具体分析;如果只需要实时数据、不需要历史数据,可以设置为 never(本文配置未设置,示例见下方)
  • table.include.list:类似白名单,列表中的表才会被监控,多表用逗号分隔,支持正则匹配
  • publication.autocreate.mode:发布(Publication)的处理策略
    • disabled - 连接器不会尝试创建 Publication
    • all_tables - 如果 pg 中不存在该 Publication,则自动创建一个包含所有表的 Publication
    • filtered - 与 all_tables 不同的是,自动创建的 Publication 只包含 table.include.list 中的表
  • decimal.handling.mode:处理 DECIMAL、NUMERIC 类型时,默认以二进制(precise)形式输出,我们改为字符串输出
  • heartbeat.interval.ms:控制连接器向 Kafka 主题发送心跳消息的频率(毫秒),默认不发送心跳消息
  • tombstones.on.delete:默认是 true,删除记录时会产生两条消息,第二条为 NULL(墓碑消息),如果不希望出现 NULL,可以设置为 false
  • plugin.name:逻辑解码插件名称,可以使用 wal2json 插件(需要安装),也可以使用 pgsql 自带的 pgoutput(不需要安装) :::
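
  • (可选)如需只同步增量数据或限定模式/表范围,可以在 dbz-pg-connector.json 的 config 中加入如下配置(示意片段,名称按实际情况调整)

    1. "snapshot.mode": "never",
    2. "slot.name": "dbz_mydb_slot",
    3. "schema.include.list": "public",
    4. "table.include.list": "public.shipments"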

  • 启动连接器

    1. bin/connect-distributed.sh -daemon config/connect-distributed.properties
  • 配置 PostgreSQL 连接信息

    1. curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @kafka-connectors/dbz-pg-connector.json
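
  • (可选)在 PostgreSQL 中确认复制槽已创建并处于活动状态(示例)

    1. sudo -u postgres psql -d mydb -c "SELECT slot_name, plugin, active FROM pg_replication_slots;"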
  • 查看 Kafka 的 Topic 列表

    1. bin/kafka-topics.sh --zookeeper localhost:2181 --list
    2. __consumer_offsets
    3. __debezium-heartbeat.pg
    4. connect-configs
    5. connect-offsets
    6. connect-status
    7. pg.public.shipments

    Kafka 主题名称使用 serverName.schemaName.tableName 这样的格式。

  • 查看 Kafka 的 Topic 信息

    1. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic pg.public.shipments | jq
    2. {
    3. "before": null,
    4. "after": {
    5. "shipment_id": 1001,
    6. "order_id": 10001,
    7. "origin": "Beijing",
    8. "destination": "Shanghai",
    9. "is_arrived": false
    10. },
    11. "source": {
    12. "version": "1.8.1.Final",
    13. "connector": "postgresql",
    14. "name": "pg",
    15. "ts_ms": 1648199166117,
    16. "snapshot": "true",
    17. "db": "mydb",
    18. "sequence": "[null,\"23741376\"]",
    19. "schema": "public",
    20. "table": "shipments",
    21. "txId": 511,
    22. "lsn": 23741376,
    23. "xmin": null
    24. },
    25. "op": "r",
    26. "ts_ms": 1648199166120,
    27. "transaction": null
    28. }
    29. {
    30. "before": null,
    31. "after": {
    32. "shipment_id": 1002,
    33. "order_id": 10002,
    34. "origin": "Hangzhou",
    35. "destination": "Shanghai",
    36. "is_arrived": false
    37. },
    38. "source": {
    39. "version": "1.8.1.Final",
    40. "connector": "postgresql",
    41. "name": "pg",
    42. "ts_ms": 1648199166123,
    43. "snapshot": "true",
    44. "db": "mydb",
    45. "sequence": "[null,\"23741376\"]",
    46. "schema": "public",
    47. "table": "shipments",
    48. "txId": 511,
    49. "lsn": 23741376,
    50. "xmin": null
    51. },
    52. "op": "r",
    53. "ts_ms": 1648199166123,
    54. "transaction": null
    55. }
    56. {
    57. "before": null,
    58. "after": {
    59. "shipment_id": 1003,
    60. "order_id": 10003,
    61. "origin": "Shanghai",
    62. "destination": "Hangzhou",
    63. "is_arrived": false
    64. },
    65. "source": {
    66. "version": "1.8.1.Final",
    67. "connector": "postgresql",
    68. "name": "pg",
    69. "ts_ms": 1648199166124,
    70. "snapshot": "last",
    71. "db": "mydb",
    72. "sequence": "[null,\"23741376\"]",
    73. "schema": "public",
    74. "table": "shipments",
    75. "txId": 511,
    76. "lsn": 23741376,
    77. "xmin": null
    78. },
    79. "op": "r",
    80. "ts_ms": 1648199166124,
    81. "transaction": null
    82. }

清除环境

  • 删除 connector

    1. curl -X DELETE http://localhost:8083/connectors/dbz-pg-connector
  • 停止 Kafka Connect

    1. jps | grep ConnectDistributed | awk '{print $1}' | xargs kill -9
  • 删除 topic

    1. bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic pg.public.shipments
    2. bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic __debezium-heartbeat.pg
    3. bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic connect-configs
    4. bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic connect-offsets
    5. bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic connect-status
    6. bin/kafka-topics.sh --zookeeper localhost:2181 --list

  1. <a name="PqeWn"></a>
  2. # 使用 Flink SQL 进行 ETL
  3. :::info
  4. Elasticsearch: 最终的订单表 enriched_orders 将写到 Elasticsearch
  5. :::
  6. <a name="aKv0u"></a>
  7. ## 安装并启动 Flink
  8. - 下载 [Flink 1.13.6](https://dlcdn.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz) (使用的是 scala_2.12 版本)并将其解压至目录 flink-1.13.6
  9. ```json
  10. wget https://dlcdn.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz
  11. tar -xvf flink-1.13.6-bin-scala_2.12.tgz
  • 下载下面列出的依赖包,并将它们放到目录 flink-1.13.6/lib/ 下
    • [flink-sql-connector-kafka_2.12-1.13.6.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.6/flink-sql-connector-kafka_2.12-1.13.6.jar)
    • [flink-connector-jdbc_2.12-1.13.6.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.13.6/flink-connector-jdbc_2.12-1.13.6.jar)
    • [postgresql-42.3.3.jar](https://jdbc.postgresql.org/download/postgresql-42.3.3.jar)
    • [flink-sql-connector-elasticsearch7_2.12-1.13.6.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.12/1.13.6/flink-sql-connector-elasticsearch7_2.12-1.13.6.jar)

    1. cd flink-1.13.6/lib/
    2. wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.6/flink-sql-connector-kafka_2.12-1.13.6.jar
    3. wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.13.6/flink-connector-jdbc_2.12-1.13.6.jar
    4. wget https://jdbc.postgresql.org/download/postgresql-42.3.3.jar
    5. wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.12/1.13.6/flink-sql-connector-elasticsearch7_2.12-1.13.6.jar

  • 启动 flink 集群

    1. cd ~/flink-1.13.6
    2. bin/start-cluster.sh

在 Flink SQL CLI 中使用 Flink DDL 创建 Source 表

  • 启动 sql-client

    1. bin/sql-client.sh
  • 开启 checkpoint,每隔 3 秒做一次 checkpoint

    1. SET 'execution.checkpointing.interval' = '3s';
  • Flink SQL 创建 Source 表

    1. CREATE TABLE products (
    2.   id INT,
    3.   name STRING,
    4.   description STRING,
    5.   PRIMARY KEY (id) NOT ENFORCED
    6. ) WITH (
    7.   'connector' = 'kafka',
    8.   'topic' = 'mysql.mydb.products',
    9.   'properties.bootstrap.servers' = 'localhost:9092',
    10.   'scan.startup.mode' = 'earliest-offset',
    11.   'format' = 'debezium-json'
    12. );
    13. CREATE TABLE orders (
    14.   order_id INT,
    15.   order_date BIGINT, -- debezium MySQL 插件会将 DATETIME 转换成 INT64
    16.   customer_name STRING,
    17.   price DECIMAL(10, 5),
    18.   product_id INT,
    19.   order_status BOOLEAN,
    20.   PRIMARY KEY (order_id) NOT ENFORCED
    21. ) WITH (
    22.   'connector' = 'kafka',
    23.   'topic' = 'mysql.mydb.orders',
    24.   'properties.bootstrap.servers' = 'localhost:9092',
    25.   'scan.startup.mode' = 'earliest-offset',
    26.   'format' = 'debezium-json'
    27. );
    28. CREATE TABLE shipments (
    29.   shipment_id INT,
    30.   order_id INT,
    31.   origin STRING,
    32.   destination STRING,
    33.   is_arrived BOOLEAN,
    34.   PRIMARY KEY (shipment_id) NOT ENFORCED
    35. ) WITH (
    36.   'connector' = 'kafka',
    37.   'topic' = 'pg.public.shipments',
    38.   'properties.bootstrap.servers' = 'localhost:9092',
    39.   'scan.startup.mode' = 'earliest-offset',
    40.   'format' = 'debezium-json'
    41. );

:::info order_date 类型为 BIGINT 而不是 TIMESTAMP(0),因为对于 json 格式,debezium MySQL 插件会将 DATETIME 转换成 INT64 :::

:::info 在某些情况下,用户在设置 Debezium Kafka Connect 时,可能会开启 Kafka 的配置 'value.converter.schemas.enable',用来在消息体中包含 schema 信息
此时需要在上述 DDL WITH 子句中添加选项 'debezium-json.schema-include' = 'true'(默认为 false)。通常情况下,建议不要包含 schema 的描述,因为这样会使消息变得非常冗长,并降低解析性能(示例见下方) :::
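
  • (可选)以 orders 表为例,消息中包含 schema 信息时的 WITH 子句写法(示意)

    1. CREATE TABLE orders (
    2.   order_id INT,
    3.   order_date BIGINT,
    4.   customer_name STRING,
    5.   price DECIMAL(10, 5),
    6.   product_id INT,
    7.   order_status BOOLEAN,
    8.   PRIMARY KEY (order_id) NOT ENFORCED
    9. ) WITH (
    10.   'connector' = 'kafka',
    11.   'topic' = 'mysql.mydb.orders',
    12.   'properties.bootstrap.servers' = 'localhost:9092',
    13.   'scan.startup.mode' = 'earliest-offset',
    14.   'format' = 'debezium-json',
    15.   'debezium-json.schema-include' = 'true'
    16. );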
  8. <a name="GNCGh"></a>
  9. ## 关联订单数据并且将其写入 Elasticsearch 中
  10. - Flink SQL 创建 Sink
  11. ```json
  12. CREATE TABLE enriched_orders (
  13. order_id INT,
  14. order_date BIGINT, -- debezium MySQL 插件会将 DATETIME 转换成 INT64
  15. customer_name STRING,
  16. price DECIMAL(10, 5),
  17. product_id INT,
  18. order_status BOOLEAN,
  19. product_name STRING,
  20. product_description STRING,
  21. shipment_id INT,
  22. origin STRING,
  23. destination STRING,
  24. is_arrived BOOLEAN,
  25. PRIMARY KEY (order_id) NOT ENFORCED
  26. ) WITH (
  27. 'connector' = 'elasticsearch-7',
  28. 'hosts' = 'http://localhost:9200',
  29. 'index' = 'enriched_orders'
  30. );
  • 关联订单数据

    1. SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
    2. FROM orders AS o
    3. LEFT JOIN products AS p ON o.product_id = p.id
    4. LEFT JOIN shipments AS s ON o.order_id = s.order_id;
  • 关联订单数据并且将其写入 Elasticsearch 中

    1. INSERT INTO enriched_orders
    2. SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
    3. FROM orders AS o
    4. LEFT JOIN products AS p ON o.product_id = p.id
    5. LEFT JOIN shipments AS s ON o.order_id = s.order_id;

  1. <a name="FlaFo"></a>
  2. ### 在 Kibana 中看到包含商品和物流信息的订单数据
  3. - 访问 [http://localhost:5601/app/kibana#/management/kibana/index_pattern](http://localhost:5601/app/kibana#/management/kibana/index_pattern) 创建 index pattern enriched_orders
  4. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/1225286/1648457307824-5f9ee141-2d6d-484e-9aec-3369c7daf610.png#clientId=u413abce9-6ed0-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=554&id=u92128858&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1108&originWidth=3444&originalType=binary&ratio=1&rotation=0&showTitle=false&size=275463&status=done&style=none&taskId=u31cd023c-c4b1-472a-969e-b4bd44cc416&title=&width=1722)
  5. - 在 [http://localhost:5601/app/kibana#/discover](http://localhost:5601/app/kibana#/discover) 看到写入的数据了![image.png](https://cdn.nlark.com/yuque/0/2022/png/1225286/1648457326749-0dab62fa-d1a1-49f3-9467-371d549a0877.png#clientId=u413abce9-6ed0-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=548&id=uba3aa923&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1096&originWidth=3440&originalType=binary&ratio=1&rotation=0&showTitle=false&size=297933&status=done&style=none&taskId=uf1d7d6f1-00d9-4f90-bbde-42ad871f8d8&title=&width=1720)
  6. <a name="mFc82"></a>
  7. ### 修改 MySQL 和 Postgres 数据库中表的数据
  8. - 在 MySQL 的 orders 表中插入一条数据
  9. ```json
  10. --MySQL
  11. INSERT INTO orders
  12. VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
  • 在 Postgres 的 shipment 表中插入一条数据

    1. --PG
    2. INSERT INTO shipments
    3. VALUES (default,10004,'Shanghai','Beijing',false);
  • 在 MySQL 的 orders 表中更新订单的状态(未生效)

    1. --MySQL
    2. UPDATE orders SET order_status = true WHERE order_id = 10004;
  • 在 Postgres 的 shipment 表中更新物流的状态

    1. --PG
    2. UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
  • 在 MYSQL 的 orders 表中删除一条数据

    1. --MySQL
    2. DELETE FROM orders WHERE order_id = 10004;
  • 每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据实时更新

关联订单数据并且将其写入 PostgreSQL

  • 使用 postgres 用户登录 PostgreSQL

    1. sudo -u postgres psql
  • 创建 flink 用于同步数据的用户并授权

    1. CREATE USER flink WITH PASSWORD '123456';
  • 使用 flink 用户登录 PostgreSQL

    1. psql -U flink -d mydb -h 127.0.0.1
  • 在 PostgreSQL 目标数据库中创建 enriched_orders 表

    1. CREATE TABLE enriched_orders (
    2. order_id INTEGER NOT NULL PRIMARY KEY,
    3. order_date BIGINT,
    4. customer_name TEXT,
    5. price DECIMAL(10, 5),
    6. product_id INTEGER,
    7. order_status BOOLEAN,
    8. product_name TEXT,
    9. product_description TEXT,
    10. shipment_id INTEGER,
    11. origin TEXT,
    12. destination TEXT,
    13. is_arrived BOOLEAN
    14. );

:::info 必须设置主键,这里设置 order_id 为主键 :::

  • Flink SQL 创建 Sink 表

    1. CREATE TABLE enriched_orders (
    2. order_id INT,
    3. order_date BIGINT, -- debezium MySQL 插件会将 DATETIME 转换成 INT64
    4. customer_name STRING,
    5. price DECIMAL(10, 5),
    6. product_id INT,
    7. order_status BOOLEAN,
    8. product_name STRING,
    9. product_description STRING,
    10. shipment_id INT,
    11. origin STRING,
    12. destination STRING,
    13. is_arrived BOOLEAN,
    14. PRIMARY KEY (order_id) NOT ENFORCED
    15. ) WITH (
    16. 'connector' = 'jdbc',
    17. 'url' = 'jdbc:postgresql://localhost:5432/mydb', -- 请替换为实际 PostgreSQL 连接参数
    18. 'table-name' = 'enriched_orders', -- 需要写入的数据表
    19. 'username' = 'flink', -- 数据库访问的用户名(需要提供 INSERT 权限)
    20. 'password' = '123456' -- 数据库访问的密码
    21. );
  • 关联订单数据并且将其写入 PostgreSQL 中

    1. INSERT INTO enriched_orders
    2. SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
    3. FROM orders AS o
    4. LEFT JOIN products AS p ON o.product_id = p.id
    5. LEFT JOIN shipments AS s ON o.order_id = s.order_id;
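
  • (可选)在 PostgreSQL 中查询 enriched_orders 表,确认数据已写入(示例)

    1. psql -U flink -d mydb -h 127.0.0.1 -c "SELECT * FROM enriched_orders;"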

:::info 备注:
order_date 类型为 BIGINT 而不是 TIMESTAMP(0)
对于 json 格式,debezium MySQL 插件会将 DATETIME 转换成 INT64

所以 Elasticsearch 保存的是 BIGINT 类型,Kibana 看到的应该显示为整数
这里直接使用 Flink CDC 的截图,所以看到的是 TIMESTAMP,你应该看到的是整数 :::

数据类型映射

当前,JSON schema 将会自动从 table schema 之中自动推导得到。不支持显式地定义 JSON schema。

| Flink SQL 类型 | JSON 类型 |
| --- | --- |
| CHAR / VARCHAR / STRING | string |
| BOOLEAN | boolean |
| BINARY / VARBINARY | string with encoding: base64 |
| DECIMAL | number |
| TINYINT | number |
| SMALLINT | number |
| INT | number |
| BIGINT | number |
| FLOAT | number |
| DOUBLE | number |
| DATE | string with format: date |
| TIME | string with format: time |
| TIMESTAMP | string with format: date-time |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE | string with format: date-time (with UTC time zone) |
| INTERVAL | number |
| ARRAY | array |
| MAP / MULTISET | object |
| ROW | object |

参考文档 基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL