CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的
变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下
来,写入到消息中间件中以供其他服务进行订阅及消费。

CDC 主要分为基于查询和基于 Binlog 两种方式,这两种之间的区别:


基于查询的 CDC 基于 Binlog 的 CDC
开源技术 Datax、Sqoop、Kafka JDBC Source Canal、Maxwell、Debezium、Flink CDC
执行模式 Batch Streaming
延迟性 高延迟 低延迟
是否增加数据库压力

基于查询的 CDC 就不做介绍了,下文主要介绍一下 Flink 社区开源的 Flink CDC:
image.png

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL
等数据库直接读取全量数据和增量变更数据的 source 组件。目前也已开源,开源地址:
https://github.com/ververica/flink-cdc-connectors
社区文档:https://ververica.github.io/flink-cdc-connectors/master/

下面根据官方网站提供的教程,体验一下 Flink CDC

安装 Docker 环境

创建文件 docker-compose.yml

  1. version: '2.1'
  2. services:
  3. postgres:
  4. image: debezium/example-postgres:1.1
  5. ports:
  6. - "5432:5432"
  7. environment:
  8. - POSTGRES_PASSWORD=1234
  9. - POSTGRES_DB=postgres
  10. - POSTGRES_USER=postgres
  11. - POSTGRES_PASSWORD=postgres
  12. mysql:
  13. image: debezium/example-mysql:1.1
  14. ports:
  15. - "3306:3306"
  16. environment:
  17. - MYSQL_ROOT_PASSWORD=123456
  18. - MYSQL_USER=mysqluser
  19. - MYSQL_PASSWORD=mysqlpw
  20. elasticsearch:
  21. image: docker.elastic.co/elasticsearch/elasticsearch:7.10.2
  22. environment:
  23. - cluster.name=docker-cluster
  24. - bootstrap.memory_lock=true
  25. - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
  26. - discovery.type=single-node
  27. ports:
  28. - "9200:9200"
  29. - "9300:9300"
  30. ulimits:
  31. memlock:
  32. soft: -1
  33. hard: -1
  34. nofile:
  35. soft: 65536
  36. hard: 65536
  37. kibana:
  38. image: elastic/kibana:7.6.0
  39. ports:
  40. - "5601:5601"

使用命令,启动容器

  1. docker-compose up -d

准备 Flink 环境

1.安装 Flink

访问页面 https://downloads.apache.org/flink/ 这里选择 Flink 13.3
截屏2021-11-21 下午6.26.47.png

2.安装SQL连接器

下载下面列出的依赖包,并将它们放到目录 flink-1.13.3/lib/ 下:

INSERT INTO products VALUES (default,”scooter”,”Small 2-wheel scooter”), (default,”car battery”,”12V car battery”), (default,”12-pack drill bits”,”12-pack of drill bits with sizes ranging from #40 to #3”), (default,”hammer”,”12oz carpenter’s hammer”), (default,”hammer”,”14oz carpenter’s hammer”), (default,”hammer”,”16oz carpenter’s hammer”), (default,”rocks”,”box of assorted rocks”), (default,”jacket”,”water resistent black wind breaker”), (default,”spare tire”,”24 inch spare tire”);

CREATE TABLE orders ( order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, order_date DATETIME NOT NULL, customer_name VARCHAR(255) NOT NULL, price DECIMAL(10, 5) NOT NULL, product_id INTEGER NOT NULL, order_status BOOLEAN NOT NULL — Whether order has been placed ) AUTO_INCREMENT = 10001;

INSERT INTO orders VALUES (default, ‘2020-07-30 10:08:22’, ‘Jark’, 50.50, 102, false), (default, ‘2020-07-30 10:11:09’, ‘Sally’, 15.00, 105, false), (default, ‘2020-07-30 12:00:30’, ‘Edward’, 25.25, 106, false);

  1. 连接 postgres
  2. ```bash
  3. docker-compose exec postgres psql -h localhost -U postgres

创建表结构 & 数据

遇到的问题

https://stackoverflow.com/questions/65962810/m1-mac-issue-bringing-up-elasticsearch-cannot-run-jdk-bin-java

Causedby: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSink$DefaultRestClientFactory

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Connector ‘elasticsearch-7’ can only be used as a sink. It cannot be used as a source.