基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL

基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL

Flink CDC 配置过滤删除操作

在 Flink SQL CLI 中使用 Flink DDL 创建表时可以通过 debezium.skipped.operations 选项配置需要过滤的 oplog 操作。

除了使用 Flink DDL 创建表时需要额外配置 <font style="color:#F5222D;">debezium.skipped.operations</font>外,其他所有步骤保持不变

MySQL CDC 和 PostgreSQL CDC debezium.skipped.operations 选项略有不同

  • MySQL CDC

Debezium’s MySQL Connector properties

参数 说明 是否必填 默认值
debezium.skipped.operations 需要过滤的 oplog 操作,以逗号分隔。操作包括 c 表示插入,u 表示更新,d 表示删除。默认情况下,不跳过任何操作。 No default
  • PostgreSQL CDC

Debezium’s Postgres Connector properties

参数 说明 是否必填 默认值
debezium.skipped.operations 需要过滤的 oplog 操作,以逗号分隔。操作包括 c 表示插入,u 表示更新,d 表示删除,t 表示截断,none 表示不跳过任何操作。默认情况下,将跳过截断操作。 t

在 Flink SQL CLI 中使用 Flink DDL 创建表

使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据
  1. -- Flink SQL
  2. Flink SQL> CREATE TABLE products (
  3. id INT,
  4. name STRING,
  5. description STRING,
  6. PRIMARY KEY (id) NOT ENFORCED
  7. ) WITH (
  8. 'connector' = 'mysql-cdc',
  9. 'hostname' = 'localhost',
  10. 'port' = '3306',
  11. 'username' = 'root',
  12. 'password' = '123456',
  13. 'database-name' = 'mydb',
  14. 'table-name' = 'products',
  15. 'debezium.skipped.operations' = 'd'
  16. );
  17. Flink SQL> CREATE TABLE orders (
  18. order_id INT,
  19. order_date TIMESTAMP(0),
  20. customer_name STRING,
  21. price DECIMAL(10, 5),
  22. product_id INT,
  23. order_status BOOLEAN,
  24. PRIMARY KEY (order_id) NOT ENFORCED
  25. ) WITH (
  26. 'connector' = 'mysql-cdc',
  27. 'hostname' = 'localhost',
  28. 'port' = '3306',
  29. 'username' = 'root',
  30. 'password' = '123456',
  31. 'database-name' = 'mydb',
  32. 'table-name' = 'orders',
  33. 'debezium.skipped.operations' = 'd'
  34. );
  35. Flink SQL> CREATE TABLE shipments (
  36. shipment_id INT,
  37. order_id INT,
  38. origin STRING,
  39. destination STRING,
  40. is_arrived BOOLEAN,
  41. PRIMARY KEY (shipment_id) NOT ENFORCED
  42. ) WITH (
  43. 'connector' = 'postgres-cdc',
  44. 'hostname' = 'localhost',
  45. 'port' = '5432',
  46. 'username' = 'postgres',
  47. 'password' = 'postgres',
  48. 'database-name' = 'postgres',
  49. 'schema-name' = 'public',
  50. 'table-name' = 'shipments',
  51. 'debezium.skipped.operations' = 'd'
  52. );

在 Kibana 中查看包含商品和物流信息的订单数据

在 MySQL 的 orders 表中插入一条数据
  1. --MySQL
  2. INSERT INTO orders
  3. VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
在 MYSQL 的 orders 表中删除那条数据
  1. --MySQL
  2. DELETE FROM orders WHERE order_id = 10004;

可以看到 Kibana 中显示的订单数据并不会因为删除了 orders 中的一条记录而减少。