1. MySQL 添加以下配置(CDC 部分)
# 添加 cdc 配置
server-id = 1
log-bin = mysql-bin
binlog_format = row
binlog-do-db = cdcservice
#设置mysql的安装目录
basedir = /opt/solfware/mysql-5.7.34
#设置mysql数据库的数据存放目录
datadir = /opt/solfware/mysql-5.7.34/data
#设置端口
port = 3306
socket = /tmp/mysql.sock
#设置字符集
character-set-server = utf8
#日志存放目录
log-error = /opt/solfware/mysql-5.7.34/data/mysqld.log
pid-file = /opt/solfware/mysql-5.7.34/data/mysqld.pid
#允许时间类型的数据为零(去掉NO_ZERO_IN_DATE,NO_ZERO_DATE)
sql_mode=ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
2. 重启 MySQL
sudo systemctl restart mysqld
3. 创建测试表
-- Test table: drop, recreate, seed, and exercise insert/update/delete
-- so that each binlog operation type can be observed by the CDC job.
DROP TABLE IF EXISTS `stuinfo`;

CREATE TABLE IF NOT EXISTS `stuinfo` (
    `stuid`    BIGINT(20) UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    `stuname`  VARCHAR(32) NOT NULL,
    `birthday` DATETIME,
    `subtime`  TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- seed rows (snapshot phase will read these)
insert into `stuinfo` values(101, "zhangsan", "2020-10-01", "2020-10-01 10:10:10");
insert into `stuinfo` values(102, "lisi", "2020-10-01", "2020-10-01 10:10:10");
insert into `stuinfo` values(103, "wangwu", "2020-10-01", "2020-10-01 10:10:10");

SELECT * FROM `stuinfo`;

-- change events: one insert, one update, one delete
insert into `stuinfo` values(104, "zhaoliu", "2020-10-01", "2020-10-01 10:10:10");
update `stuinfo` set stuname = "abc" where stuid = 104;
delete from `stuinfo` where stuid = 104;
4. 创建服务引入以下坐标
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>cdcservice</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.0</flink.version>
        <flink-connector-mysql-cdc.version>2.0.0</flink-connector-mysql-cdc.version>
        <hadoop-client.version>3.3.1</hadoop-client.version>
        <mysql-connector-java.version>8.0.19</mysql-connector-java.version>
        <fastjson.version>1.2.75</fastjson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink client (needed to submit/run jobs) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-connector-mysql-cdc.version}</version>
        </dependency>
        <!-- Hadoop client: required for checkpointing to HDFS -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-client.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql-connector-java.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Build a fat jar (jar-with-dependencies) during `mvn package` -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
