Install Flink

Following the official example, let's walk through a MySQL -> MySQL sync.

Prepare the SQL

Note that the two databases are separate instances.

  • I started both databases with Docker.

Database version: MySQL 8.0+

Source table

```sql
CREATE DATABASE IF NOT EXISTS `flink_source` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
USE `flink_source`;
CREATE TABLE `flink_test` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
  `t_name` varchar(100) NOT NULL COMMENT 'name',
  `logo_url` varchar(200) DEFAULT NULL COMMENT 'logo image URL',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='test';
```

Target tables

Identical-schema sync

```sql
CREATE DATABASE IF NOT EXISTS `flink_sink` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
USE `flink_sink`;
CREATE TABLE `flink_test` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
  `t_name` varchar(100) NOT NULL COMMENT 'name',
  `logo_url` varchar(200) DEFAULT NULL COMMENT 'logo image URL',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='test';
```

Different-schema sync

```sql
CREATE DATABASE IF NOT EXISTS `flink_sink` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
USE `flink_sink`;
CREATE TABLE `flink_test2` (
  `name` varchar(100) NOT NULL COMMENT 'name',
  `logo` varchar(200) DEFAULT NULL COMMENT 'logo image URL',
  UNIQUE KEY `name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='test';
```

Create tables with Flink DDL in the Flink SQL CLI

Start the SQL CLI with **./bin/sql-client.sh**, then run the following statements in order. Afterwards, insert/update/delete rows in the source table and watch the effect.

  • Remember to change the MySQL IPs to match your environment.
```sql
SET execution.checkpointing.interval = 3s;

CREATE TABLE mysql_binlog (
  id INT NOT NULL,
  t_name STRING,
  logo_url STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.0.99',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'flink_source',
  'table-name' = 'flink_test'
);

CREATE TABLE test_cdc_sink (
  id INT,
  t_name STRING,
  logo_url STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'url' = 'jdbc:mysql://192.168.0.51:3316/flink_sink?serverTimezone=Asia/Shanghai&useSSL=false',
  'username' = 'root',
  'password' = 'root',
  'table-name' = 'flink_test'
);

INSERT INTO test_cdc_sink SELECT * FROM mysql_binlog;
```

Check the Flink job and test the data sync

Insert, update, and delete rows in the source table and check whether the changes are synced to the target.


Implement the above in Java

Set up the Flink project

Add jars to Flink's lib directory

You can also skip uploading them, but then they have to be bundled into the job jar, which is no longer lightweight.

  • flink-connector-jdbc_2.12-1.13.5.jar
  • flink-sql-connector-mysql-cdc-2.1.1.jar
  • Gson: I did not upload it; it is packaged into the job jar and used directly.

Maven POM (key parts)

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Apache license header and snapshot-repository settings from the Flink quickstart archetype omitted -->
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.tannn</groupId>
    <artifactId>flink-demo-1</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>Flink Walkthrough DataStream Java</name>
    <url>https://flink.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.5</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.16.0</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- MySQL CDC source connector; provided because the jar was uploaded to Flink's lib -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.1.1</version>
            <scope>provided</scope>
        </dependency>
        <!-- JDBC connector; provided because the jar was uploaded to Flink's lib -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Gson is shaded into the job jar instead of being uploaded to lib -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- log4j-slf4j-impl, log4j-api and log4j-core (${log4j.version}, runtime scope) omitted for brevity -->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>
            <!-- maven-shade-plugin 3.0.0 builds the fat jar; its mainClass is cn.tannn.FlinkCDCWitchSQL -->
        </plugins>
    </build>
</project>
```
Java code: SQL

```java
package cn.tannn;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCDCWitchSQL {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Configuration configuration = new Configuration();
        // configuration.setString(RestOptions.BIND_PORT, "8081-8089");
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        // env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///F://project//java//flink-cdc-demo-1//doc"));

        // Start a checkpoint every 3000 ms (checkpoint interval)
        env.enableCheckpointing(3000);
        // Advanced options:
        // Use exactly-once mode (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Make sure at least 1500 ms pass between two checkpoints (minimum pause)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1500);
        // Checkpoints must complete within one minute or they are discarded (checkpoint timeout)
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Allow only one checkpoint to be in progress at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // RETAIN_ON_CANCELLATION: keep checkpoint data after the job is cancelled, so you can restore from it later
        // DELETE_ON_CANCELLATION: delete checkpoint data on cancel; it is only kept when the job fails
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setParallelism(1);

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        // Set the job name
        tableEnv.getConfig().getConfiguration().setString("pipeline.name", "table_sql_job");
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Source table
        String sourceDDL = "CREATE TABLE mysql_binlog1 ( " +
                " id INT NOT NULL, " +
                " t_name STRING, " +
                " logo_url STRING, " +
                " primary key (id) not enforced " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = '192.168.0.99', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = 'root', " +
                " 'database-name' = 'flink_source', " +
                " 'table-name' = 'flink_test' " +
                // " , 'scan.startup.mode' = 'latest-offset' " + // default is full snapshot plus incremental binlog
                ") ";

        // Sink table (same columns as the source)
        String sinkDDL =
                "CREATE TABLE test_cdc_sink1 ( " +
                " id INT, " +
                " t_name STRING, " +
                " logo_url STRING, " +
                " primary key (id) not enforced " +
                ") WITH ( " +
                " 'connector' = 'jdbc', " +
                " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
                " 'url' = 'jdbc:mysql://192.168.0.51:3316/flink_sink?serverTimezone=Asia/Shanghai&useSSL=false'," +
                " 'username' = 'root', " +
                " 'password' = 'root', " +
                " 'table-name' = 'flink_test' " +
                ")";

        // Sink table (different columns, no primary key on the MySQL side; updates and deletes
        // will misbehave there, so if there really is no primary key you must at least create a unique index)
        String sinkDDL2 =
                "CREATE TABLE test_cdc_sink2 ( " +
                " name STRING, " +
                " logo STRING, " +
                " primary key (name) not enforced " +
                ") WITH ( " +
                " 'connector' = 'jdbc', " +
                " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
                " 'url' = 'jdbc:mysql://127.0.0.1:3316/test?serverTimezone=Asia/Shanghai&useSSL=false'," +
                " 'username' = 'root', " +
                " 'password' = 'root', " +
                " 'table-name' = 'flink_test2' " +
                ")";

        // Simple transformation DML (plain projections)
        String transformDmlSQL = "insert into test_cdc_sink1 select * from mysql_binlog1";
        String transformDmlSQL2 = "insert into test_cdc_sink2 select t_name as name, logo_url as logo from mysql_binlog1";

        TableResult sourceSQL = tableEnv.executeSql(sourceDDL);
        TableResult sinkSQL = tableEnv.executeSql(sinkDDL);
        TableResult transformSQL = tableEnv.executeSql(transformDmlSQL);
        // tableEnv.executeSql(sinkDDL2);
        // tableEnv.executeSql(transformDmlSQL2);

        // Leave the lines below commented out; enabling them makes the job fail when submitted to the cluster
        // sourceSQL.print();
        // transformSQL.print();
        // sinkSQL.print();
        // env.execute("mysql-sql-mysql-cdc");
    }
}
```
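Note that each `executeSql(...)` call on an INSERT statement submits a separate Flink job. If you also enable the second pipeline (`test_cdc_sink2`) and prefer both INSERTs to run as a single job, Flink's `StatementSet` can bundle them. A minimal sketch, reusing `tableEnv` and the DDL/DML strings defined in `main()` above in place of the `executeSql` calls at the end:

```java
// Register all source and sink tables first
tableEnv.executeSql(sourceDDL);
tableEnv.executeSql(sinkDDL);
tableEnv.executeSql(sinkDDL2);

// Bundle both INSERT pipelines so they are submitted and scheduled as one Flink job
org.apache.flink.table.api.StatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsertSql(transformDmlSQL);
statementSet.addInsertSql(transformDmlSQL2);
statementSet.execute();
```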

Java code: DataStream API

With this custom sink, rows that already exist in the source database are not synced over (the initial snapshot records are only logged).

  • You can simply insert the "read" records as well; see the sketch after the MysqlSink listing below.

FlinkCDC

```java
package cn.tannn;

import cn.tannn.sink.MysqlSink;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author tn
 * @version 1
 * @date 2022-02-06 21:34
 */
public class FlinkCDC {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);

        // Read the MySQL binlog
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.0.99")
                .port(3306)
                .databaseList("flink_source")          // set captured database
                .tableList("flink_source.flink_test")  // set captured table
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        DataStreamSource<String> mySqlStream =
                env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");

        // Sink
        mySqlStream.print();
        mySqlStream.addSink(new MysqlSink());

        env.execute("mysql-mysql-cdc");
    }
}
```
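The `MySqlSource` builder accepts a few more options that are easy to miss. The sketch below is an assumption based on flink-sql-connector-mysql-cdc 2.1.1 (the version in the POM above); verify the option names against the connector docs for your version:

```java
// Variant of the source above: start from the latest binlog position instead of the
// default StartupOptions.initial() (initial snapshot followed by the binlog).
MySqlSource<String> incrementalSource = MySqlSource.<String>builder()
        .hostname("192.168.0.99")
        .port(3306)
        .databaseList("flink_source")
        .tableList("flink_source.flink_test")
        .username("root")
        .password("root")
        // Give each CDC source its own server-id range when several jobs read the same MySQL instance
        .serverId("5400-5404")
        // Only consume new binlog entries; existing rows are skipped
        .startupOptions(com.ververica.cdc.connectors.mysql.table.StartupOptions.latest())
        // Align Debezium's timestamps with the database time zone
        .serverTimeZone("Asia/Shanghai")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();
```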

MysqlSink

```java
package cn.tannn.sink;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class MysqlSink extends RichSinkFunction<String> {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    Connection connection;
    PreparedStatement iStmt, dStmt, uStmt;

    private Connection getConnection() {
        Connection conn = null;
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
            String url = "jdbc:mysql://192.168.0.51:3316/flink_sink?serverTimezone=Asia/Shanghai&useSSL=false";
            conn = DriverManager.getConnection(url, "root", "root");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String insertSql = "insert into flink_test(`id`, `t_name`, `logo_url`) values (?,?,?)";
        String deleteSql = "delete from flink_test where id=?";
        String updateSql = "update flink_test set t_name=?,logo_url=? where id=?";
        iStmt = connection.prepareStatement(insertSql);
        dStmt = connection.prepareStatement(deleteSql);
        uStmt = connection.prepareStatement(updateSql);
    }

    // Called once for every incoming change record
    @Override
    public void invoke(String value, Context context) throws Exception {
        Gson t = new Gson();
        JsonObject hs = t.fromJson(value, JsonObject.class);
        JsonObject source = hs.get("source").getAsJsonObject();
        String database = source.get("db").getAsString();
        String table = source.get("table").getAsString();
        // Operation type
        String type = hs.get("op").getAsString();
        if ("flink_source".equals(database) && "flink_test".equals(table)) {
            if (OperationTypeEnum.INSERT.getAbbr().equals(type)) {
                logger.info("insert => " + value);
                JsonObject data = hs.get("after").getAsJsonObject();
                String id = verifyNull(data.get("id"));
                String tmName = verifyNull(data.get("t_name"));
                String logoUrl = verifyNull(data.get("logo_url"));
                iStmt.setString(1, id);
                iStmt.setString(2, tmName);
                iStmt.setString(3, logoUrl);
                iStmt.executeUpdate();
            } else if (OperationTypeEnum.DELETE.getAbbr().equals(type)) {
                logger.info("delete => " + value);
                JsonObject data = hs.get("before").getAsJsonObject();
                String id = verifyNull(data.get("id"));
                dStmt.setString(1, id);
                dStmt.executeUpdate();
            } else if (OperationTypeEnum.UPDATE.getAbbr().equals(type)) {
                logger.info("update => " + value);
                JsonObject data = hs.get("after").getAsJsonObject();
                String id = verifyNull(data.get("id"));
                String tmName = verifyNull(data.get("t_name"));
                String logoUrl = verifyNull(data.get("logo_url"));
                uStmt.setString(1, tmName);
                uStmt.setString(2, logoUrl);
                uStmt.setString(3, id);
                int count = uStmt.executeUpdate();
                if (count <= 0) {
                    // Nothing to update, so insert instead
                    iStmt.setString(1, id);
                    iStmt.setString(2, tmName);
                    iStmt.setString(3, logoUrl);
                    iStmt.executeUpdate();
                }
            } else {
                // Snapshot ("read") records from the initial full scan are only logged here
                logger.info("read => " + value);
            }
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (iStmt != null) {
            iStmt.close();
        }
        if (dStmt != null) {
            dStmt.close();
        }
        if (uStmt != null) {
            uStmt.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    private static String verifyNull(JsonElement jsonElement) {
        if (jsonElement.isJsonNull()) {
            return null;
        }
        return jsonElement.getAsString();
    }
}
```
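As noted above, this sink only logs snapshot records (op = "r"), so rows that already existed in flink_source before the job started never reach flink_sink. One way to carry them over is to treat READ exactly like INSERT; a hypothetical tweak to the first branch of invoke() above:

```java
// Hypothetical change inside invoke(): handle the initial snapshot ("r") like an insert,
// so rows that already exist in the source are also written to the sink.
if (OperationTypeEnum.INSERT.getAbbr().equals(type)
        || OperationTypeEnum.READ.getAbbr().equals(type)) {
    logger.info("insert/read => " + value);
    JsonObject data = hs.get("after").getAsJsonObject();
    iStmt.setString(1, verifyNull(data.get("id")));
    iStmt.setString(2, verifyNull(data.get("t_name")));
    iStmt.setString(3, verifyNull(data.get("logo_url")));
    iStmt.executeUpdate();
}
```

If the job may restart and replay the snapshot, switching insertSql to an `insert ... on duplicate key update` statement avoids duplicate-key errors on the id column.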

OperationTypeEnum

```java
package cn.tannn.sink;

/**
 * Operation type
 *
 * @author tn
 * @date 2022-02-07 15:44
 */
@SuppressWarnings("AlibabaEnumConstantsMustHaveComment")
public enum OperationTypeEnum {

    INSERT("c", "insert", "insert (payload in after)"),
    UPDATE("u", "update", "update (payload in before and after)"),
    DELETE("d", "delete", "delete (payload in before)"),
    READ("r", "select", "snapshot read (payload in after)"),
    ;

    private String abbr;
    private String type;
    private String chineseName;

    OperationTypeEnum(String abbr, String type, String chineseName) {
        this.abbr = abbr;
        this.type = type;
        this.chineseName = chineseName;
    }

    public static String getTypeByAddr(String addr) {
        for (OperationTypeEnum va : values()) {
            if (va.abbr.equals(addr)) {
                return va.type;
            }
        }
        return null;
    }

    public static OperationTypeEnum getByAddr(String addr) {
        for (OperationTypeEnum va : values()) {
            if (va.abbr.equals(addr)) {
                return va;
            }
        }
        return null;
    }

    public String getAbbr() {
        return abbr;
    }

    public String getType() {
        return type;
    }

    public String getChineseName() {
        return chineseName;
    }
}
```
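With this enum in place, the string comparisons in MysqlSink.invoke() could also dispatch on the enum via the existing getByAddr helper, which stays readable as more operation types are handled. A purely illustrative sketch:

```java
// Hypothetical dispatch inside invoke(), using the enum instead of string comparisons
OperationTypeEnum op = OperationTypeEnum.getByAddr(type);
if (op == null) {
    logger.warn("unknown op => " + value);
    return;
}
switch (op) {
    case INSERT:
        // ... same handling as the "c" branch above
        break;
    case UPDATE:
        // ... same handling as the "u" branch above
        break;
    case DELETE:
        // ... same handling as the "d" branch above
        break;
    case READ:
    default:
        logger.info("read => " + value);
}
```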

Package, upload, and start

You can also submit the job from the command line: ./bin/flink run -c cn.tannn.FlinkCDCWitchSQL /xx/flink-demo-1-1.0-SNAPSHOT.jar

  • Fully-qualified main class of the job: cn.tannn.FlinkCDCWitchSQL
  • Path to the job jar: /xx/flink-demo-1-1.0-SNAPSHOT.jar
