1. MySQL 添加以下配置(CDC 部分,写入 my.cnf 后需重启生效)

  1. # 添加 cdc 配置
  2. server-id = 1
  3. log-bin = mysql-bin
  4. binlog_format = row
  5. binlog-do-db = cdcservice
  6. #设置mysql的安装目录
  7. basedir = /opt/solfware/mysql-5.7.34
  8. #设置mysql数据库的数据存放目录
  9. datadir = /opt/solfware/mysql-5.7.34/data
  10. #设置端口
  11. port = 3306
  12. socket = /tmp/mysql.sock
  13. #设置字符集
  14. character-set-server = utf8
  15. #日志存放目录
  16. log-error = /opt/solfware/mysql-5.7.34/data/mysqld.log
  17. pid-file = /opt/solfware/mysql-5.7.34/data/mysqld.pid
  18. #允许时间类型的数据为零(去掉NO_ZERO_IN_DATE,NO_ZERO_DATE)
  19. sql_mode=ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION

2. 重启 MySQL 使配置生效

  1. sudo systemctl restart mysqld

3. 创建测试表

  1. DROP TABLE IF EXISTS `stuinfo`;
  2. CREATE TABLE IF NOT EXISTS `stuinfo`(
  3. `stuid` BIGINT(20) UNSIGNED AUTO_INCREMENT PRIMARY KEY,
  4. `stuname` VARCHAR(32) NOT NULL,
  5. `birthday` DATETIME,
  6. `subtime` TIMESTAMP
  7. )ENGINE=InnoDB DEFAULT CHARSET=utf8;
  8. insert into `stuinfo` values(101, "zhangsan", "2020-10-01", "2020-10-01 10:10:10");
  9. insert into `stuinfo` values(102, "lisi", "2020-10-01", "2020-10-01 10:10:10");
  10. insert into `stuinfo` values(103, "wangwu", "2020-10-01", "2020-10-01 10:10:10");
  11. SELECT * FROM `stuinfo`;
  12. insert into `stuinfo` values(104, "zhaoliu", "2020-10-01", "2020-10-01 10:10:10");
  13. update `stuinfo` set stuname = "abc" where stuid = 104;
  14. delete from `stuinfo` where stuid = 104;

4. 创建服务工程,并在 pom.xml 中引入以下 Maven 坐标

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>org.example</groupId>
  7. <artifactId>cdcservice</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <properties>
  10. <maven.compiler.source>8</maven.compiler.source>
  11. <maven.compiler.target>8</maven.compiler.target>
  12. <flink.version>1.12.0</flink.version>
  13. <flink-connector-mysql-cdc.version>2.0.0</flink-connector-mysql-cdc.version>
  14. <hadoop-client.version>3.3.1</hadoop-client.version>
  15. <mysql-connector-java.version>8.0.19</mysql-connector-java.version>
  16. <fastjson.version>1.2.75</fastjson.version>
  17. </properties>
  18. <dependencies>
  19. <dependency>
  20. <groupId>org.apache.flink</groupId>
  21. <artifactId>flink-java</artifactId>
  22. <version>${flink.version}</version>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.apache.flink</groupId>
  26. <artifactId>flink-streaming-java_2.12</artifactId>
  27. <version>${flink.version}</version>
  28. </dependency>
  29. <!-- #flink的客户端 -->
  30. <dependency>
  31. <groupId>org.apache.flink</groupId>
  32. <artifactId>flink-clients_2.12</artifactId>
  33. <version>${flink.version}</version>
  34. </dependency>
  35. <dependency>
  36. <groupId>org.apache.flink</groupId>
  37. <artifactId>flink-table-planner_2.12</artifactId>
  38. <version>${flink.version}</version>
  39. </dependency>
  40. <dependency>
  41. <groupId>com.ververica</groupId>
  42. <artifactId>flink-connector-mysql-cdc</artifactId>
  43. <version>${flink-connector-mysql-cdc.version}</version>
  44. </dependency>
  45. <!-- hadoop客户端,checkpoin 需要用到 -->
  46. <dependency>
  47. <groupId>org.apache.hadoop</groupId>
  48. <artifactId>hadoop-client</artifactId>
  49. <version>${hadoop-client.version}</version>
  50. </dependency>
  51. <dependency>
  52. <groupId>mysql</groupId>
  53. <artifactId>mysql-connector-java</artifactId>
  54. <version>${mysql-connector-java.version}</version>
  55. </dependency>
  56. <dependency>
  57. <groupId>com.alibaba</groupId>
  58. <artifactId>fastjson</artifactId>
  59. <version>${fastjson.version}</version>
  60. </dependency>
  61. </dependencies>
  62. <build>
  63. <plugins>
  64. <plugin>
  65. <groupId>org.apache.maven.plugins</groupId>
  66. <artifactId>maven-assembly-plugin</artifactId>
  67. <version>3.0.0</version>
  68. <configuration>
  69. <descriptorRefs>
  70. <descriptorRef>jar-with-dependencies</descriptorRef>
  71. </descriptorRefs>
  72. </configuration>
  73. <executions>
  74. <execution>
  75. <id>make-assembly</id>
  76. <phase>package</phase>
  77. <goals>
  78. <goal>single</goal>
  79. </goals>
  80. </execution>
  81. </executions>
  82. </plugin>
  83. </plugins>
  84. </build>
  85. </project>

