目标:
- 体验如何使用 Flink Stream API 开发一个 Flink CDC Demo,超级简单。
- 以Mysql为例,采集Mysql binlog数据。账号需要什么权限?需要注意什么?
- 生成 checkpoint 数据,重启程序从执行的状态恢复数据。
- 演示2.2版本动态加加载表的新特性,在2.1版本没有这个功能。
1.第一步创建一个Maven管理的Java Flink 项目
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.13.3
2.引入flink cdc
https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.2.1</version>
</dependency>
mysql -h localhost -uroot -proot123456
mysql账号需要有以下权限:
SELECT
REPLICATION SLAVE
REPLICATION CLIENT
CREATE TABLE user_1 (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512),
email VARCHAR(255)
);
INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234","user_110@foo.com");
create table user_2(
id int auto_increment
,`dec` decimal null
,constraint user_2_pk primary key (id)
);