目标:

    • 体验如何使用 Flink Stream API 开发一个 Flink CDC Demo,超级简单。
    • 以Mysql为例,采集Mysql binlog数据。账号需要什么权限?需要注意什么?
    • 生成 checkpoint 数据,重启程序从执行的状态恢复数据。
    • 演示2.2版本动态加加载表的新特性,在2.1版本没有这个功能。

    1.第一步创建一个Maven管理的Java Flink 项目

    1. mvn archetype:generate \
    2. -DarchetypeGroupId=org.apache.flink \
    3. -DarchetypeArtifactId=flink-quickstart-java \
    4. -DarchetypeVersion=1.13.3

    2.引入flink cdc
    https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc

    1. <dependency>
    2. <groupId>com.ververica</groupId>
    3. <artifactId>flink-connector-mysql-cdc</artifactId>
    4. <version>2.2.1</version>
    5. </dependency>

    截屏2022-05-12 下午10.17.09.png

    mysql -h localhost -uroot -proot123456
    mysql账号需要有以下权限:
    SELECT
    REPLICATION SLAVE
    REPLICATION CLIENT

    CREATE TABLE user_1 (
       id INTEGER NOT NULL PRIMARY KEY,
       name VARCHAR(255) NOT NULL DEFAULT 'flink',
       address VARCHAR(1024),
       phone_number VARCHAR(512),
       email VARCHAR(255)
     );
     INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234","user_110@foo.com");
    
    create table user_2(
    id int auto_increment
    ,`dec` decimal null
    ,constraint user_2_pk primary key (id)
    );