5. 使用 DataStream 操作

  1. package com.example;
  2. import org.apache.flink.api.common.functions.MapFunction;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  6. import com.ververica.cdc.connectors.mysql.MySqlSource;
  7. import java.util.Properties;
  8. public class FlinkCDC {
  9. public static void main(String[] args) throws Exception {
  10. // 1. 获取执行环境
  11. StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
  12. environment.setParallelism(1);
  13. // 2. 获取 CK(我还没安装 HDFS,所以此处注释)
  14. // System.setProperty("HADOOP_USER_NAME", "alibaba" ); // 权限设置
  15. // environment.enableCheckpointing(5000L); // 设置间隔,多久开启一次 checkpoint.
  16. // environment.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink1109/ck"));
  17. // 3. 获取 mysql cdc source
  18. Properties properties = new Properties();
  19. properties.setProperty("scan.startup.mode", "initial");
  20. SourceFunction<String> sourceFunction = MySqlSource.<String>builder()
  21. .hostname("1.15.115.151")
  22. .port(3306)
  23. .username("root")
  24. .password("123456")
  25. .databaseList("cdcservice") // 监控的database
  26. .tableList("cdcservice.stuinfo", "cdcservice.stuinfo_bar") // 监控的table
  27. .deserializer(new MyDebeziumDeserializationSchema()) // 自定义返回数据格式
  28. .debeziumProperties(properties)
  29. .build();
  30. // 4. 获取 mysql 数据
  31. DataStreamSource<String> streamSource = environment.addSource(sourceFunction);
  32. // 5. 算子操作(在这里进行业务处理)
  33. BusinessProcess businessProcess = new BusinessProcess();
  34. streamSource.map(new MapFunction<String, Object>() {
  35. @Override
  36. public Object map(String dataStr) throws Exception {
  37. // 进行业务处理
  38. businessProcess.processing(dataStr);
  39. return null;
  40. }
  41. });
  42. // 6. 打印
  43. streamSource.print();
  44. // 7. 执行任务
  45. environment.execute();
  46. }
  47. }

