Left join a PostgreSQL table and a MySQL table, then write the generated data into MySQL

  • flink cdc pgsql

Notes and pitfalls

Parallelism

  1. Parallelism can only be set to 1, otherwise data synchronization will break:
    1. env.setParallelism(1);
    2. SET 'parallelism.default' = '1'; -- optional: Flink's parallelism (1 by default)
    3. SET 'pipeline.max-parallelism' = '1'; -- optional: Flink's maximum parallelism

  Submitting the job reports an error with the first form below, yet with the second form the job is submitted successfully (surprisingly):

  ```java
  // submit job: reports an error
  tableEnv.executeSql(sourceUserDDL);
  tableEnv.executeSql(sourceClassesDDL);
  tableEnv.executeSql(sinkDDL);
  tableEnv.executeSql(transformDmlSQL);

  // submit job: succeeds (surprisingly)
  TableResult tableResult = tableEnv.executeSql(sourceUserDDL);
  TableResult tableResult1 = tableEnv.executeSql(sourceClassesDDL);
  TableResult tableResult2 = tableEnv.executeSql(sinkDDL);
  TableResult tableResult3 = tableEnv.executeSql(transformDmlSQL);
  ```
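
  An alternative that makes the submission explicit is to register the tables with executeSql() and submit the DML through a StatementSet. This is a minimal sketch, not the author's original code; it assumes a Flink version where createStatementSet()/addInsertSql() exist (1.11+) and reuses the tableEnv and the DDL/DML strings defined in the Java code at the end of this post:

  ```java
  // DDL statements only register the tables in the catalog; they do not start a job.
  tableEnv.executeSql(sourceUserDDL);
  tableEnv.executeSql(sourceClassesDDL);
  tableEnv.executeSql(sinkDDL);

  // Group the INSERT into a StatementSet and trigger submission explicitly.
  // (requires org.apache.flink.table.api.StatementSet)
  StatementSet statementSet = tableEnv.createStatementSet();
  statementSet.addInsertSql(transformDmlSQL);
  TableResult result = statementSet.execute();

  // Optionally inspect the submitted job's id.
  result.getJobClient().ifPresent(client -> System.out.println("Submitted job: " + client.getJobID()));
  ```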

Table DDL

```sql
-- Source table: user (MySQL)
CREATE TABLE `user` (
  `name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `id` int NOT NULL AUTO_INCREMENT,
  `class_no` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- Source table: classes (PostgreSQL)
CREATE TABLE "public"."classes" (
  "id" int4 NOT NULL DEFAULT nextval('classes_id_seq'::regclass),
  "class_name" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
  "class_no" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
  CONSTRAINT "pk_port_forward" PRIMARY KEY ("id")
);
ALTER TABLE "public"."classes"
  OWNER TO "postgres";

-- Sink table: user (MySQL)
CREATE TABLE `user` (
  `id` int NOT NULL,
  `name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `class_name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
```

Flink SQL preparation

```sql
CREATE TABLE mysql_user_binlog (
  id INT NOT NULL,
  name STRING,
  class_no STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.31.61',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'flink_sql',
  'table-name' = 'user'
);

CREATE TABLE postgres_class_binlog (
  id INT NOT NULL,
  class_name STRING,
  class_no STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '192.168.31.61',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'root',
  'database-name' = 'flink',
  'schema-name' = 'public',
  'table-name' = 'classes',
  'decoding.plugin.name' = 'pgoutput'
);

CREATE TABLE user_class_sink (
  id INT,
  name STRING,
  class_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'url' = 'jdbc:mysql://192.168.31.61:3306/flink?serverTimezone=Asia/Shanghai&useSSL=false',
  'username' = 'root',
  'password' = 'root',
  'table-name' = 'user'
);

SET 'pipeline.name' = 'PgSQLAndMySQL2MySQL';
SET 'parallelism.default' = '1';
SET 'pipeline.max-parallelism' = '1';

-- sink
INSERT INTO user_class_sink
SELECT mu.id, mu.name, pc.class_name
FROM mysql_user_binlog AS mu
LEFT JOIN postgres_class_binlog AS pc ON mu.class_no = pc.class_no;
```

Maven dependencies

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- This dependency is provided, because it should not be packaged into the JAR file. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <!-- Flink CDC connectors -->
    <dependency>
        <groupId>com.ververica</groupId>
        <!-- add the dependency matching your database -->
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <!-- the dependency is available only for stable releases. -->
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink CDC PostgreSQL -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-postgres-cdc</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
```
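
Note: the `${flink.version}`, `${scala.binary.version}`, and `${log4j.version}` placeholders are assumed to be defined in the pom's `<properties>` section, which is not shown in this post. The `flink-sql-connector-*-cdc` 2.1.1 artifacts target the Flink 1.13.x line, which also matches the `flink-table-planner-blink` artifact and the `useBlinkPlanner()` call in the Java code below, so `flink.version` should be a 1.13.x release.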

Java code

```java
package cn.tannn;

import cn.tannn.constant.FlinSqlConstant;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Left join a MySQL table with a PostgreSQL table and write the result into MySQL.
 */
public class PgSQLAndMySQL2MySQL {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        // Advanced checkpoint options:
        // use exactly-once mode (the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // minimum pause between checkpoints (here 1500 ms)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1500);
        // a checkpoint must complete within one minute or it is discarded (checkpoint timeout)
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // allow only one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Keep checkpoint data after the job is cancelled, so it can be restored from later.
        // ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: keep checkpoint data when the job is cancelled
        // ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete checkpoint data when the job is cancelled; it is only kept if the job fails
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setParallelism(1);

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        tableEnv.getConfig().getConfiguration().setString(FlinSqlConstant.JOB_NAME_KEY, "PgSQLAndMySQL2MySQL");
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // source table: user data (MySQL)
        String sourceUserDDL = "CREATE TABLE mysql_user_binlog ( " +
                " id INT NOT NULL, " +
                " name STRING, " +
                " class_no STRING, " +
                " primary key (id) not enforced " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = '192.168.31.61', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = 'root', " +
                " 'database-name' = 'flink_sql', " +
                " 'table-name' = 'user' " +
                // " , 'scan.startup.mode' = 'latest-offset' " + // default: full snapshot plus incremental
                ") ";

        // source table: class data (PostgreSQL)
        String sourceClassesDDL = "CREATE TABLE postgres_class_binlog ( " +
                " id INT NOT NULL, " +
                " class_name STRING, " +
                " class_no STRING, " +
                " primary key (id) not enforced " +
                ") WITH ( " +
                " 'connector' = 'postgres-cdc', " +
                " 'hostname' = '192.168.31.61', " +
                " 'port' = '5432', " +
                " 'username' = 'postgres', " +
                " 'password' = 'root', " +
                " 'database-name' = 'flink', " +
                " 'schema-name' = 'public', " +
                " 'table-name' = 'classes', " +
                " 'decoding.plugin.name' = 'pgoutput' " +
                // " , 'debezium.slot.name' = '***' " +
                // " , 'scan.startup.mode' = 'latest-offset' " + // default: full snapshot plus incremental
                ") ";

        // sink table: joined user and class information
        String sinkDDL =
                "CREATE TABLE user_class_sink ( " +
                " id INT, " +
                " name STRING, " +
                " class_name STRING, " +
                " primary key (id) not enforced " +
                ") WITH ( " +
                " 'connector' = 'jdbc', " +
                " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
                " 'url' = 'jdbc:mysql://192.168.31.61:3306/flink?serverTimezone=Asia/Shanghai&useSSL=false'," +
                " 'username' = 'root', " +
                " 'password' = 'root', " +
                " 'table-name' = 'user' " +
                ")";

        // simple join / transformation
        String transformDmlSQL = "insert into user_class_sink select mu.id, mu.name, pc.class_name " +
                " from mysql_user_binlog as mu left join postgres_class_binlog as pc" +
                " on mu.class_no = pc.class_no";

        TableResult tableResult = tableEnv.executeSql(sourceUserDDL);
        TableResult tableResult1 = tableEnv.executeSql(sourceClassesDDL);
        TableResult tableResult2 = tableEnv.executeSql(sinkDDL);
        TableResult tableResult3 = tableEnv.executeSql(transformDmlSQL);
    }
}
```
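
The FlinSqlConstant class imported above is not shown in this post. Below is a hypothetical sketch of what it presumably contains, assuming JOB_NAME_KEY simply maps to Flink's 'pipeline.name' option, consistent with the SET 'pipeline.name' = 'PgSQLAndMySQL2MySQL' statement in the Flink SQL section:

```java
package cn.tannn.constant;

/**
 * Hypothetical sketch of the constants class referenced by the job above;
 * the actual class is not included in this post.
 */
public class FlinSqlConstant {

    /** Assumed to be Flink's job-name option, i.e. 'pipeline.name'. */
    public static final String JOB_NAME_KEY = "pipeline.name";
}
```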