5.1 MySQL业务数据库准备
5.1.1 创建实时业务数据库
5.1.2 运行guigu提供的建表脚本
5.1.3 修改/etc/my.cnf文件,开启log-bin,并指定需要记录binlog的数据库
5.1.4 重启MySQL使配置生效
systemctl restart mariadb
5.1.5 模拟生成数据
- 将硅谷 /资料/数据生成脚本/业务数据 里面的jar和properties文件上传到/home/djin/work/module/rt_db目录下

- 修改application.properties 中数据库连接信息

运行jar包生成数据
[root@node001 rt_db]# java -jar gmall2020-mock-db-2020-11-27.jar
再次到/var/lib/mysql/下查看bin-log文件大小
5.2 FlinkCDC 项目搭建
5.2.1 FlinkCDC案例
5.2.2 在项目中新建gmall2022-realtime模块
包及文件结构如下
| 目录 | 作用 |
|---|---|
| app | 产生各层数据的flink任务 |
| bean | 数据对象 |
| common | 公共常量 |
| utils | 工具类 |
5.2.3 修改配置文件
1)在pom.xml添加如下配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Module descriptor for the realtime module; it inherits from the gmall2022 parent project. -->
<parent>
<artifactId>gmall2022</artifactId>
<groupId>org.example</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>gmall-realtime</artifactId>
<!-- Versions shared by the dependencies below. scala.version is used as the suffix of the Flink artifact ids. -->
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.13.2</flink.version>
<scala.version>2.12</scala.version>
<hadoop.version>3.3.1</hadoop.version>
</properties>
<dependencies>
<!-- Flink modules, all pinned to ${flink.version}. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- JSON library. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.79</version>
</dependency>
<!-- Required when checkpoints are stored on HDFS. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- MySQL JDBC driver and the Flink CDC connector for MySQL. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.21</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
<!-- Flink logs through slf4j by default, which is only a logging facade;
log4j is used here as the concrete logging implementation. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Runs the assembly plugin's "single" goal in the package phase with the jar-with-dependencies descriptor. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2)在resources目录下创建log4j.properties配置文件
# Root logger: level "error", attached to the appender named "stdout".
log4j.rootLogger=error,stdout
# "stdout" is a console appender that writes to System.out.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
# Line layout: date, level, [logger name], " - ", message, line separator.
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
5.2.4 代码实现
1)MyJsonDeserializationSchema,将获取到的MySQL bin-log日志解析为自定义的格式
package com.djin.gmallrealtime.common;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
/**
 * Custom Debezium deserializer that turns one change record into a JSON string.
 *
 * <p>The emitted JSON object has four keys:
 * <ul>
 *   <li>{@code operation} - lower-cased name of the Debezium operation
 *       (e.g. read / create / update / delete)</li>
 *   <li>{@code data} - the column values of the row's "after" image; an empty
 *       object when the record carries no "after" image</li>
 *   <li>{@code database} - second segment of the record topic</li>
 *   <li>{@code table} - third segment of the record topic</li>
 * </ul>
 */
public class MyJsonDeserializationSchema implements DebeziumDeserializationSchema<String> {

    /**
     * @return type information telling Flink that this schema produces {@code String} records
     */
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }

    /**
     * Converts a single change record to JSON and forwards it downstream.
     *
     * @param sourceRecord change record delivered by the CDC source
     * @param collector    receives the resulting JSON string
     * @throws Exception if the record cannot be processed
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // The topic carries the database and table name,
        // e.g. mysql_binlog_source.gmall2022.user_info
        String topic = sourceRecord.topic();
        String[] arr = topic.split("\\.");
        String db = arr[1];
        String tableName = arr[2];

        // Operation type: READ / DELETE / UPDATE / CREATE, emitted in lower case.
        String operation = Envelope.operationFor(sourceRecord).toString().toLowerCase();

        // The record value is a Struct; "after" holds the row image after the change.
        Struct value = (Struct) sourceRecord.value();
        Struct after = value.getStruct("after");

        // Copy every column of the "after" image into a JSON object.
        // "after" is null when the record has no post-change image (a DELETE),
        // so guard against it instead of failing with a NullPointerException.
        JSONObject data = new JSONObject();
        if (after != null) {
            for (Field field : after.schema().fields()) {
                data.put(field.name(), after.get(field));
            }
        }

        // Wrap the payload together with its metadata.
        JSONObject result = new JSONObject();
        result.put("operation", operation);
        result.put("data", data);
        result.put("database", db);
        result.put("table", tableName);

        // Send the record downstream.
        collector.collect(result.toJSONString());
    }
}
2)MySQLSourceUtil,获取MySQL数据源
package com.djin.gmallrealtime.utils;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.djin.gmallrealtime.common.MyJsonDeserializationSchema;
/**
 * Provides the Flink CDC source that reads the MySQL binlog of the
 * {@code gmall2022} database and emits each change as a JSON string
 * (see {@link MyJsonDeserializationSchema}).
 */