6. 自定义返回数据格式

  1. package com.example;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
  4. import io.debezium.data.Envelope;
  5. import org.apache.flink.api.common.typeinfo.TypeInformation;
  6. import org.apache.flink.util.Collector;
  7. import org.apache.kafka.connect.data.Field;
  8. import org.apache.kafka.connect.data.Schema;
  9. import org.apache.kafka.connect.data.Struct;
  10. import org.apache.kafka.connect.source.SourceRecord;
  11. public class MyDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
  12. /**
  13. * 反序列化
  14. * @param sourceRecord
  15. * @param collector
  16. * @throws Exception
  17. */
  18. @Override
  19. public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
  20. // 1.数据快照
  21. JSONObject snapshot = getSnapshot(sourceRecord);
  22. // 2.输出结果
  23. collector.collect(snapshot.toString());
  24. }
  25. /**
  26. * 数据类型
  27. * @return
  28. */
  29. @Override
  30. public TypeInformation<String> getProducedType() {
  31. return TypeInformation.of(String.class);
  32. }
  33. /**
  34. * 获取数据快照
  35. * @param sourceRecord
  36. * @return
  37. */
  38. private JSONObject getSnapshot(SourceRecord sourceRecord) {
  39. // 数据快照
  40. JSONObject result = new JSONObject();
  41. // 1.获取操作的 database 与 table 名称
  42. String topic = sourceRecord.topic();
  43. String[] split = topic.split("\\.");
  44. String dataBaseName = split[1];
  45. String tableName = split[2];
  46. // 2.获取 SQL 操作类型
  47. Envelope.Operation operation = Envelope.operationFor(sourceRecord);
  48. // 3.获取变更前后的数据
  49. Struct value = (Struct) sourceRecord.value();
  50. // 变更前的数据
  51. Struct beforeStruct = value.getStruct("before");
  52. // 变更后的数据
  53. Struct afterStruct = value.getStruct("after");
  54. // 自定义操作最终返回的数据:变更前的数据
  55. JSONObject beforeJSONData = new JSONObject();
  56. // 自定义操作最终返回的数据:变更后的数据
  57. JSONObject afterJSONData = new JSONObject();
  58. if (null != beforeStruct && null != afterStruct) { // update 操作
  59. // update 前的数据
  60. Schema beforeSchema = beforeStruct.schema();
  61. for (Field field : beforeSchema.fields()) {
  62. beforeJSONData.put(field.name(), beforeStruct.get(field));
  63. }
  64. // update 后的数据
  65. Schema afterSchema = afterStruct.schema();
  66. for (Field field : afterSchema.fields()) {
  67. afterJSONData.put(field.name(), afterStruct.get(field));
  68. }
  69. }else if (null != afterStruct) { // insert 操作
  70. Schema afterSchema = afterStruct.schema();
  71. for (Field field : afterSchema.fields()) {
  72. afterJSONData.put(field.name(), afterStruct.get(field));
  73. }
  74. }else if (null != beforeStruct) { // 删除操作
  75. Schema beforeSchema = beforeStruct.schema();
  76. for (Field field : beforeSchema.fields()) {
  77. beforeJSONData.put(field.name(), beforeStruct.get(field));
  78. }
  79. }else {
  80. System.out.println("异常情况");
  81. }
  82. // 4. 封装数据快照
  83. result.put("dataBaseName", dataBaseName);
  84. result.put("tableName", tableName);
  85. result.put("operation", operation.toString().toLowerCase());
  86. result.put("beforeJSONData", beforeJSONData);
  87. result.put("afterJSONData", afterJSONData);
  88. return result;
  89. }
  90. }

7. 业务处理

  1. package com.example;
  2. import com.alibaba.fastjson.JSONObject;
  3. import org.apache.commons.lang3.StringUtils;
  4. import java.io.Serializable;
  5. public class BusinessProcess implements Serializable {
  6. /**
  7. * 每次数据同步过来之后,进入当前方法进行业务处理
  8. */
  9. public void processing(String dataStr) {
  10. // 1.将 String 转为 JSON
  11. JSONObject jsonObject = JSONObject.parseObject(dataStr);
  12. // 2.获取 tableName
  13. String tableName = (String) jsonObject.get("tableName");
  14. // 3.根据不同的 tableName 进行不同的业务处理操作
  15. businessTableProcessing(tableName);
  16. }
  17. /**
  18. * 不同的 table 进行不同的 business Process
  19. * @param tableName
  20. */
  21. private void businessTableProcessing(String tableName) {
  22. if (StringUtils.isEmpty(tableName)) {
  23. throw new IllegalArgumentException("table name is null");
  24. }else if ("stuinfo".equals(tableName)) {
  25. new StuInfoHandler().businessTableProcessing();
  26. }else if ("stuinfo_bar".equals(tableName)) {
  27. new StuInfoBarHandler().businessTableProcessing();
  28. }
  29. }
  30. }

image.png

https://github.com/ververica/flink-cdc-connectors/issues/317
image.png