5. 使用 DataStream 操作
package com.example;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import com.ververica.cdc.connectors.mysql.MySqlSource;import java.util.Properties;public class FlinkCDC {public static void main(String[] args) throws Exception {// 1. 获取执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setParallelism(1);// 2. 获取 CK(我还没安装 HDFS,所以此处注释)// System.setProperty("HADOOP_USER_NAME", "alibaba" ); // 权限设置// environment.enableCheckpointing(5000L); // 设置间隔,多久开启一次 checkpoint.// environment.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink1109/ck"));// 3. 获取 mysql cdc sourceProperties properties = new Properties();properties.setProperty("scan.startup.mode", "initial");SourceFunction<String> sourceFunction = MySqlSource.<String>builder().hostname("1.15.115.151").port(3306).username("root").password("123456").databaseList("cdcservice") // 监控的database.tableList("cdcservice.stuinfo", "cdcservice.stuinfo_bar") // 监控的table.deserializer(new MyDebeziumDeserializationSchema()) // 自定义返回数据格式.debeziumProperties(properties).build();// 4. 获取 mysql 数据DataStreamSource<String> streamSource = environment.addSource(sourceFunction);// 5. 算子操作(在这里进行业务处理)BusinessProcess businessProcess = new BusinessProcess();streamSource.map(new MapFunction<String, Object>() {@Overridepublic Object map(String dataStr) throws Exception {// 进行业务处理businessProcess.processing(dataStr);return null;}});// 6. 打印streamSource.print();// 7. 执行任务environment.execute();}}
6. 自定义返回数据格式
package com.example;import com.alibaba.fastjson.JSONObject;import com.ververica.cdc.debezium.DebeziumDeserializationSchema;import io.debezium.data.Envelope;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.util.Collector;import org.apache.kafka.connect.data.Field;import org.apache.kafka.connect.data.Schema;import org.apache.kafka.connect.data.Struct;import org.apache.kafka.connect.source.SourceRecord;public class MyDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {/*** 反序列化* @param sourceRecord* @param collector* @throws Exception*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {// 1.数据快照JSONObject snapshot = getSnapshot(sourceRecord);// 2.输出结果collector.collect(snapshot.toString());}/*** 数据类型* @return*/@Overridepublic TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);}/*** 获取数据快照* @param sourceRecord* @return*/private JSONObject getSnapshot(SourceRecord sourceRecord) {// 数据快照JSONObject result = new JSONObject();// 1.获取操作的 database 与 table 名称String topic = sourceRecord.topic();String[] split = topic.split("\\.");String dataBaseName = split[1];String tableName = split[2];// 2.获取 SQL 操作类型Envelope.Operation operation = Envelope.operationFor(sourceRecord);// 3.获取变更前后的数据Struct value = (Struct) sourceRecord.value();// 变更前的数据Struct beforeStruct = value.getStruct("before");// 变更后的数据Struct afterStruct = value.getStruct("after");// 自定义操作最终返回的数据:变更前的数据JSONObject beforeJSONData = new JSONObject();// 自定义操作最终返回的数据:变更后的数据JSONObject afterJSONData = new JSONObject();if (null != beforeStruct && null != afterStruct) { // update 操作// update 前的数据Schema beforeSchema = beforeStruct.schema();for (Field field : beforeSchema.fields()) {beforeJSONData.put(field.name(), beforeStruct.get(field));}// update 后的数据Schema afterSchema = afterStruct.schema();for (Field field : afterSchema.fields()) {afterJSONData.put(field.name(), 
afterStruct.get(field));}}else if (null != afterStruct) { // insert 操作Schema afterSchema = afterStruct.schema();for (Field field : afterSchema.fields()) {afterJSONData.put(field.name(), afterStruct.get(field));}}else if (null != beforeStruct) { // 删除操作Schema beforeSchema = beforeStruct.schema();for (Field field : beforeSchema.fields()) {beforeJSONData.put(field.name(), beforeStruct.get(field));}}else {System.out.println("异常情况");}// 4. 封装数据快照result.put("dataBaseName", dataBaseName);result.put("tableName", tableName);result.put("operation", operation.toString().toLowerCase());result.put("beforeJSONData", beforeJSONData);result.put("afterJSONData", afterJSONData);return result;}}
7. 业务处理
package com.example;import com.alibaba.fastjson.JSONObject;import org.apache.commons.lang3.StringUtils;import java.io.Serializable;public class BusinessProcess implements Serializable {/*** 每次数据同步过来之后,进入当前方法进行业务处理*/public void processing(String dataStr) {// 1.将 String 转为 JSONJSONObject jsonObject = JSONObject.parseObject(dataStr);// 2.获取 tableNameString tableName = (String) jsonObject.get("tableName");// 3.根据不同的 tableName 进行不同的业务处理操作businessTableProcessing(tableName);}/*** 不同的 table 进行不同的 business Process* @param tableName*/private void businessTableProcessing(String tableName) {if (StringUtils.isEmpty(tableName)) {throw new IllegalArgumentException("table name is null");}else if ("stuinfo".equals(tableName)) {new StuInfoHandler().businessTableProcessing();}else if ("stuinfo_bar".equals(tableName)) {new StuInfoBarHandler().businessTableProcessing();}}}

https://github.com/ververica/flink-cdc-connectors/issues/317