public class MySQLSourceUtil {

    /** Single shared source instance, built once when the class is loaded. */
    private static final DebeziumSourceFunction<String> MYSQL_SOURCE = MySQLSource.<String>builder()
            .hostname("node001")
            .port(3306)
            .databaseList("gmall2022")
            // Optional: restrict the capture to specific tables, e.g.
            //   .tableList("gmall2022.user_info")
            // When omitted, all tables of the databases listed above are read.
            // Table names must be given in "db.table" form.
            .username("root")
            .password("123456")
            // latest(): start from the most recent binlog position.
            .startupOptions(StartupOptions.latest())
            .deserializer(new MyJsonDeserializationSchema())
            .build();

    /**
     * @return the shared MySQL CDC source function producing JSON strings
     */
    public static DebeziumSourceFunction<String> getMySQLSource() {
        return MYSQL_SOURCE;
    }
}
3)KafkaUtil,将流数据推送到下游 Kafka 的 Topic 中
package com.djin.gmallrealtime.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
/**
 * Helper for creating Kafka sinks that write string records.
 *
 * <p>The package must be {@code com.djin.gmallrealtime.utils}: the main program
 * imports this class from that package.
 */
public class KafkaUtil {

    /** Comma-separated list of Kafka brokers used as {@code bootstrap.servers}. */
    private static final String KAFKA_SERVER = "node001:9092,node002:9092,node003:9092";

    /** Producer configuration shared by every sink created through this class. */
    private static final Properties properties = new Properties();

    static {
        properties.setProperty("bootstrap.servers", KAFKA_SERVER);
    }

    /**
     * Creates a Kafka producer sink for the given topic.
     *
     * @param topic name of the Kafka topic to write to
     * @return a sink that serializes each record with {@link SimpleStringSchema}
     */
    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), properties);
    }
}
4)RealTimeWithFlinkCDC,主程序,接收解析好的mysql bin-log并发送到Kafka
package com.djin.gmallrealtime.app;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.djin.gmallrealtime.utils.KafkaUtil;
import com.djin.gmallrealtime.utils.MySQLSourceUtil;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Main program: reads the parsed MySQL binlog stream through Flink CDC and
 * forwards every record to the Kafka topic {@code ods_base_db}.
 */
public class RealTimeWithFlinkCDC {

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // User name used for HDFS access; set first so it is in place before
        // anything in this program touches the checkpoint directory.
        System.setProperty("HADOOP_USER_NAME", "djin");

        // 1. Obtain the MySQL CDC source.
        DebeziumSourceFunction<String> mySqlSource = MySQLSourceUtil.getMySQLSource();

        // 2. Configure the execution environment. Flink CDC keeps the binlog
        //    position as state in the checkpoint, so resuming from where the job
        //    stopped requires starting it from a checkpoint or savepoint.
        // 2.1 Create the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2.2 Enable checkpointing, one checkpoint every 5 seconds.
        env.enableCheckpointing(5000L);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // 2.3 Checkpoint consistency mode.
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 2.4 Keep the last checkpoint when the job is cancelled.
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.5 Restart strategy: up to 3 attempts, 2 seconds apart.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        // 2.6 State backend and checkpoint storage. This pair is used instead
        //     of the deprecated FsStateBackend.
        env.setStateBackend(new HashMapStateBackend());
        checkpointConfig.setCheckpointStorage("hdfs://node001:8020/flinkCDC");

        // 3. Attach the MySQL CDC source with parallelism 1.
        DataStreamSource<String> binlogStream = env.addSource(mySqlSource);
        binlogStream.setParallelism(1);

        // 4. Push the records to Kafka.
        //    For local debugging, print instead: binlogStream.print().setParallelism(1);
        binlogStream.addSink(KafkaUtil.getKafkaSink("ods_base_db"));

        // 5. Start the job.
        env.execute("MyFlinkCDC");
    }
}
5.2.5 测试验证
点击代码运行测试
