Left join a PostgreSQL table with a MySQL table and write the joined result into MySQL
- flink cdc pgsql
# Caveats
Parallelism
- Parallelism must be set to 1, otherwise data synchronization goes wrong:
env.setParallelism(1);
SET 'parallelism.default' = '1'; -- optional: Flink's parallelism (1 by default)
SET 'pipeline.max-parallelism' = '1'; -- optional: Flink's maximum parallelism
Submitting the job errored with the first form below, yet the very same statements submitted successfully once each `TableResult` was assigned:
```java
// submit job err
tableEnv.executeSql(sourceUserDDL);
tableEnv.executeSql(sourceClassesDDL);
tableEnv.executeSql(sinkDDL);
tableEnv.executeSql(transformDmlSQL);

// submit job success (oddly enough)
TableResult tableResult = tableEnv.executeSql(sourceUserDDL);
TableResult tableResult1 = tableEnv.executeSql(sourceClassesDDL);
TableResult tableResult2 = tableEnv.executeSql(sinkDDL);
TableResult tableResult3 = tableEnv.executeSql(transformDmlSQL);
```
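For what it's worth, when a single job needs to bundle several statements, `StatementSet` (in `org.apache.flink.table.api`) is the Table API's documented way to make the submission explicit. A minimal sketch, assuming the same `tableEnv` and DDL strings defined in the Java section below:

```java
// DDL statements execute eagerly via executeSql; batching the INSERT through
// a StatementSet yields one explicit job submission for the pipeline.
tableEnv.executeSql(sourceUserDDL);
tableEnv.executeSql(sourceClassesDDL);
tableEnv.executeSql(sinkDDL);

StatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsertSql(transformDmlSQL);
statementSet.execute(); // submits a single job containing the INSERT pipeline
```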
# Table DDL
```sql
-- Source table: user (MySQL)
CREATE TABLE `user` (
`name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
`id` int NOT NULL AUTO_INCREMENT,
`class_no` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
-- Source table: classes (PostgreSQL)
CREATE TABLE "public"."classes" (
"id" int4 NOT NULL DEFAULT nextval('classes_id_seq'::regclass),
"class_name" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"class_no" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
CONSTRAINT "pk_port_forward" PRIMARY KEY ("id")
);
ALTER TABLE "public"."classes"
OWNER TO "postgres";
-- Sink table: user (MySQL, in the `flink` database; the source `user` table lives in `flink_sql`)
CREATE TABLE `user` (
`id` int NOT NULL,
`name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
`class_name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
```
# Flink SQL setup
```sql
CREATE TABLE mysql_user_binlog (
id INT NOT NULL,
name STRING,
class_no STRING,
primary key (id) not enforced
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.31.61',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'flink_sql',
'table-name' = 'user'
);
CREATE TABLE postgres_class_binlog (
id INT NOT NULL,
class_name STRING,
class_no STRING,
primary key (id) not enforced
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '192.168.31.61',
'port' = '5432',
'username' = 'postgres',
'password' = 'root',
'database-name' = 'flink',
'schema-name' = 'public',
'table-name' = 'classes',
'decoding.plugin.name' = 'pgoutput'
);
CREATE TABLE user_class_sink (
id INT,
name STRING,
class_name STRING,
primary key (id) not enforced
) WITH (
'connector' = 'jdbc',
'driver' = 'com.mysql.cj.jdbc.Driver',
'url' = 'jdbc:mysql://192.168.31.61:3306/flink?serverTimezone=Asia/Shanghai&useSSL=false',
'username' = 'root',
'password' = 'root',
'table-name' = 'user'
);
SET 'pipeline.name' = 'PgSQLAndMySQL2MySQL';
SET 'parallelism.default' = '1';
SET 'pipeline.max-parallelism' = '1';
-- sink
INSERT INTO user_class_sink
SELECT mu.id, mu.name, pc.class_name
FROM mysql_user_binlog AS mu
LEFT JOIN postgres_class_binlog AS pc ON mu.class_no = pc.class_no;
```
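Because both inputs are changelog streams, this left join emits inserts, updates, and deletes: a `user` row arriving before its class produces a row with a NULL `class_name` that is later retracted and re-emitted. That is why the sink declares a primary key, so the JDBC connector can write in upsert mode. A quick way to eyeball that changelog before wiring up the real sink, as a sketch assuming the DDL strings from the Java section below:

```java
// Run the join as a plain query and print the changelog rows
// (+I / -U / +U / -D) to stdout instead of writing to MySQL.
tableEnv.executeSql(sourceUserDDL);
tableEnv.executeSql(sourceClassesDDL);
tableEnv.executeSql(
        "SELECT mu.id, mu.name, pc.class_name " +
        "FROM mysql_user_binlog AS mu " +
        "LEFT JOIN postgres_class_binlog AS pc ON mu.class_no = pc.class_no")
    .print(); // blocks and streams results until the job is cancelled
```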
# Maven dependencies
```xml
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- This dependency is provided, because it should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<!--flink cdc connector -->
<dependency>
<groupId>com.ververica</groupId>
<!-- add the dependency matching your database -->
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<!-- the dependency is available only for stable releases. -->
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
<!-- flink cdc pgsql -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-postgres-cdc</artifactId>
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
```
# Java code
```java
package cn.tannn;
import cn.tannn.constant.FlinSqlConstant;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Left join MySQL CDC user data with PostgreSQL CDC class data and sink the result into MySQL.
 */
public class PgSQLAndMySQL2MySQL {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
// Advanced options:
// set the checkpointing mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// require a minimum pause of 1500 ms between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1500);
// a checkpoint must complete within one minute or it is discarded (checkpoint timeout)
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// keep checkpoint data when the job is cancelled, so it can be restored from a chosen checkpoint later:
// ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: checkpoint data is retained after cancel, for recovery on demand
// ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: checkpoint data is deleted after cancel; it survives only when the job fails
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
tableEnv.getConfig().getConfiguration().setString(FlinSqlConstant.JOB_NAME_KEY,"PgSQLAndMySQL2MySQL");
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
// source table: user data (MySQL)
String sourceUserDDL = "CREATE TABLE mysql_user_binlog ( " +
" id INT NOT NULL, " +
" name STRING, " +
" class_no STRING, " +
" primary key (id) not enforced " +
") WITH ( " +
" 'connector' = 'mysql-cdc', " +
" 'hostname' = '192.168.31.61', " +
" 'port' = '3306', " +
" 'username' = 'root', " +
" 'password' = 'root', " +
" 'database-name' = 'flink_sql', " +
" 'table-name' = 'user' " +
// " , 'scan.startup.mode' = 'latest-offset' " + // 默认全量加增量
") ";
// source table: class data (PostgreSQL)
String sourceClassesDDL = "CREATE TABLE postgres_class_binlog ( " +
" id INT NOT NULL, " +
" class_name STRING, " +
" class_no STRING, " +
" primary key (id) not enforced " +
") WITH ( " +
" 'connector' = 'postgres-cdc', " +
" 'hostname' = '192.168.31.61', " +
" 'port' = '5432', " +
" 'username' = 'postgres', " +
" 'password' = 'root', " +
" 'database-name' = 'flink', " +
" 'schema-name' = 'public', " +
" 'table-name' = 'classes', " +
" 'decoding.plugin.name' = 'pgoutput' " +
// " 'debezium.slot.name' = '***' " +
// " , 'scan.startup.mode' = 'latest-offset' " + // 默认全量加增量
") ";
// sink table: user joined with class info
String sinkDDL =
"CREATE TABLE user_class_sink ( " +
" id INT, " +
" name STRING, " +
" class_name STRING, " +
" primary key (id) not enforced " +
") WITH ( " +
" 'connector' = 'jdbc', " +
" 'driver' = 'com.mysql.cj.jdbc.Driver', " +
" 'url' = 'jdbc:mysql://192.168.31.61:3306/flink?serverTimezone=Asia/Shanghai&useSSL=false'," +
" 'username' = 'root', " +
" 'password' = 'root', " +
" 'table-name' = 'user' " +
")";
// a simple join transformation
String transformDmlSQL = "insert into user_class_sink select mu.id, mu.name,pc.class_name " +
" from mysql_user_binlog as mu left join postgres_class_binlog as pc" +
" on mu.class_no = pc.class_no";
TableResult tableResult = tableEnv.executeSql(sourceUserDDL);
TableResult tableResult1 = tableEnv.executeSql(sourceClassesDDL);
TableResult tableResult2 = tableEnv.executeSql(sinkDDL);
TableResult tableResult3 = tableEnv.executeSql(transformDmlSQL);
}
}
```
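`FlinSqlConstant` is the author's own class and is not shown in the post. A hypothetical reconstruction, given that the SQL section sets the same option via `SET 'pipeline.name'`:

```java
package cn.tannn.constant;

// Hypothetical sketch (the original class is not included in the post).
// "pipeline.name" is Flink's standard option key for naming a job, matching
// the SET 'pipeline.name' = 'PgSQLAndMySQL2MySQL' statement used above.
public class FlinSqlConstant {
    public static final String JOB_NAME_KEY = "pipeline.name";
}
```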