1. Repository

https://github.com/ververica/flink-cdc-connectors

2. Dependencies

  <dependencies>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>1.12.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.12</artifactId>
          <version>1.12.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_2.12</artifactId>
          <version>1.12.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>3.1.3</version>
      </dependency>
      <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.49</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner-blink_2.12</artifactId>
          <version>1.12.0</version>
      </dependency>
      <dependency>
          <groupId>com.ververica</groupId>
          <artifactId>flink-connector-mysql-cdc</artifactId>
          <version>2.0.0</version>
      </dependency>
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.75</version>
      </dependency>
  </dependencies>

  <build>
      <plugins>
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-assembly-plugin</artifactId>
              <version>3.0.0</version>
              <configuration>
                  <descriptorRefs>
                      <descriptorRef>jar-with-dependencies</descriptorRef>
                  </descriptorRefs>
              </configuration>
              <executions>
                  <execution>
                      <id>make-assembly</id>
                      <phase>package</phase>
                      <goals>
                          <goal>single</goal>
                      </goals>
                  </execution>
              </executions>
          </plugin>
      </plugins>
  </build>

3. DataStream API

  public class FlinkCDC {
      public static void main(String[] args) throws Exception {
          //1. Create the execution environment
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.setParallelism(1);

          //2. Flink CDC keeps the binlog position in checkpointed state. To resume from where the job
          //   stopped, restart the program from a checkpoint or savepoint.
          //2.1 Enable checkpointing, one checkpoint every 5 seconds
          env.enableCheckpointing(5000L);
          //2.2 Checkpoint consistency semantics
          env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
          //2.3 Retain the last checkpoint when the job is cancelled
          env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
          //2.4 Restart strategy used when recovering from a checkpoint
          env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
          //2.5 State backend
          env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
          //2.6 User name for accessing HDFS
          System.setProperty("HADOOP_USER_NAME", "atguigu");

          //3. Create the Flink MySQL CDC source. Startup modes:
          //   initial (default): take an initial snapshot of the monitored tables on first startup, then keep reading the latest binlog.
          //   latest-offset: never snapshot; read only from the end of the binlog, i.e. only changes made after the connector started.
          //   timestamp: never snapshot; traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.
          //   specific-offset: never snapshot; read the binlog directly from the specified offset.
          DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                  .hostname("hadoop102")
                  .port(3306)
                  .username("root")
                  .password("000000")
                  .databaseList("gmall-flink")
                  .tableList("gmall-flink.z_user_info") // optional; if omitted, every table of the listed databases is read. Tables must be given as "db.table"
                  .startupOptions(StartupOptions.initial())
                  .deserializer(new StringDebeziumDeserializationSchema())
                  .build();

          //4. Read from MySQL with the CDC source
          DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);

          //5. Print the data
          mysqlDS.print();

          //6. Execute the job
          env.execute();
      }
  }

4. Flink SQL

  public class FlinkSQL_CDC {
      public static void main(String[] args) throws Exception {
          //1. Create the execution environment
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.setParallelism(1);
          StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

          //2. Create the Flink MySQL CDC source table
          tableEnv.executeSql("CREATE TABLE user_info (" +
                  " id INT," +
                  " name STRING," +
                  " phone_num STRING" +
                  ") WITH (" +
                  " 'connector' = 'mysql-cdc'," +
                  " 'hostname' = 'hadoop102'," +
                  " 'port' = '3306'," +
                  " 'username' = 'root'," +
                  " 'password' = '000000'," +
                  " 'database-name' = 'gmall-flink'," +
                  " 'table-name' = 'z_user_info'" +
                  ")");

          //3. Query the table and print the changelog. TableResult.print() submits the query and blocks,
          //   so a separate env.execute() is not needed (it would fail here, because no DataStream operators exist).
          tableEnv.executeSql("select * from user_info").print();
      }
  }
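A possible variant, not in the original notes: if a DataStream is wanted out of the SQL source (for example to attach further operators), the table can be converted to a retract stream inside the main method above; in that case env.execute() is required. The sketch assumes the user_info table defined above and the standard Flink 1.12 Table API imports (Table, Row, Tuple2, DataStream).

  // Sketch only: convert the CDC table into a retract stream and print it as a DataStream.
  Table table = tableEnv.sqlQuery("select * from user_info");
  DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
  retractStream.print();
  env.execute(); // needed here, because the job now contains DataStream operators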

5. Custom deserialization

  new DebeziumDeserializationSchema<String>() { // custom deserializer
      @Override
      public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
          // The topic carries the database and table name, e.g. mysql_binlog_source.gmall-flink.z_user_info
          String topic = sourceRecord.topic();
          String[] arr = topic.split("\\.");
          String db = arr[1];
          String tableName = arr[2];

          // Operation type: READ, DELETE, UPDATE or CREATE
          Envelope.Operation operation = Envelope.operationFor(sourceRecord);

          // The record value is a Struct
          Struct value = (Struct) sourceRecord.value();

          // Data after the change ("after" is null for delete events)
          Struct after = value.getStruct("after");

          // JSON object holding the row data
          JSONObject data = new JSONObject();
          if (after != null) {
              for (Field field : after.schema().fields()) {
                  Object o = after.get(field);
                  data.put(field.name(), o);
              }
          }

          // JSON object wrapping the final result
          JSONObject result = new JSONObject();
          result.put("operation", operation.toString().toLowerCase());
          result.put("data", data);
          result.put("database", db);
          result.put("table", tableName);

          // Emit downstream
          collector.collect(result.toJSONString());
      }

      @Override
      public TypeInformation<String> getProducedType() {
          return TypeInformation.of(String.class);
      }
  })
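Not from the original notes: a minimal sketch of where this deserializer plugs in, reusing the builder from section 3. It simply replaces StringDebeziumDeserializationSchema; CustomerDeserialization is a hypothetical name for the anonymous schema above extracted into its own class, and the host, credentials and table names are the example values used earlier.

  // Sketch only: same builder as in section 3, with the custom deserializer substituted.
  // "CustomerDeserialization" is a made-up class name for the schema shown above.
  DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
          .hostname("hadoop102")
          .port(3306)
          .username("root")
          .password("000000")
          .databaseList("gmall-flink")
          .tableList("gmall-flink.z_user_info")
          .startupOptions(StartupOptions.initial())
          .deserializer(new CustomerDeserialization())
          .build();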

6. Flink CDC 1.x pain points

(figures omitted) The pain points: the initial snapshot in Flink CDC 1.x needs a global read lock (or table locks) to keep the snapshot consistent with the binlog, the snapshot can only be read with parallelism 1, and no checkpoint can be completed during the snapshot phase, so a failure while snapshotting a large table means reading it all over again.

7. Design philosophy

In initial mode, for tables with a primary key the overall flow has five phases (a sketch of the chunk-splitting step follows the list):
1. Chunk splitting;
2. Chunk assignment (enables parallel reading and checkpointing);
3. Chunk reading (enables lock-free reading);
4. Chunk reporting;
5. Chunk assignment (once all snapshot chunks have reported, the binlog chunk is assigned).
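Not part of the original notes: a minimal sketch of the chunk-splitting idea, assuming a numeric primary key and a fixed chunk size; the real splitter in flink-cdc-connectors also handles sparse and non-uniform key distributions, which this toy version ignores.

  import java.util.ArrayList;
  import java.util.List;

  public class ChunkSplitSketch {

      // Split the primary-key range [minPk, maxPk] into half-open chunks of roughly chunkSize keys each.
      static List<long[]> splitChunks(long minPk, long maxPk, long chunkSize) {
          List<long[]> chunks = new ArrayList<>();
          long start = minPk;
          while (start <= maxPk) {
              long end = Math.min(start + chunkSize, maxPk + 1);
              chunks.add(new long[]{start, end}); // one chunk = [start, end)
              start = end;
          }
          return chunks;
      }

      public static void main(String[] args) {
          // e.g. a primary key running from 1 to 10000, split into chunks of 4096 keys
          for (long[] c : splitChunks(1, 10000, 4096)) {
              System.out.println("[" + c[0] + ", " + c[1] + ")");
          }
          // Each chunk can then be handed to a parallel reader (phase 2) and read without a global
          // lock (phase 3); per-chunk consistency is reconciled against the binlog afterwards.
      }
  }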

8. Differences between Flink SQL and the DataStream API

The DataStream API can capture a whole database (or several databases) as well as an explicit list of tables; the SQL connector captures a single table per source table (see the sketch below).
With the DataStream API you supply your own deserializer; the SQL connector already turns the changelog into rows.
Make sure the Flink version and the flink-cdc-connectors version actually match each other.
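Not from the original notes: a minimal sketch, reusing the builder from section 3, of one DataStream source watching several databases and tables at once. "gmall-config" and "table_process" are illustrative names, not values from the original.

  // Sketch only: multiple databases/tables on a single DataStream CDC source.
  DebeziumSourceFunction<String> multiTableSource = MySQLSource.<String>builder()
          .hostname("hadoop102")
          .port(3306)
          .username("root")
          .password("000000")
          .databaseList("gmall-flink", "gmall-config")   // one or more databases
          .tableList("gmall-flink.z_user_info",
                     "gmall-config.table_process")       // optional; omit to capture every table
          .startupOptions(StartupOptions.initial())
          .deserializer(new StringDebeziumDeserializationSchema())
          .build();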

II. Examples

Attachment: mongo_cdc.pdf

1. MongoDB CDC in practice

1) Raw data

(figure: the original documents in the hlh.orders collection)

2) CDC data: change streams

insert

  {
    "_id": "{\"_id\": {\"$oid\": \"619619252c656e0cda567683\"}, \"copyingData\": true}",
    "operationType": "insert",
    "fullDocument": "{\"_id\": {\"$oid\": \"619619252c656e0cda567683\"}, \"order_id\": 102.0, \"order_date\": {\"$date\": 1596103869001}, \"customer_id\": 1002.0, \"price\": {\"$numberDecimal\": \"15.00\"}, \"product\": {\"name\": \"car battery\", \"description\": \"12V car battery\"}, \"order_status\": false}",
    "source": {
      "ts_ms": 0,
      "snapshot": "true"
    },
    "ns": {
      "db": "hlh",
      "coll": "orders"
    },
    "to": null,
    "documentKey": "{\"_id\": {\"$oid\": \"619619252c656e0cda567683\"}}",
    "updateDescription": null,
    "clusterTime": null,
    "txnNumber": null,
    "lsid": null
  }

update

  {
    "_id": "{\"_data\": \"8262A00E3D0000003F2B022C0100296E5A1004E80FAED9CBC34B7BAAF866694C9D24AB46645F69640064619619252C656E0CDA5676830004\"}",
    "operationType": "update",
    "fullDocument": "{\"_id\": {\"$oid\": \"619619252c656e0cda567683\"}, \"order_id\": 102.0, \"order_date\": {\"$date\": 1596103869001}, \"customer_id\": 1002.0, \"price\": {\"$numberDecimal\": \"16.00\"}, \"product\": {\"name\": \"car battery\", \"description\": \"12V car battery\"}, \"order_status\": false}",
    "source": {
      "ts_ms": 1654656573000,
      "snapshot": null
    },
    "ns": {
      "db": "hlh",
      "coll": "orders"
    },
    "to": null,
    "documentKey": "{\"_id\": {\"$oid\": \"619619252c656e0cda567683\"}}",
    "updateDescription": {
      "updatedFields": "{\"price\": {\"$numberDecimal\": \"16.00\"}}",
      "removedFields": []
    },
    "clusterTime": "{\"$timestamp\": {\"t\": 1654656573, \"i\": 63}}",
    "txnNumber": null,
    "lsid": null
  }

delete

  {
    "_id": "{\"_data\": \"8262A00F890000000A2B022C0100296E5A1004E80FAED9CBC34B7BAAF866694C9D24AB46645F69640064619619252C656E0CDA5676830004\"}",
    "operationType": "delete",
    "fullDocument": null,
    "source": {
      "ts_ms": 1654656905000,
      "snapshot": null
    },
    "ns": {
      "db": "hlh",
      "coll": "orders"
    },
    "to": null,
    "documentKey": "{\"_id\": {\"$oid\": \"619619252c656e0cda567683\"}}",
    "updateDescription": null,
    "clusterTime": "{\"$timestamp\": {\"t\": 1654656905, \"i\": 10}}",
    "txnNumber": null,
    "lsid": null
  }
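Not part of the original notes: a minimal sketch of how such change-stream records can be consumed from Flink, mirroring the style of section 4. The option names are those of the flink-cdc-connectors MongoDB connector as I recall them (hosts/username/password/database/collection) and should be verified against the version in use; the host and credentials are placeholders, and only 'hlh' and 'orders' come from the events above.

  // Sketch only: MongoDB CDC source table for hlh.orders. Host, user and password are placeholders.
  tableEnv.executeSql("CREATE TABLE orders (" +
          " _id STRING," +
          " order_id DOUBLE," +
          " customer_id DOUBLE," +
          " price DECIMAL(10, 2)," +
          " order_status BOOLEAN," +
          " PRIMARY KEY (_id) NOT ENFORCED" +
          ") WITH (" +
          " 'connector' = 'mongodb-cdc'," +
          " 'hosts' = 'localhost:27017'," +
          " 'username' = 'flinkuser'," +
          " 'password' = 'flinkpw'," +
          " 'database' = 'hlh'," +
          " 'collection' = 'orders'" +
          ")");
  tableEnv.executeSql("select * from orders").print();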

3) Oplog

The MongoDB oplog does not store the full pre- and post-change images of a document; fullDocument is a lookup of the document's latest state at the time the change event is processed.
When a Flink job is restored from a checkpoint or savepoint, heartbeat events keep pushing the resumeToken forward so that it does not expire.
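Not from the original notes: assuming the 'heartbeat.interval.ms' option of the MongoDB CDC connector (check the docs of the version in use), enabling heartbeats for a rarely-changing collection might look like the sketch below; 30000 ms is an arbitrary value.

  // Sketch only: same kind of source table as above, with heartbeats enabled so the resume
  // token keeps advancing even when hlh.orders itself receives no changes.
  tableEnv.executeSql("CREATE TABLE orders_with_heartbeat (" +
          " _id STRING," +
          " PRIMARY KEY (_id) NOT ENFORCED" +
          ") WITH (" +
          " 'connector' = 'mongodb-cdc'," +
          " 'hosts' = 'localhost:27017'," +
          " 'username' = 'flinkuser'," +
          " 'password' = 'flinkpw'," +
          " 'database' = 'hlh'," +
          " 'collection' = 'orders'," +
          " 'heartbeat.interval.ms' = '30000'" +
          ")");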

2. Flink CDC in practice

1) Raw data

  • Full snapshot read
    {
      "PUSHKITTOKEN": "",
      "UPDATETIME": "2017-08-10 01:42:31",
      "PUSHTYPE": 0,
      "APNSTOKEN": "cea5bc9e4f5b74213424bada03a3959c821f230bdc1cc0e1ab66b809182170cf",
      "MOBILEOPERATOR": "movistar",
      "JAILBREAK": 0,
      "SPARE1": "",
      "RESERVE": "",
      "VERSION_STR": "",
      "DEVICEID": "7a3cc059f6d66bd77bbae29fbed291a123dcf6db",
      "SPARE2": 0,
      "OSVERSION": "10.3.3",
      "APPSTORECOUNTRY": "",
      "OSLANG": "en-GB",
      "USERID": 8051194,
      "VERSION": "2.3.6",
      "OSTYPE": 0,
      "DEVICEDETAIL": "iPhone7,2",
      "OPERATORCOUNTRY": "cl",
      "operation": "read", -- operation type
      "table": "HT_USER_TERMINAL"
    }