创建执行环境

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/app/dim/DimApp.java#L46

27行——44行

Kafka工具类封装

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/util/MyKafkaUtil.java

过滤脏数据

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/app/dim/DimApp.java#L46

46——65行

配置表

本层的任务是将业务数据直接写入到不同的 HBase 表中。那么如何让程序知道流中的哪些数据是维度数据?维度数据又应该写到 HBase 的哪些表中?为了解决这个问题,我们选择在 MySQL 中构建一张配置表,通过 Flink CDC 将配置表信息读取到程序中。

配置表实体类

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/bean/TableProcess.java

Flink CDC

https://blog.csdn.net/weixin_45417821/article/details/126465210

配置表设计

1**)字段解析**

我们将为配置表设计五个字段

Ø source_table:作为数据源的业务数据表名

Ø sink_table:作为数据目的地的 Phoenix 表名

Ø sink_columns:Phoenix 表字段

Ø sink_pk:Phoenix 表主键

Ø sink_extend:Phoenix 建表扩展,即建表时一些额外的配置语句

将 source_table 作为配置表的主键,可以通过它获取唯一的目标表名、字段、主键和建表扩展,从而得到完整的 Phoenix 建表语句。

2**)在Mysql中创建数据库建表并开启**Binlog

(1)创建数据库 gmall-config ,注意:和 gmall-flink 业务库区分开

  1. [root@hadoop102 db_log]$ mysql -uroot -p000000 -e"create database gmall-config charset utf8 default collate utf8_general_ci"

(2)在gmall-config 库中创建配置表table_process

  1. CREATE TABLE `table_process` (
  2. `source_table` varchar(200) NOT NULL COMMENT '来源表',
  3. `sink_table` varchar(200) DEFAULT NULL COMMENT '输出表',
  4. `sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
  5. `sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
  6. `sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
  7. PRIMARY KEY (`source_table`)
  8. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

(3)在MySQL配置文件中增加gmall-config 开启Binlog

vim /etc/my.cnf

启用binlog的数据库,需根据实际情况作出修改

binlog-do-db=gmall-flink

binlog-do-db=gmall-config

重启MySQL :systemctl restart mysqld

校验:cd /var/lib/mysql ,查看最后一个文件,当前大小是154,下面我们做一个测试

实时数仓(十一)DIM层代码编写 - 图1

在table_process表中添加数据再次查看

实时数仓(十一)DIM层代码编写 - 图2

实时数仓(十一)DIM层代码编写 - 图3

大小变了,说明校验成功。

配置表建表及数据导入 SQL 文件如下。

table_process初始配置.sql

FlinkCDC读取配置信息

引入依赖

  1. <!--Flink对接MySQL依赖-->
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
  5. <version>${flink.version}</version>
  6. </dependency>
  7. <!--Flink CDC 依赖-->
  8. <dependency>
  9. <groupId>com.ververica</groupId>
  10. <artifactId>flink-connector-mysql-cdc</artifactId>
  11. <version>2.1.0</version>
  12. </dependency>
  13. <!-- 如果不引入 flink-table 相关依赖,则会报错:
  14. Caused by: java.lang.ClassNotFoundException:
  15. org.apache.flink.connector.base.source.reader.RecordEmitter
  16. 引入以下依赖可以解决这个问题(引入某些其它的 flink-table相关依赖也可)
  17. -->
  18. <!--Flink Table 依赖-->
  19. <dependency>
  20. <groupId>org.apache.flink</groupId>
  21. <artifactId>flink-table-api-java-bridge_2.12</artifactId>
  22. <version>1.13.0</version>
  23. </dependency>

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/app/dim/DimApp.java#L67

67行——79行

构建广播流与主流连接

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/app/dim/DimApp.java#L81

81行——90行

连接流逻辑分析

实时数仓(十一)DIM层代码编写 - 图4

广播流和主流的底层逻辑

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/app/function/TableProcessFunction.java

测试

测试前的准备工作

开启hdfs,zookeeper,hbase,kafka,maxwell(如果在此期间新建了个数据库,则删除重新创建MySQL中的maxwell库),phoenix等工作

查看phoenix表:

实时数仓(十一)DIM层代码编写 - 图5

在 gmall-config 库中的 table_process 表中添加如下数据

实时数仓(十一)DIM层代码编写 - 图6

创建phoenix数据库 :create schema GMALL_FLINK;

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/app/dim/DimApp.java#L67

注意:将97行注释

运行idea查看

实时数仓(十一)DIM层代码编写 - 图7

这时候已经成功了。

实时数仓(十一)DIM层代码编写 - 图8

刚刚我们在gmall-config中添加了 base_trademark 这个表,现在我们在添加一个表,名为base_category1表,查看idea运行情况(不重启任务,不修改代码,因为Flink程序就是 7* 24小时全天运行的)

实时数仓(十一)DIM层代码编写 - 图9

phoenix非常成功

实时数仓(十一)DIM层代码编写 - 图10

保存维度到HBase(Phoenix)

实时数仓(十一)DIM层代码编写 - 图11

Phoenix 工具类

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/util/PhoenixUtil.java

自定义JDBC连接池

官方Jdbc适合单表,只能自己定义Jdbc,加入德鲁伊依赖

  1. <!--德鲁伊依赖-->
  2. <dependency>
  3. <groupId>com.alibaba</groupId>
  4. <artifactId>druid</artifactId>
  5. <version>1.1.16</version>
  6. </dependency>

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/util/DruidDSUtil.java

配置常量类GmallConfig

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/util/DruidDSUtil.java

自定义Sink

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/app/function/DimSinkFunction.java

开始入库

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/app/dim/DimApp.java#L97

生产环境测试

运行idea,之后在sqlyong中 找到 base_trademark ,添加数据

12,佟欢欢 /aaa/bbb

实时数仓(十一)DIM层代码编写 - 图12

idea 结果 :

实时数仓(十一)DIM层代码编写 - 图13

Phoenix结果

实时数仓(十一)DIM层代码编写 - 图14

成功入库,测试成功!