5.1 MySQL Business Database Preparation

5.1.1 Create the real-time business database

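For example (a minimal sketch; the name gmall2022 matches the database read by the CDC source in 5.2.4, and the character set is an assumption):

CREATE DATABASE gmall2022 DEFAULT CHARACTER SET utf8mb4;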

5.1.2 Run the table-creation script provided by guigu


5.1.3 Edit /etc/my.cnf to enable log-bin and specify the database(s) to capture

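The relevant lines look roughly like this (a minimal sketch: the database name gmall2022 matches the CDC source configured in 5.2.4, and whether server-id must be set explicitly depends on the MySQL/MariaDB version):

[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall2022

FlinkCDC (Debezium) requires row-format binlog, so binlog_format=row is the important setting here.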

5.1.4 Restart MySQL for the configuration to take effect

  1. systemctl restart mariadb

Go to /var/lib/mysql and note the initial binlog file size (245 here).
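To check the size, list the binlog files (assuming the mysql-bin prefix from the log-bin setting sketched in 5.1.3):

    [root@node001 ~]# ls -l /var/lib/mysql/mysql-bin.*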

5.1.5 Generate mock data

  1. Upload the jar and properties files from the guigu course materials (资料/数据生成脚本/业务数据) to /home/djin/work/module/rt_db


  2. Update the database connection information in application.properties

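A sketch of the connection settings, assuming the mock-data jar reads standard Spring Boot datasource properties (the host, database name, and credentials below mirror the ones used elsewhere in this chapter; adjust them to your environment):

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://node001:3306/gmall2022?useSSL=false&characterEncoding=utf-8&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456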

  3. Run the jar to generate data

    [root@node001 rt_db]# java -jar gmall2020-mock-db-2020-11-27.jar
    


  4. Check the binlog file size in /var/lib/mysql/ again


5.2 FlinkCDC Project Setup

5.2.1 FlinkCDC example

See Chapter 7.

5.2.2 Create the gmall2022-realtime module in the project

The package and file structure is as follows:

Directory   Purpose
app         Flink jobs that produce the data for each layer
bean        Data objects
common      Shared constants
utils       Utility classes

5.2.3 Modify the configuration files

1) Add the following configuration to pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>gmall2022</artifactId>
        <groupId>org.example</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>gmall-realtime</artifactId>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.13.2</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.3.1</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.79</version>
        </dependency>
        <!-- This dependency is needed if checkpoints are stored on HDFS -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.21</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>
        <!-- Flink logs through slf4j by default, which is only a logging facade;
        here we use log4j as the concrete logging implementation -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

2) Create a log4j.properties configuration file under the resources directory

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

5.2.4 Code implementation

1) MyJsonDeserializationSchema: parses the captured MySQL binlog records into a custom JSON format

package com.djin.gmallrealtime.common;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class MyJsonDeserializationSchema implements DebeziumDeserializationSchema<String> {
    // Custom deserializer: converts Debezium SourceRecords into JSON strings
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // Get the topic, which contains the database and table name: mysql_binlog_source.gmall2022.user_info
        String topic = sourceRecord.topic();
        String[] arr = topic.split("\\.");
        String db = arr[1];
        String tableName = arr[2];
        // Get the operation type: READ DELETE UPDATE CREATE
        String operation = Envelope.operationFor(sourceRecord).toString().toLowerCase();
        // Get the value and cast it to Struct
        Struct value = (Struct) sourceRecord.value();
        // Get the row data after the change (null for DELETE records)
        Struct after = value.getStruct("after");
        // Build a JSON object holding the row data
        JSONObject data = new JSONObject();
        if (after != null) {
            for (Field field : after.schema().fields()) {
                Object o = after.get(field);
                data.put(field.name(), o);
            }
        }
        // Build the JSON object wrapping the final result
        JSONObject result = new JSONObject();
        result.put("operation", operation);
        result.put("data", data);
        result.put("database", db);
        result.put("table", tableName);

        // Emit the record downstream
        collector.collect(result.toJSONString());
    }
}
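
For reference, a record emitted by this deserializer looks roughly like the following (table and field values are illustrative):

{"database":"gmall2022","table":"user_info","operation":"update","data":{"id":1,"user_level":"2"}}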

2) MySQLSourceUtil: builds the MySQL CDC source

package com.djin.gmallrealtime.utils;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.djin.gmallrealtime.common.MyJsonDeserializationSchema;

public class MySQLSourceUtil {
    private static final DebeziumSourceFunction<String> mySqlSource = MySQLSource.<String>builder()
            .hostname("node001")
            .port(3306)
            .databaseList("gmall2022")
//            .tableList("gmall2022.user_info")  // Optional: if omitted, all tables of the databases
            // listed above are captured; when set, tables must be given as "db.table"
            .username("root")
            .password("123456")
            // latest() reads only new changes from the binlog; initial() would first snapshot the existing data
            .startupOptions(StartupOptions.latest())
            .deserializer(new MyJsonDeserializationSchema())
            .build();

    public static DebeziumSourceFunction<String> getMySQLSource() {
        return mySqlSource;
    }
}

3) KafkaUtil: pushes the stream data to a downstream Kafka topic

package com.djin.gmallrealtime.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class KafkaUtil {
    private static final String KAFKA_SERVER = "node001:9092,node002:9092,node003:9092";
    private static final Properties properties = new Properties();
    static {
        properties.setProperty("bootstrap.servers", KAFKA_SERVER);
    }

    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        return new FlinkKafkaProducer<String>(topic, new SimpleStringSchema(), properties);
    }
}

4) RealTimeWithFlinkCDC: main program; receives the parsed MySQL binlog records and sends them to Kafka

package com.djin.gmallrealtime.app;

import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.djin.gmallrealtime.utils.KafkaUtil;
import com.djin.gmallrealtime.utils.MySQLSourceUtil;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RealTimeWithFlinkCDC {
    public static void main(String[] args) throws Exception {
        // 1 Create the MySQL CDC source
        DebeziumSourceFunction<String> mySQLSource = MySQLSourceUtil.getMySQLSource();
        // 2 Configure the execution environment. Flink CDC stores the binlog reading position as state in checkpoints;
        //   to resume from where it left off, the job must be started from a checkpoint or savepoint.
        // 2.1 Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2.2 Enable checkpointing every 5 seconds
        env.enableCheckpointing(5000L);
        // 2.3 Set the checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 2.4 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.5 Set the restart strategy used when recovering from checkpoints
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        // 2.6 Set the state backend and checkpoint storage
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://node001:8020/flinkCDC");
        //env.setStateBackend(new FsStateBackend("hdfs://node001:8082/flinkCDC")); // deprecated since Flink 1.13
        // 2.7 Set the user name for accessing HDFS
        System.setProperty("HADOOP_USER_NAME", "djin");
        // 3 Create the Flink-MySQL-CDC source stream
        DataStreamSource<String> mySQL_source = env.addSource(mySQLSource);
        mySQL_source.setParallelism(1);
        // 4 Print the data (for debugging)
//        mySQL_source.print().setParallelism(1);
        // 4 Push the data to Kafka
        mySQL_source.addSink(KafkaUtil.getKafkaSink("ods_base_db"));
        // 5 Start the job
        env.execute("MyFlinkCDC");
    }
}

5.2.5 Testing and verification

Click Run in the IDE to test the code.
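
One way to verify end to end (assuming the Kafka scripts are on the PATH): start a console consumer on the ods_base_db topic, run RealTimeWithFlinkCDC, execute the mock-data jar from 5.1.5 again, and check that JSON change records like the sample in 5.2.4 appear in the consumer.

    [root@node001 ~]# kafka-console-consumer.sh --bootstrap-server node001:9092 --topic ods_base_db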