Chapter 1. Data Collection

1. Layers of the real-time e-commerce data warehouse

  • ODS layer: raw data; log data and business data are collected into Kafka
  • DWD layer: data split by data object (e.g., orders, page views); fact data goes to Kafka, dimension data goes to HBase
  • DIM layer (HBase + Phoenix): dimension data, e.g., user_info, spu_info
  • DWM layer: further processing of some data objects (e.g., unique visits, bounce behavior), optionally joined with dimensions into wide tables; still detail-level data
  • DWS layer: multiple facts lightly aggregated by subject into subject-oriented wide tables, stored in ClickHouse
  • ADS layer: ClickHouse data filtered and aggregated as required for visualization

2. User behavior data collection (log data)

Here we use a mock-data jar to generate log data; it can send the logs to a specified port.

[Figure 1]

Mock data files

[Figure 2]

I. Writing the log server

  1. Create a new Maven project: flink-gmall
  2. In that project, create a Spring Boot submodule (logger) to collect the log data

[Figure 3]

[Figure 4]

  3. Write the configuration file application.properties to configure the port and the Kafka connection:

server.port=8081
spring.kafka.bootstrap-servers=hadoop162:9092,hadoop163:9092,hadoop164:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  4. Add the configuration file logback.xml, which writes the log data to disk:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- Root directory for the logs; change to wherever the logs should be stored -->
    <property name="LOG_HOME" value="/home/atguigu/applog"/>
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>
    <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/app.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>
    <!-- Log this package/class separately; adjust to match your Controller class -->
    <logger name="com.atguigu.logger.controller.LoggerController"
            level="INFO" additivity="true">
        <appender-ref ref="rollingFile"/>
        <appender-ref ref="console"/>
    </logger>
    <root level="error" additivity="true">
        <appender-ref ref="console"/>
    </root>
</configuration>
  5. Write the controller that handles the clients' HTTP requests:

package com.atguigu.logger.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class LoggerController {
    @RequestMapping("/applog")
    public Object doLog(@RequestParam("param") String log) {
        // 1. write the data to disk
        saveToDisk(log);
        // 2. write the data directly to Kafka
        writeToKafka(log);
        return "success";
    }

    @Autowired
    KafkaTemplate producer;

    private void writeToKafka(String log) {
        producer.send("ods_log", log);
    }

    private void saveToDisk(String strLog) {
        log.info(strLog);
    }
}
  6. Create a folder (shell) in the project for scripts.
  7. Package the project and upload it to the Linux server (/opt/module/gmall-flink).
  8. Start Hadoop, ZooKeeper, and Kafka.
  9. Start the mock-data script and a Kafka console consumer to verify that the data reaches Kafka (see the verification sketch below).
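A typical way to run that verification, assuming Kafka is installed under /opt/module/kafka and using an illustrative payload for the manual test (both the path and the payload are assumptions, not part of the original setup):

# optional: send one hand-crafted record to the log server
curl "http://hadoop162:8081/applog?param=test-log"
# consume the ods_log topic to confirm that records arrive
/opt/module/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server hadoop162:9092 \
  --topic ods_log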

[Figure 5]

As shown above, the mock-generated data has been written to the Kafka topic ods_log.

  10. Deploy the log server to hadoop163 and hadoop164 as well, and use Nginx as a load balancer:

http {
    # startup settings omitted
    upstream logcluster{
        server hadoop162:8081 weight=1;
        server hadoop163:8081 weight=1;
        server hadoop164:8081 weight=1;
    }
    server {
        listen 80;
        server_name localhost;
        #charset koi8-r;
        #access_log logs/host.access.log main;
        location / {
            #root html;
            #index index.html index.htm;
            # proxy to the upstream cluster; the name is arbitrary but must not contain underscores
            proxy_pass http://logcluster;
            proxy_connect_timeout 10;
        }
    # the rest is omitted
}
  11. A script to start and stop the whole log-collection cluster:

#!/bin/bash
# start nginx
# start the Spring Boot log servers
nginx_home=/opt/module/nginx
app_home=/opt/module/gmall-flink
app=logger-0.0.1-SNAPSHOT.jar
case $1 in
"start")
    if [[ -z "`pgrep -f nginx`" ]]; then
        echo "Starting nginx on hadoop162"
        sudo $nginx_home/sbin/nginx
    else
        echo "nginx is already running; no need to start it again...."
    fi
    for host in hadoop162 hadoop163 hadoop164 ; do
        echo "Starting the log server on $host"
        ssh $host "nohup java -jar $app_home/$app 1>>$app_home/log.out 2>>$app_home/log.err &"
    done
    ;;
"stop")
    echo "Stopping nginx on hadoop162"
    sudo $nginx_home/sbin/nginx -s stop
    for host in hadoop162 hadoop163 hadoop164 ; do
        echo "Stopping the log server on $host"
        ssh $host "jps | awk '/$app/ {print \$1}' |xargs kill -9"
    done
    ;;
*)
    echo "Invalid usage:"
    echo "log.sh start   start the log collection servers"
    echo "log.sh stop    stop the log collection servers"
    ;;
esac

3. Business data collection

Business data is usually stored in MySQL. Tools that can capture MySQL data in real time include Canal, Maxwell, and Debezium.

This project uses Maxwell to capture the business data in real time.

I. MySQL's binlog

  1. The binary log (binlog) is arguably MySQL's most important log. It records all DDL and DML statements (except queries) as events, including how long each statement took to execute, and it is transaction-safe.
  2. Generally, enabling the binary log costs about 1% in performance. It has two most important use cases:
  3. First, MySQL replication: the master enables the binlog and ships it to its slaves to keep the master and slaves consistent.
  4. Second, data recovery: the mysqlbinlog tool can replay the log to restore data.
  5. The binary log consists of two kinds of files: (a) the binlog index file (suffix .index), which lists all binlog files, and (b) the binlog files themselves (suffix .00000*), which record all DDL and DML statement events (except queries).

II. Enabling the binlog

The binlog is disabled by default; it has to be enabled manually.

  1. Edit the MySQL configuration file my.cnf (/etc/my.cnf):

server-id= 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall2021

  2. Restart the MySQL server (see the verification queries after this list):

sudo systemctl restart mysqld

  3. Prepare the business data in MySQL:

CREATE DATABASE `gmall2021` CHARACTER SET utf8 COLLATE utf8_general_ci;
USE gmall2021;
source /opt/software/mock/mock_db/gmall2020_12_08.sql
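A quick way to confirm that the binlog settings took effect after the restart, using standard MySQL statements in a mysql client on hadoop162:

-- should report log_bin = ON
SHOW VARIABLES LIKE 'log_bin';
-- shows the current binlog file and position
SHOW MASTER STATUS;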

III. Using Maxwell to capture MySQL data in real time

  1. What is Maxwell?
     Maxwell is an open-source MySQL change-capture tool written in Java, originally released by Zendesk. Its capture mechanism is also based on the binlog.
  2. Maxwell vs. Canal
     1. Maxwell has no server+client mode like Canal; it is a single server that sends the data to a message queue or Redis.
     2. One highlight: Canal can only capture new changes and has no way to handle existing historical data, while Maxwell's bootstrap feature can export the complete historical data for initialization, which is very convenient.
     3. Maxwell does not support HA directly, but it supports resuming from a breakpoint: after a failure is fixed, it restarts and continues reading from where it left off.
     4. Maxwell only supports JSON output, while Canal (in server+client mode) allows a custom format.
     5. Maxwell is more lightweight than Canal.
  3. Install and configure Maxwell:

# unpack
tar -zxvf maxwell-1.27.1.tar.gz -C /opt/module

# configure maxwell
cd /opt/module/maxwell-1.27.1
vim config.properties

# add the following settings
# tl;dr config
log_level=info
producer=kafka
kafka.bootstrap.servers=hadoop162:9092,hadoop163:9092,hadoop164:9092
kafka_topic=ods_db
# partition by table; if not set, Maxwell partitions by database
producer_partition_by=table
# mysql login info
host=hadoop162
user=root
password=aaaaaa
# exclude the databases we do not want to monitor
filter=exclude:gmall2021_realtime.*
# used when bootstrapping the dimension tables
client_id=maxwell_1

  4. Start Maxwell:

bin/maxwell --config=config.properties

  5. Start a Kafka console consumer and check whether the data can be consumed.

[Figure 6]

As shown above, the generated data has been successfully written to the ods_db topic.

  6. Maxwell's bootstrap (data initialization) feature

How do we get data that already exists in MySQL into Kafka? Canal cannot do this; Maxwell's bootstrap feature covers exactly this need (a sample of the resulting records follows the command):

bin/maxwell-bootstrap --user root --password aaaaaa --host hadoop162 --database gmall2021 --table user_info --client_id maxwell_1
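For reference, bootstrapped rows and normal change events share the same overall JSON shape; Maxwell marks bootstrapped rows with a "bootstrap-" prefix on the type field, which is why the DWD code later strips that prefix. The two records below are only illustrative (the field values are made up):

{"database":"gmall2021","table":"user_info","type":"bootstrap-insert","ts":1615366400,"data":{"id":1,"user_level":"1"}}
{"database":"gmall2021","table":"user_info","type":"insert","ts":1615366401,"xid":1234,"data":{"id":2,"user_level":"1"}}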

Chapter 2. Setting Up the Real-time Data Warehouse (DWD Layer)

In the previous chapter we collected the log data and the business data into their respective Kafka topics.

To make the computations more reusable, we lightly process the ODS-layer data and write it back to Kafka as the DWD layer.

| Layer | Description | Compute | Storage |
| --- | --- | --- | --- |
| ODS | Raw data: log data and business data | Log server, Maxwell | Kafka |
| DWD | Data split by data object, e.g. orders, page views | Flink | Kafka |
| DWM | Further processing of some data objects, e.g. unique visits, bounce behavior; still detail data, with dimensions denormalized (wide tables) | Flink | Kafka |
| DIM | Dimension data | Flink | HBase |
| DWS | Multiple facts lightly aggregated by subject into subject-oriented wide tables | Flink | ClickHouse |
| ADS | ClickHouse data filtered and aggregated for visualization | ClickHouse SQL | Visualization |

1. Creating the gmall-realtime module

This module performs the real-time computation on the streaming data.

  1. pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>flink-gmall</artifactId>
        <groupId>com.atguigu</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>gmall-realtime</artifactId>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop.version>3.1.3</hadoop.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
        <!-- commons-beanutils is an Apache toolkit for working with Java Beans;
             it makes it easy to manipulate bean properties -->
        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils</artifactId>
            <version>1.9.3</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>5.0.0-HBase-2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.glassfish</groupId>
                    <artifactId>javax.el</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1-jre</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  2. Add log4j.properties:

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
  3. Create the package structure:

[Figure 7]

2. DWD layer (user behavior logs)

Main tasks:

  1. Identify new vs. returning visitors: the client already carries a new/old flag, but it is not accurate enough, so real-time computation re-validates it (no business action is involved; it is purely a status correction).
  2. Split the data.
  3. Write the different kinds of data into different Kafka topics.
  1. A utility class for reading data from a Kafka topic:

package com.atguigu.gmall.util;

import com.atguigu.gmall.common.Constant;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * Reads data from the given Kafka topic
 */
public class FlinkSourceUtil {
    public static FlinkKafkaConsumer<String> getKafkaSource(String groupId, String topic) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", Constant.KAFKA_BROKERS);
        properties.put("group.id", groupId);
        // if there is no committed offset, start from the latest; otherwise resume from the committed position
        properties.put("auto.offset.reset", "latest");
        // only consume records whose two-phase commit has completed; this affects latency
        properties.put("isolation.level", "read_committed");
        return new FlinkKafkaConsumer<String>(
                topic,
                new SimpleStringSchema(),
                properties
        );
    }
}
  2. BaseApp (an abstract template class that wraps the boilerplate for consuming Kafka data):

package com.atguigu.gmall.app;

import com.atguigu.gmall.util.FlinkSourceUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public abstract class BaseAppV1 {
    public abstract void run(StreamExecutionEnvironment env, DataStreamSource<String> stream);

    public void init(int port, int p, String ck, String groupId, String topic) {
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", port);  // port of the local web UI
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(p);
        // checkpoint configuration
        env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop162:8020/gmall/" + ck);
        env.getCheckpointConfig().setCheckpointTimeout(10 * 1000);
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        DataStreamSource<String> stream = env.addSource(FlinkSourceUtil.getKafkaSource(groupId, topic));
        // each app implements its own business logic
        run(env, stream);
        try {
            env.execute(ck);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  3. A class for frequently used constants:

package com.atguigu.gmall.common;

/**
 * Frequently used constants
 */
public class Constant {
    public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
    public static final String PHOENIX_URL = "jdbc:phoenix:hadoop162,hadoop163,hadoop164:2181";
    public static final String KAFKA_BROKERS = "hadoop162:9092,hadoop163:9092,hadoop164:9092";
    // ODS-layer topics
    public static final String TOPIC_ODS_LOG = "ods_log";
    public static final String TOPIC_ODS_DB = "ods_db";
    // DWD-layer log topics
    public static final String TOPIC_DWD_START = "dwd_start";
    public static final String TOPIC_DWD_DISPLAY = "dwd_display";
    public static final String TOPIC_DWD_PAGE = "dwd_page";
    public static final String DWD_SINK_KAFKA = "kafka";
    public static final String DWD_SINK_HBASE = "hbase";
}
  4. A small custom utility class:

package com.atguigu.gmall.util;

import java.util.ArrayList;
import java.util.List;

/**
 * Converts an Iterable into a List
 */
public class IteratorToListUtil {
    public static <T> List<T> toList(Iterable<T> it) {
        ArrayList<T> result = new ArrayList<>();
        for (T t : it) {
            result.add(t);
        }
        return result;
    }
}

I. Data flow diagram

[Figure 8]

II. Identifying new vs. returning visitors

Approach:

  1. Use event-time semantics to handle out-of-order data.
  2. Key the stream by mid (device id).
  3. Apply a 5-second tumbling event-time window.
  4. Use keyed state to record whether the device's first window has already been processed.
  5. If the state is empty, the event with the smallest timestamp in this window is the first visit; all other events are not.
  6. If the state is not empty, every event in this window is a non-first visit.
/**
 * Distinguish new vs. returning visitors
 * @param stream
 */
private SingleOutputStreamOperator<JSONObject> distinguishNewOrOld(DataStreamSource<String> stream) {
    // use event time plus a window; the first record is the new visitor, all others are returning visitors
    return stream.map(new MapFunction<String, JSONObject>() {
        @Override
        public JSONObject map(String value) throws Exception {
            return JSON.parseObject(value);
        }
    })
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                            .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                                @Override
                                public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                    return element.getLong("ts");
                                }
                            })
            )
            .keyBy(new KeySelector<JSONObject, String>() {
                @Override
                public String getKey(JSONObject value) throws Exception {
                    return value.getJSONObject("common").getString("mid");
                }
            })
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .process(new ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
                private ValueState<Boolean> isFirstState;

                @Override
                public void open(Configuration parameters) throws Exception {
                    isFirstState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("isFirstState", Boolean.class));
                }

                @Override
                public void process(String key, Context context, Iterable<JSONObject> elements, Collector<JSONObject> out) throws Exception {
                    if (isFirstState.value() == null) {
                        // first window for this device id: mark the record with the smallest timestamp
                        // as a new visitor and everything else as returning
                        // update the state first
                        isFirstState.update(true);
                        // find the record with the smallest timestamp
                        List<JSONObject> list = IteratorToListUtil.toList(elements);
                        list.sort(new Comparator<JSONObject>() {
                            @Override
                            public int compare(JSONObject o1, JSONObject o2) {
                                return o1.getLong("ts").compareTo(o2.getLong("ts"));
                            }
                        });
                        for (int i = 0; i < list.size(); i++) {
                            JSONObject obj = list.get(i);
                            if (i == 0) {
                                obj.getJSONObject("common").put("is_new", "1");
                            } else {
                                obj.getJSONObject("common").put("is_new", "0");
                            }
                            out.collect(obj);
                        }
                    } else {
                        // any later window: mark every record as a returning visitor
                        for (JSONObject obj : elements) {
                            obj.getJSONObject("common").put("is_new", "0");
                            out.collect(obj);
                        }
                    }
                }
            });
}

III. Splitting the stream

Based on its content, the log data is split into three categories: page logs, start-up logs, and display (exposure) logs.

In the implementation below, start-up logs stay in the main stream while page logs and display logs go to side-output streams; each category is then written to its own Kafka topic.

/**
 * Split the stream: start-up logs, page logs, display logs
 * @param stream
 */
private HashMap<String, DataStream<JSONObject>> splitStream(SingleOutputStreamOperator<JSONObject> stream) {
    OutputTag<JSONObject> pageTag = new OutputTag<JSONObject>(PAGE) {};
    OutputTag<JSONObject> displayTag = new OutputTag<JSONObject>(DISPLAY) {};
    SingleOutputStreamOperator<JSONObject> startStream = stream.process(new ProcessFunction<JSONObject, JSONObject>() {
        @Override
        public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception {
            // main stream: start-up logs; two side outputs: page and display logs
            if (value.containsKey("start")) {
                // start-up log
                out.collect(value);
            } else {
                // could be a page log and/or display log
                if (value.containsKey("page")) {
                    // page
                    ctx.output(pageTag, value);
                }
                if (value.containsKey("displays")) {
                    // displays
                    JSONArray displays = value.getJSONArray("displays");
                    for (int i = 0; i < displays.size(); i++) {
                        JSONObject obj = displays.getJSONObject(i);
                        obj.putAll(value.getJSONObject("common"));
                        obj.putAll(value.getJSONObject("page"));
                        obj.put("ts", value.getLong("ts"));
                        ctx.output(displayTag, obj);
                    }
                }
            }
        }
    });
    DataStream<JSONObject> pageStream = startStream.getSideOutput(pageTag);
    DataStream<JSONObject> displayStream = startStream.getSideOutput(displayTag);
    HashMap<String, DataStream<JSONObject>> result = new HashMap<>();
    result.put(PAGE, pageStream);
    result.put(START, startStream);
    result.put(DISPLAY, displayStream);
    return result;
}

IV. Writing the different streams to different Kafka topics

  1. A utility class for writing data to Kafka (this first version only needs the plain string sink; it is extended later for the business data):

package com.atguigu.gmall.util;

import com.atguigu.gmall.common.Constant;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * Writes data to the given Kafka topic
 */
public class FlinkSinkUtil {
    public static SinkFunction<String> getKafkaSink(String topic) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", Constant.KAFKA_BROKERS);
        properties.put("transaction.timeout.ms", 15 * 60 * 1000);
        return new FlinkKafkaProducer<String>(
                "default",
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
                        return new ProducerRecord<byte[], byte[]>(topic, s.getBytes(StandardCharsets.UTF_8));
                    }
                },
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
    }
}
  2. The method that routes each stream to its topic:
/**
     * 把不同的流的数据写入到不同的topic中
     * @param streams
     */
    private void writeToKafka(HashMap<String, DataStream<JSONObject>> streams) {
        streams.get(START)
                .map(new MapFunction<JSONObject, String>() {
                    @Override
                    public String map(JSONObject value) throws Exception {
                        return value.toJSONString();
                    }
                })
                .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_START));
        streams.get(DISPLAY)
                .map(new MapFunction<JSONObject, String>() {
                    @Override
                    public String map(JSONObject value) throws Exception {
                        return value.toJSONString();
                    }
                })
                .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_DISPLAY));
        streams.get(PAGE)
                .map(new MapFunction<JSONObject, String>() {
                    @Override
                    public String map(JSONObject value) throws Exception {
                        return value.toJSONString();
                    }
                })
                .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_PAGE));

    }

V. Complete code

package com.atguigu.gmall.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.app.BaseAppV1;
import com.atguigu.gmall.common.Constant;
import com.atguigu.gmall.util.FlinkSinkUtil;
import com.atguigu.gmall.util.IteratorToListUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;

public class DwdLogApp extends BaseAppV1 {
    public static final String PAGE = "page";
    public static final String DISPLAY = "display";
    public static final String START = "start";

    public static void main(String[] args) {
        new DwdLogApp().init(2001,1,"DwdLogApp","DwdLogApp", Constant.TOPIC_ODS_LOG);

    }


    /**
     * DwdlogApp具体实现
     * @param env
     * @param stream
     */
    @Override
    public void run(StreamExecutionEnvironment env, DataStreamSource<String> stream) {
        //stream.print();
        /**
         * 1.区分新老用户,对已有的字段 is_new做一个纠正
         */
        SingleOutputStreamOperator<JSONObject> validateStream = distinguishNewOrOld(stream);
        //validateStream.print();
        /**
         * 2.分流: 启动日志 页面日志 曝光日志
         */
        HashMap<String, DataStream<JSONObject>> threeStreams = splitStream(validateStream);
        //threeStreams.get(START).print(START);
        //threeStreams.get(DISPLAY).print(DISPLAY);
        //threeStreams.get(PAGE).print(PAGE);


        /**
         * 3.把不同的流的数据写入到不同的topic中
         */
        writeToKafka(threeStreams);


    }

    /**
     * 把不同的流的数据写入到不同的topic中
     * @param streams
     */
    private void writeToKafka(HashMap<String, DataStream<JSONObject>> streams) {
        streams.get(START)
                .map(new MapFunction<JSONObject, String>() {
                    @Override
                    public String map(JSONObject value) throws Exception {
                        return value.toJSONString();
                    }
                })
                .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_START));
        streams.get(DISPLAY)
                .map(new MapFunction<JSONObject, String>() {
                    @Override
                    public String map(JSONObject value) throws Exception {
                        return value.toJSONString();
                    }
                })
                .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_DISPLAY));
        streams.get(PAGE)
                .map(new MapFunction<JSONObject, String>() {
                    @Override
                    public String map(JSONObject value) throws Exception {
                        return value.toJSONString();
                    }
                })
                .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_PAGE));

    }

    /**
     * 分流: 启动日志 页面日志 曝光日志
     * @param stream
     */
    private HashMap<String, DataStream<JSONObject>>  splitStream(SingleOutputStreamOperator<JSONObject> stream) {
        OutputTag<JSONObject> pageTag = new OutputTag<JSONObject>(PAGE) {};
        OutputTag<JSONObject> displayTag = new OutputTag<JSONObject>(DISPLAY) {};
        SingleOutputStreamOperator<JSONObject> startStream = stream.process(new ProcessFunction<JSONObject, JSONObject>() {
            @Override
            public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception {
                //主流:启动日志 侧输出流2个:页面和曝光日志
                if (value.containsKey("start")) {
                    //启动日志
                    out.collect(value);
                } else {
                    //有可能是曝光和页面
                    if (value.containsKey("page")) {
                        //页面
                        ctx.output(pageTag, value);
                    }
                    if (value.containsKey("displays")) {
                        //曝光
                        JSONArray displays = value.getJSONArray("displays");
                        for (int i = 0; i < displays.size(); i++) {
                            JSONObject obj = displays.getJSONObject(i);
                            obj.putAll(value.getJSONObject("common"));
                            obj.putAll(value.getJSONObject("page"));
                            obj.put("ts", value.getLong("ts"));
                            ctx.output(displayTag, obj);
                        }
                    }
                }
            }
        });
        DataStream<JSONObject> pageStream = startStream.getSideOutput(pageTag);
        DataStream<JSONObject> displayStream = startStream.getSideOutput(displayTag);
        HashMap<String, DataStream<JSONObject>> result = new HashMap<>();
        result.put(PAGE,pageStream);
        result.put(START,startStream);
        result.put(DISPLAY,displayStream);
        return result;
    }


    /**
     * 区别新老用户
     * @param stream
     */
    private SingleOutputStreamOperator<JSONObject> distinguishNewOrOld(DataStreamSource<String> stream) {
        //使用事件时间加窗口,找到第一条记录作为新用户,其他都是老用户
        return stream.map(new MapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String value) throws Exception {
                return JSON.parseObject(value);
            }
        })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                                    @Override
                                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                        return element.getLong("ts");
                                    }
                                })
                )
                .keyBy(new KeySelector<JSONObject, String>() {
                    @Override
                    public String getKey(JSONObject value) throws Exception {
                        return value.getJSONObject("common").getString("mid");
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
                    private ValueState<Boolean> isFirstState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        isFirstState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("isFirstState",Boolean.class));
                    }

                    @Override
                    public void process(String key, Context context, Iterable<JSONObject> elements, Collector<JSONObject> out) throws Exception {
                        if(isFirstState.value() == null){
                            //这是设备id的第一个窗口,找到时间戳最小的那个,标记为新用户,其他的标记为老用户
                            //先更新状态
                            isFirstState.update(true);
                            //找到时间最小的那个
                            List<JSONObject> list = IteratorToListUtil.toList(elements);
                            list.sort(new Comparator<JSONObject>() {
                                @Override
                                public int compare(JSONObject o1, JSONObject o2) {
                                    return o1.getLong("ts").compareTo(o2.getLong("ts"));

                                }
                            });
                            for (int i = 0; i < list.size(); i++) {
                                JSONObject obj = list.get(i);
                                if(i == 0){
                                    obj.getJSONObject("common").put("is_new","1");
                                }else{
                                    obj.getJSONObject("common").put("is_new","0");
                                }
                                out.collect(obj);
                            }
                        }else{
                            //其他的窗口,所有记录全部标记为老用户
                            for(JSONObject obj : elements){
                                obj.getJSONObject("common").put("is_new","0");
                                out.collect(obj);
                            }
                        }

                    }
                });
    }
}

VI. Testing

  1. Package the project and upload it to the server.
  2. Start a Flink yarn-session:
/opt/module/flink-yarn » bin/yarn-session.sh -d
  3. Submit the job:
/opt/module/flink-yarn » bin/flink run -c com.atguigu.gmall.app.dwd.DwdLogApp /opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT.jar
  4. Open a Kafka GUI client; the data has been successfully written to the corresponding Kafka topics.

[Figure 9]

3. DWD layer (business data)

Changes to the business data are captured by Maxwell, but Maxwell writes everything to a single topic. That stream mixes fact data and dimension data, which is clearly inconvenient for later processing.

So we read from the ODS layer and, after processing, save the dimension data to HBase and write the fact data back to Kafka as the DWD layer for business data.

Routing each record to Kafka or HBase is implemented here with a dynamic split.

I. Data flow diagram

[Figure 10]

II. Designing the dynamic configuration table

  1. Create the dynamic configuration table and load its initial data (a sketch of the table structure follows the entity class below):
CREATE DATABASE `gmall2021_realtime` CHARACTER SET utf8 COLLATE utf8_general_ci;
USE gmall2021_realtime;
source /opt/software/mock/mock_db/table_process_init.sql;
  2. The entity class for the configuration table:
package com.atguigu.gmall.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@NoArgsConstructor
@AllArgsConstructor
@Data
public class TableProcess {
    private String source_table;
    private String operate_type;
    private String sink_type;
    private String sink_table;
    private String sink_columns;
    private String sink_pk;
    private String sink_extend;
}
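The contents of table_process_init.sql are not shown here; based on the entity class above and the Flink CDC DDL used later, the configuration table presumably looks roughly like this (a sketch of an assumed schema, not the script's exact definition):

CREATE TABLE table_process (
    source_table  VARCHAR(200) NOT NULL,   -- source MySQL table name
    operate_type  VARCHAR(200) NOT NULL,   -- insert / update
    sink_type     VARCHAR(200),            -- kafka or hbase
    sink_table    VARCHAR(200),            -- target topic or Phoenix table
    sink_columns  VARCHAR(2000),           -- comma-separated columns to keep
    sink_pk       VARCHAR(200),            -- primary key for the Phoenix table
    sink_extend   VARCHAR(200),            -- extra Phoenix table options
    PRIMARY KEY (source_table, operate_type)
);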

III. Approach

  1. Business data: MySQL -> Maxwell -> Kafka -> Flink.
  2. Dynamic configuration table: MySQL -> Flink SQL CDC.
  3. Broadcast the configuration stream and connect it with the business-data stream, so the sink destination of the business data is controlled dynamically.

IV. Reading the dynamic configuration table

This is implemented with Flink SQL CDC.

Project page: https://github.com/ververica/flink-cdc-connectors

  1. Update the MySQL configuration to also monitor gmall2021_realtime:
vim /etc/my.cnf

[mysqld]
server-id= 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall2021
binlog-do-db=gmall2021_realtime
  2. Restart the MySQL server:
sudo systemctl restart mysqld

Note: after changing the MySQL configuration, Maxwell stops and will not start again; you need to drop Maxwell's metadata database in MySQL first (see below).
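By default Maxwell keeps its metadata in a MySQL database named maxwell; dropping it lets Maxwell re-initialize its position on the next start (assuming the default schema_database setting):

DROP DATABASE IF EXISTS maxwell;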

V. Implementation

  1. Import the CDC connector dependency (the pom.xml above already declares it, so no additional coordinate is needed):

<!-- https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.0.1</version>
</dependency>
  2. Read the ODS-layer business data and ETL it:
/**
     * 对业务数据做etl
     * @param stream
     */
    private SingleOutputStreamOperator<JSONObject> etl(DataStreamSource<String> stream) {
        return stream.map(line -> JSON.parseObject(line.replaceAll("bootstrap-", "")))
                .filter(obj ->
                        obj.containsKey("database")
                                && obj.containsKey("table")
                                && obj.containsKey("type")
                                && ("insert".equals(obj.getString("type")) || "update".equals(obj.getString("type")))
                                && obj.containsKey("data")
                                && obj.getString("data").length() > 10
                );
    }
  3. Read the configuration table into a configuration stream:
 /**
     * 2.读取配置表的数据,配置流
     * @param env
     * @return
     */
    private SingleOutputStreamOperator<TableProcess> readTableProcess(StreamExecutionEnvironment env) {
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        tenv.executeSql("CREATE TABLE tp (" +
                " source_table string, " +
                " operate_type string, " +
                " sink_type string, " +
                " sink_table string, " +
                " sink_columns string, " +
                " sink_pk string, " +
                " sink_extend string, " +
                " primary key(source_table, operate_type) not enforced " + //声明主键,但是不做强检验
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'hadoop162'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = 'aaaaaa'," +
                " 'database-name' = 'gmall2021_realtime'," +
                " 'table-name' = 'table_process', " +
                " 'debezium.snapshot.mode' = 'initial' " +  // 程序启动的时候读取表中所有的数据, 然后再使用bin_log监控所有的变化
                ")");

        Table table = tenv.sqlQuery("select * from tp");

        return  tenv
                .toRetractStream(table, TableProcess.class)
                .filter(t -> t.f0)
                .map(t -> t.f1);

    }
  4. Broadcast the configuration stream and connect it with the data stream:
/**
     * 3.把配置流进行广播得到广播流与数据流进行connect
     * @param dataStream
     * @param tpStream
     */
    private SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectStreams(SingleOutputStreamOperator<JSONObject> dataStream, SingleOutputStreamOperator<TableProcess> tpStream) {
        //每来一条数据 JSONObject,需要找到一个对应的TablaProcess对象

        //把配置流做成广播流,然后进行连接,广播流中的元素就是map,可以设置key为userInfo:insert(source_table:operate_type)
        MapStateDescriptor<String, TableProcess> tpStateDesc = new MapStateDescriptor<>("tpState", String.class, TableProcess.class);
        BroadcastStream<TableProcess> tpBCStream = tpStream.broadcast(tpStateDesc);
         return dataStream
                .connect(tpBCStream)
                .process(new BroadcastProcessFunction<JSONObject, TableProcess, Tuple2<JSONObject, TableProcess>>() {
                    @Override
                    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception {
                        //处理数据流的数据
                        ReadOnlyBroadcastState<String, TableProcess> tpState = ctx.getBroadcastState(tpStateDesc);
                        String key = value.getString("table") + ":" + value.getString("type");
                        TableProcess tp = tpState.get(key);
                        //有些表不需要做sink所以配置文件中是没有配置,这里就是null
                        if (tp != null) {
                            out.collect(Tuple2.of(value.getJSONObject("data"), tp));
                        }
                    }

                    @Override
                    public void processBroadcastElement(TableProcess value, Context ctx, Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception {
                        //处理广播流的数据:其实就是将广播流的数据写入到广播状态中
                        BroadcastState<String, TableProcess> tpState = ctx.getBroadcastState(tpStateDesc);
                        String key = value.getSource_table() + ":" + value.getOperate_type();
                        tpState.put(key, value);
                    }
                });


    }
  5. Filter out the columns that are not needed:
/**
     * 4.把数据中不需要的列过滤掉
     * @param stream
     */
    private SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> filterColumns(SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> stream) {
        //删除那些不要的列
        return stream.map(new MapFunction<Tuple2<JSONObject, TableProcess>, Tuple2<JSONObject, TableProcess>>() {
            @Override
            public Tuple2<JSONObject, TableProcess> map(Tuple2<JSONObject, TableProcess> value) throws Exception {
                JSONObject data = value.f0;
                TableProcess tp = value.f1;
                String sink_columns = tp.getSink_columns();
                //删除掉data里map中的部分k-v
                //data.keySet().removeIf(key -> !sink_columns.contains(key));
                Set<String> keys = data.keySet();
                Iterator<String> it = keys.iterator();
                while (it.hasNext()) {
                    String key = it.next();
                    if(!sink_columns.contains(key)){
                        it.remove();
                    }
                }
                return value;
            }
        });

    }
  6. Dynamically split the stream (Kafka vs. HBase):
 /**
     * 5.动态分流(kafka,Hbase)
     * 实现思路,主流到kafka,侧输出流到hbase
     * @param stream
     */
    private Tuple2<SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>>, DataStream<Tuple2<JSONObject, TableProcess>>> dynamicSplit(SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> stream) {
        OutputTag<Tuple2<JSONObject, TableProcess>> hbaseTag = new OutputTag<Tuple2<JSONObject, TableProcess>>("hbase") {};
        SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> kafkaStream = stream.process(new ProcessFunction<Tuple2<JSONObject, TableProcess>, Tuple2<JSONObject, TableProcess>>() {
            @Override
            public void processElement(Tuple2<JSONObject, TableProcess> value, Context ctx, Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception {
                String sink_type = value.f1.getSink_type();
                if (Constant.DWD_SINK_KAFKA.equals(sink_type)) {
                    out.collect(value);
                } else if (Constant.DWD_SINK_HBASE.equals(sink_type)) {
                    ctx.output(hbaseTag, value);
                }
            }
        });
        DataStream<Tuple2<JSONObject, TableProcess>> hbaseStream = kafkaStream.getSideOutput(hbaseTag);
        return Tuple2.of(kafkaStream, hbaseStream);
    }
  7. Write the Kafka-bound part of the split stream to Kafka.

I. Updated FlinkSinkUtil

package com.atguigu.gmall.util;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.common.Constant;
import com.atguigu.gmall.pojo.TableProcess;
import com.atguigu.gmall.sink.PhoenixSink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * 向kafka指定的topic写数据
 */

public class FlinkSinkUtil {
    public static SinkFunction<String> getKafkaSink(String topic){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", Constant.KAFKA_BROKERS);
        properties.put("transaction.timeout.ms",15 * 60 * 1000);
        return new FlinkKafkaProducer<String>(
                "default",
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
                        return new ProducerRecord<byte[], byte[]>(topic,s.getBytes(StandardCharsets.UTF_8));
                    }
                },
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
    }
    public static SinkFunction<Tuple2<JSONObject, TableProcess>> getKafkaSink(){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", Constant.KAFKA_BROKERS);
        properties.put("transaction.timeout.ms",15 * 60 * 1000);
        return new FlinkKafkaProducer<Tuple2<JSONObject, TableProcess>>(
                "default",
                new KafkaSerializationSchema<Tuple2<JSONObject, TableProcess>>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(Tuple2<JSONObject, TableProcess> element, @Nullable Long aLong) {
                        String topic = element.f1.getSink_table();
                        String data = element.f0.toJSONString();
                        return new ProducerRecord<>(topic,data.getBytes(StandardCharsets.UTF_8));
                    }
                },
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );



    }
    public static SinkFunction<Tuple2<JSONObject,TableProcess>> getPhoenixSink(){
        return new PhoenixSink();
    }
}

II. Write the split data to Kafka

/**
     * 6.将动态分流后的数据写入到kafka
     * @param stream
     */
    private void writeToKafka(SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> stream) {
        //写到同一个topic的数据,最好放在同一组,这样可以提升效率
        stream.keyBy(t -> t.f1.getSink_table())
                .addSink(FlinkSinkUtil.getKafkaSink());

    }
  8. Write the HBase-bound part of the split stream to HBase.

I. Import the Phoenix-related dependencies

<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>5.0.0-HBase-2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.glassfish</groupId>
            <artifactId>javax.el</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>30.1-jre</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>

II. PhoenixSink

Wrap the code that writes to Phoenix into a reusable sink class:

package com.atguigu.gmall.sink;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.common.Constant;
import com.atguigu.gmall.pojo.TableProcess;
import com.atguigu.gmall.util.JdbcUtil;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class PhoenixSink extends RichSinkFunction<Tuple2<JSONObject, TableProcess>> {
    private Connection conn;
    private ValueState<Boolean> isFirst;

    /**
     * 建立到Phoenix的连接
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        String url = Constant.PHOENIX_URL;
        conn = JdbcUtil.getPhoenixConnection(url);
        isFirst = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("isFirst",Boolean.class));
    }

    /**
     * 关闭连接
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        if(conn != null){
            conn.close();
        }
    }



    @Override
    public void invoke(Tuple2<JSONObject, TableProcess> value, Context context) throws Exception {
        //1.建表
        checkTable(value);
        //2.写数据
        writeData(value);
    }

    private void writeData(Tuple2<JSONObject, TableProcess> value) throws SQLException {
        JSONObject data = value.f0;
        TableProcess tp = value.f1;
        //拼接插入语句
        //upsert into t(a,b,c) values(?,?,?)
        StringBuilder sql = new StringBuilder();
        sql.append("upsert into ")
                .append(tp.getSink_table())
                .append("(")
                .append(tp.getSink_columns())
                .append(")values(")
                .append(tp.getSink_columns().replaceAll("[^,]+", "?"))
                .append(")");
        PreparedStatement ps = conn.prepareStatement(sql.toString());
        //给占位符赋值,根据列名去data中取值
        String[] cs = tp.getSink_columns().split(",");
        for (int i = 0; i < cs.length; i++) {
            String columnName = cs[i];
            Object v = data.get(columnName);
            ps.setString(i+1,v==null?null:v.toString());
        }


        //执行SQL
        ps.execute();
        conn.commit();
        ps.close();
    }

    /**
     * 在Phoenix中进行建表
     * 执行建表语句
     * @param value
     */
    private void checkTable(Tuple2<JSONObject, TableProcess> value) throws IOException, SQLException {
        if(isFirst.value() == null){
            TableProcess tp = value.f1;
            StringBuilder sql = new StringBuilder();
            sql
                    .append("create table if not exists ")
                    .append(tp.getSink_table())
                    .append("(")
                    .append(tp.getSink_columns().replaceAll(",", " varchar, "))
                    .append(" varchar, constraint pk primary key(")
                    .append(tp.getSink_pk() == null? "id" : tp.getSink_pk())
                    .append("))")
                    .append(tp.getSink_extend() == null ? "" : tp.getSink_extend());
            System.out.println(sql.toString());
            PreparedStatement ps = conn.prepareStatement(sql.toString());
            ps.execute();
            conn.commit();
            ps.close();
            isFirst.update(true);

        }
    }
}
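PhoenixSink calls JdbcUtil.getPhoenixConnection(url), but JdbcUtil itself is not shown in this section. A minimal sketch of what it presumably looks like, assuming it only needs to load the Phoenix driver declared in Constant and open a JDBC connection:

package com.atguigu.gmall.util;

import com.atguigu.gmall.common.Constant;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class JdbcUtil {
    /**
     * Opens a JDBC connection to Phoenix for the given URL.
     */
    public static Connection getPhoenixConnection(String url) {
        try {
            // load the Phoenix JDBC driver
            Class.forName(Constant.PHOENIX_DRIVER);
            return DriverManager.getConnection(url);
        } catch (ClassNotFoundException | SQLException e) {
            throw new RuntimeException("Failed to open Phoenix connection: " + url, e);
        }
    }
}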

III. Write the split data to HBase

 /**
     * 7.将动态分流后的数据写入到hbase
     * @param stream
     */
    private void writeToHbase(DataStream<Tuple2<JSONObject, TableProcess>> stream) {
        stream.keyBy(t -> t.f1.getSink_table())
                .addSink(FlinkSinkUtil.getPhoenixSink());
    }

VI. Complete code

package com.atguigu.gmall.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.app.BaseAppV1;
import com.atguigu.gmall.common.Constant;
import com.atguigu.gmall.pojo.TableProcess;
import com.atguigu.gmall.util.FlinkSinkUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Iterator;
import java.util.Set;

/**
 * bin/maxwell-bootstrap --user root  --password aaaaaa --host hadoop162  --database gmall2021 --table user_info --client_id maxwell_1
 */

public class DwdDbApp extends BaseAppV1 {
    public static void main(String[] args) {
        new DwdDbApp().init(2002,1,"DwdDbApp","DwdDbApp", Constant.TOPIC_ODS_DB);

    }

    @Override
    public void run(StreamExecutionEnvironment env, DataStreamSource<String> stream) {
         //stream.print();
        /**
         * 1.对业务数据做etl 数据流
         */
        SingleOutputStreamOperator<JSONObject> etledStream = etl(stream);
        //etledStream.print();


        /**
         * 2.读取配置表的数据,配置流
         */
        SingleOutputStreamOperator<TableProcess> tpStream = readTableProcess(env);
        //tpStream.print();

        /**
         * 3.把配置流进行广播得到广播流与数据流进行connect
         */
        SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectedStream = connectStreams(etledStream, tpStream);
        //connectedStream.print();
        /**
         * 4.把数据中不需要的列过滤掉
         */
        SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> filteredStream = filterColumns(connectedStream);
        //filteredStream.print();
        /**
         * 5.根据配置流的配置信息,对数据流中的数据进行动态分流(到kafka的和到hbase的)
         */
        Tuple2<SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>>, DataStream<Tuple2<JSONObject, TableProcess>>> kafkaHbaseStreams = dynamicSplit(filteredStream);
        //kafkaHbaseStreams.f0.print("kafka");
        //kafkaHbaseStreams.f1.print("hbase");
        /**
         * 6.不同的流写入到不同的sink中
         */
        writeToKafka(kafkaHbaseStreams.f0);
        writeToHbase(kafkaHbaseStreams.f1);
    }

    /**
     * 7.将动态分流后的数据写入到hbase
     * @param stream
     */
    private void writeToHbase(DataStream<Tuple2<JSONObject, TableProcess>> stream) {
        stream.keyBy(t -> t.f1.getSink_table())
                .addSink(FlinkSinkUtil.getPhoenixSink());
    }

    /**
     * 6.将动态分流后的数据写入到kafka
     * @param stream
     */
    private void writeToKafka(SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> stream) {
        //写到同一个topic的数据,最好放在同一组,这样可以提升效率
        stream.keyBy(t -> t.f1.getSink_table())
                .addSink(FlinkSinkUtil.getKafkaSink());

    }

    /**
     * 5.动态分流(kafka,Hbase)
     * 实现思路,主流到kafka,侧输出流到hbase
     * @param stream
     */
    private Tuple2<SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>>, DataStream<Tuple2<JSONObject, TableProcess>>> dynamicSplit(SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> stream) {
        OutputTag<Tuple2<JSONObject, TableProcess>> hbaseTag = new OutputTag<Tuple2<JSONObject, TableProcess>>("hbase") {};
        SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> kafkaStream = stream.process(new ProcessFunction<Tuple2<JSONObject, TableProcess>, Tuple2<JSONObject, TableProcess>>() {
            @Override
            public void processElement(Tuple2<JSONObject, TableProcess> value, Context ctx, Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception {
                String sink_type = value.f1.getSink_type();
                if (Constant.DWD_SINK_KAFKA.equals(sink_type)) {
                    out.collect(value);
                } else if (Constant.DWD_SINK_HBASE.equals(sink_type)) {
                    ctx.output(hbaseTag, value);
                }
            }
        });
        DataStream<Tuple2<JSONObject, TableProcess>> hbaseStream = kafkaStream.getSideOutput(hbaseTag);
        return Tuple2.of(kafkaStream, hbaseStream);
    }

    /**
     * 4.把数据中不需要的列过滤掉
     * @param stream
     */
    private SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> filterColumns(SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> stream) {
        //删除那些不要的列
        return stream.map(new MapFunction<Tuple2<JSONObject, TableProcess>, Tuple2<JSONObject, TableProcess>>() {
            @Override
            public Tuple2<JSONObject, TableProcess> map(Tuple2<JSONObject, TableProcess> value) throws Exception {
                JSONObject data = value.f0;
                TableProcess tp = value.f1;
                String sink_columns = tp.getSink_columns();
                //删除掉data里map中的部分k-v
                //data.keySet().removeIf(key -> !sink_columns.contains(key));
                Set<String> keys = data.keySet();
                Iterator<String> it = keys.iterator();
                while (it.hasNext()) {
                    String key = it.next();
                    if(!sink_columns.contains(key)){
                        it.remove();
                    }
                }
                return value;
            }
        });

    }

    /**
     * 3. Broadcast the config stream and connect it with the data stream
     * @param dataStream
     * @param tpStream
     */
    private SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectStreams(SingleOutputStreamOperator<JSONObject> dataStream, SingleOutputStreamOperator<TableProcess> tpStream) {
        // For every incoming JSONObject we need to find the matching TableProcess config

        // Turn the config stream into a broadcast stream and connect it; the broadcast state is a map keyed by "source_table:operate_type", e.g. "user_info:insert"
        MapStateDescriptor<String, TableProcess> tpStateDesc = new MapStateDescriptor<>("tpState", String.class, TableProcess.class);
        BroadcastStream<TableProcess> tpBCStream = tpStream.broadcast(tpStateDesc);
         return dataStream
                .connect(tpBCStream)
                .process(new BroadcastProcessFunction<JSONObject, TableProcess, Tuple2<JSONObject, TableProcess>>() {
                    @Override
                    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception {
                        // Handle records from the data stream
                        ReadOnlyBroadcastState<String, TableProcess> tpState = ctx.getBroadcastState(tpStateDesc);
                        String key = value.getString("table") + ":" + value.getString("type");
                        TableProcess tp = tpState.get(key);
                        // Tables that do not need to be sunk have no entry in the config table, so tp is null here
                        if (tp != null) {
                            out.collect(Tuple2.of(value.getJSONObject("data"), tp));
                        }
                    }

                    @Override
                    public void processBroadcastElement(TableProcess value, Context ctx, Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception {
                        // Handle records from the broadcast stream: just write them into the broadcast state
                        BroadcastState<String, TableProcess> tpState = ctx.getBroadcastState(tpStateDesc);
                        String key = value.getSource_table() + ":" + value.getOperate_type();
                        tpState.put(key, value);
                    }
                });


    }


    /**
     * 2. Read the configuration table as the config stream (via Flink CDC)
     * @param env
     * @return
     */
    private SingleOutputStreamOperator<TableProcess> readTableProcess(StreamExecutionEnvironment env) {
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        tenv.executeSql("CREATE TABLE tp (" +
                " source_table string, " +
                " operate_type string, " +
                " sink_type string, " +
                " sink_table string, " +
                " sink_columns string, " +
                " sink_pk string, " +
                " sink_extend string, " +
                " primary key(source_table, operate_type) not enforced " + //声明主键,但是不做强检验
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'hadoop162'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = 'aaaaaa'," +
                " 'database-name' = 'gmall2021_realtime'," +
                " 'table-name' = 'table_process', " +
                " 'debezium.snapshot.mode' = 'initial' " +  // 程序启动的时候读取表中所有的数据, 然后再使用bin_log监控所有的变化
                ")");

        Table table = tenv.sqlQuery("select * from tp");

        return  tenv
                .toRetractStream(table, TableProcess.class)
                .filter(t -> t.f0)   // keep only the add (insert) side of the retract stream
                .map(t -> t.f1);

    }

    /**
     * ETL (basic cleaning) of the business data
     * @param stream
     */
    private SingleOutputStreamOperator<JSONObject> etl(DataStreamSource<String> stream) {
        return stream.map(line -> JSON.parseObject(line.replaceAll("bootstrap-", "")))
                .filter(obj ->
                        obj.containsKey("database")
                                && obj.containsKey("table")
                                && obj.containsKey("type")
                                && ("insert".equals(obj.getString("type")) || "update".equals(obj.getString("type")))
                                && obj.containsKey("data")
                                && obj.getString("data").length() > 10
                );
    }
}
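
The sinks used above come from the project's FlinkSinkUtil, which is assumed to be defined elsewhere and is not repeated here. For reference only, a minimal sketch of a topic-routing Kafka sink in the spirit of getKafkaSink() could look like the following; the class name, bootstrap servers and producer properties are assumptions, not the project's actual code (TableProcess is the project's config POJO, assumed to be in the same package):

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Hypothetical sketch only: routes each (data, config) pair to the Kafka topic named in sink_table
public class FlinkSinkUtilSketch {
    public static SinkFunction<Tuple2<JSONObject, TableProcess>> getKafkaSink() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092");
        // With EXACTLY_ONCE the transaction timeout must not exceed the broker's transaction.max.timeout.ms (15 min by default)
        props.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));
        return new FlinkKafkaProducer<>(
                "default_topic",  // fallback topic, not used because the schema sets the topic per record
                (KafkaSerializationSchema<Tuple2<JSONObject, TableProcess>>) (element, timestamp) ->
                        new ProducerRecord<>(
                                element.f1.getSink_table(),
                                element.f0.toJSONString().getBytes(StandardCharsets.UTF_8)),
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}

Similarly, a simplified sketch of a Phoenix sink in the spirit of getPhoenixSink() is shown below. It only performs the UPSERT; the project's real sink most likely also creates the Phoenix/HBase table on first use from sink_pk and sink_extend, which is omitted here:

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch only: upserts each record into the Phoenix table named in sink_table
public class PhoenixSinkSketch extends RichSinkFunction<Tuple2<JSONObject, TableProcess>> {

    private transient Connection conn;

    @Override
    public void open(Configuration parameters) throws Exception {
        // ZooKeeper quorum of the HBase cluster (assumed hosts)
        conn = DriverManager.getConnection("jdbc:phoenix:hadoop162,hadoop163,hadoop164:2181");
    }

    @Override
    public void invoke(Tuple2<JSONObject, TableProcess> value, Context context) throws Exception {
        JSONObject data = value.f0;
        List<String> cols = new ArrayList<>(data.keySet());
        // upsert into <sink_table>(c1,c2,...) values(?,?,...)
        String sql = "upsert into " + value.f1.getSink_table()
                + "(" + String.join(",", cols) + ")"
                + " values(" + String.join(",", Collections.nCopies(cols.size(), "?")) + ")";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (int i = 0; i < cols.size(); i++) {
                ps.setString(i + 1, data.getString(cols.get(i)));
            }
            ps.executeUpdate();
            conn.commit();  // Phoenix connections do not auto-commit by default
        }
    }

    @Override
    public void close() throws Exception {
        if (conn != null) {
            conn.close();
        }
    }
}

Because both streams are keyed by sink_table before addSink, all records for a given topic or Phoenix table end up on the same subtask, which is the efficiency point noted in writeToKafka.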

七. Testing

  1. Repackage the project and upload it to the Linux server
  2. Write a script to start the apps
#!/bin/bash
flink=/opt/module/flink-yarn/bin/flink
app_jar=/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT.jar


# Get the names of all currently RUNNING apps
runnings=`$flink list 2>/dev/null | awk '/RUNNING/{ print $(NF-1)}'`
# Array of app main classes to start
apps=(
com.atguigu.gmall.app.dwd.DwdLogApp
com.atguigu.gmall.app.dwd.DwdDbApp
)

for app in ${apps[*]} ; do
    app_name=`echo $app | awk -F . '{print $NF}'`
    # Check whether app_name already appears in runnings
    if [[ ${runnings[@]} =~ $app_name ]]; then
        echo "$app_name 已经启动, 不需要重新启动...."
    else
        # Not running yet
        echo "Starting $app_name ...."
        $flink run -d -c $app $app_jar
    fi
done
  3. Start HBase, Phoenix, Hadoop, ZooKeeper, Kafka, and the Flink on YARN session
  4. Running the startup script produces an error

Flink实时数仓(一) - 图11

The cause is that the packaged jar also contains the Hadoop dependencies, which conflict with those already provided by the cluster. Configure the maven-shade-plugin in pom.xml to exclude the Hadoop dependencies when packaging:

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <!-- Do not package any Hadoop-related dependencies -->
                                    <exclude>org.apache.hadoop:*</exclude>

                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  5. Repackage the project and upload it to the server again
  6. Restart the Flink yarn-session and run the startup script
/opt/module/flink-yarn » bin/yarn-session.sh -d
/opt/module/flink-yarn » realtime.sh
  7. Open the Flink Web UI in a browser; the jobs are running successfully

Flink实时数仓(一) - 图12

  8. Generate mock business data and check whether it has been written to Kafka and HBase

Flink实时数仓(一) - 图13

Flink实时数仓(一) - 图14

The two screenshots above show that the data has been written to Kafka and HBase successfully.