DWS层设计要点:

(1)DWS层的设计参考指标体系;

(2)DWS层表名的命名规范为dws数据域统计粒度业务过程统计周期(window)

  1. 注:window 表示窗口对应的时间范围。

开窗之后,连接操作更快,数据量少,数据库存储的数据量降低

主要任务

从Kafka 页面浏览明细主题读取数据,过滤搜索行为,使用自定义UDTF(一进多出)函数对搜索内容分词。统计各窗口各关键词出现频次,(查看用户搜索的频次哪个最高)写入 ClickHouse。

思路分析

本程序将使用FlinkSQL 实现。分词是个一进多出的过程,需要一个UDTF 函数来实现,FlinkSQL 没有提供相关的内置函数,所以要自定义UDTF 函数。

自定义函数的逻辑在代码中实现,要完成分词功能,需要导入相关依赖,此处将借助IK 分词器完成分词。

最终要将数据写入ClickHouse,需要补充相关依赖,封装 ClickHouse 工具类和方法。本节任务分为两部分:分词处理和数据写出。

不用ES的原因:

那分词大家想到了什么呢?提到分词大家应该直接想到了es,没毛病。确实就是分词呢,我们只在es去学过。 那我要问大家那是不是这个需求,咱们用es能存吗?我告诉你用es的不行,那你想一下es分词干嘛呀? 它分词以后建索引。对吧。是他把这个词分了,但是我们现在分词要干嘛呀?我们现在分词要作词频统计,就是这个词我也拿到,然后呢,完全统计它有多少个。对吧? 你拿es来存储有什么用?他给你分词了是它给你动词,然后呢,你能拿到这个分词吗?拿不到。 对吧?所以呢,我们不能用es存储。也就是说这个分词呢。咱们得自己写。假如苹果手机对吧,看这个关键词。我把它拆分成苹果和手机两个词。

1)分词处理

分词处理分为八个步骤,如下。

(1)创建分词工具类

  1. 定义分词方法,借助IK 分词器提供的工具将输入的关键词拆分成多个词,返回一个

List 集合。

(2)创建自定义函数类

继承Flink 的TableFunction 类,调用分词工具类的分词方法,实现分词逻辑。

(3)注册函数

(4)从 Kafka 页面浏览明细主题读取数据并设置水位线

(5)过滤搜索行为

满足以下三个条件的即为搜索行为数据:

① page 字段下item 字段不为null;

② page 字段下last_page_id 为search;

③ page 字段下item_type 为keyword。

(6)分词

(7)分组、开窗、聚合计算

按照拆分后的关键词分组。统计每个词的出现频次,补充窗口起始时间、结束时间和关键词来源(source)字段。调用unix_timestamp() 函数获取以秒为单位的当前系统时间戳,转为毫秒(*1000),作为ClickHouse 表的版本字段,用于数据去重。

(8)将动态表转换为流

2**)将数据写入 **ClickHouse

(1)建表

要将数据写入ClickHouse,先要建表。首先要明确使用的表引擎。为了保证数据不重复,可以使用ReplacingMergeTree(替换合并树) 或者ReplicatedMergeTree(副本合并树),二者均可去重,区别如下。

① 副本通过对比插入的“数据块”(同一批次写入的数据)实现去重,如果插入的两批数据相似度达到 ClickHouse 的判断标准后插入的数据会被舍弃。副本的初衷是防止数据丢失,而非去重,如果重复数据夹杂在不同的数据块中并不能实现去重效果。假设向ClickHouse 写入数据时 5 条一批,第一批次 ABCDE 第二批 FAGHI,只要没有达到 ClickHouse 对数据块重复的判断标准,重复的A 依然会被写入。

② ReplacingMergeTree 在建表时需要定义版本字段,它会对比排序字段(在 ClickHouse 中排序字段可以唯一标识一行数据)相同数据的版本字段,如果设置了该字段,且多条数据的该字段值不同,则保留版本字段值最大的数据,如果没有设置该字段或者多条数据该字段的值相同,则按插入顺序保留最后一条。数据的去重只会在数据合并期间进行。合并操作会在后台一个不确定的时间执行,无法预先做出计划。因此无法保证每时每刻数据不会重复。可以执行optimize table xxx final 手动对分区进行合并。

此处选择ReplacingMergeTree,主要考虑到虽然去重有延迟,但在必要时可以通过optimize 去重。但这个命令会引发大量读写操作,对ClickHouse 而言是非常重的,极其影响性能。生产环境不可能在每次查询前都做一次合并操作,不可过多依赖optimize 去重。

(2)写出方式

调用Flink提供的JDBCSink.sink(String sql,JdbcStatementBuilder statementBuilder, JdbcExecutionOptions executionOptions, JdbcConnectionOptions connectionOptions) 方法创建JDBC sink,返回 SinkFunction 类型的对象,将其作为流调用addSink() 方法的参数,即可将数据以JDBC 方式写入数据库。这种方式只能写入数据库中的一张表。参数解读如下

Ø sql:任意的 DML 语句。

Ø statementBuilder:构造者类JDBCStatementBuilder 对象,用于为数据库操作对象(PreparedStatement 对象)中的占位符传参。核心方法accept(PreparedStatement preparedStatement, T obj),参数解读如下。

Ø preparedStatement:数据库操作对象。

Ø obj:流中数据对象。要给占位符传参,就必须将SQL 中的占位符和流中数据对应起来。然而,不同SQL 语句的占位符数量可能不同,不可能设置一个统一的数值指定占位符个数,然后简单地通过固定次数的循环完成传参。那么,如何在程序中将占位符和流中数据对应起来?可以这样做,用传入方法的流中数据对象(obj)获取类的 Class 对象,然后通过反射的方式获取所有属性的Field 对象,再调用field 对象的setObject() 方法将流中数据传递给SQL 中的占位符,完成传参。

Ø T:泛型,指定流中数据类型。

Ø executionOptions:SQL DML 语句是按照批次执行的,该参数用于设置执行参数,API 如下。

Ø withBatchIntervalMs(long intervalMs) 设置批处理时间间隔,单位毫秒。默认值为0,表示不会基于时间对批处理进行控制。

Ø withBatchSize(int size) 设置批次大小(数据的条数),默认为5000 条。

Ø withMaxRetries(int maxRetries) 设置最大重试次数,默认为3 次。

Ø 批处理触发条件(满足其一即可):

Ø 距离上次数据插入经过了withBatchIntervalMs 设置的时间间隔

Ø 数据量达到批大小

Ø Flink 检查点启动时

Ø connectionOptions:设置数据库连接参数

Ø withUrl:数据库 URL

Ø withDriverName:数据库驱动名称

Ø withUsername:连接数据库的用户名

Ø withPassword:连接数据库的密码

(3)TransientSink

  1. 在实体类中某些字段是为了辅助指标计算而设置的,并不会写入到数据库。那么,如何告诉程序哪些字段不需要写入数据库呢?Java 的反射提供了解决思路。类的属性对象Field 可以调用getAnnotation(Class annotationClass) 方法获取写在类中属性定义语句上方的注解中的信息,若注解存在则返回值不为null
  2. 定义一个可以写在属性上的注解,对于不需要写入数据库的属性,在实体类中属性定义语句上方添加该注解。为数据库操作对象传参时判断注解是否存在,是则跳过属性,即可实现对属性的排除。

图解

实时数仓(二十四)DWS层-流量域来源关键词粒度页面浏览各窗口汇总表(FlinkSQL) - 图1

ClickHouse 建表语句

启动ClickHouse

systemctl start clickhouse

systemctl start clickhouse-server

clickhouse-client -m

  1. create database gmall_rebuild;
  2. drop table if exists dws_traffic_source_keyword_page_view_window;
  3. create table if not exists dws_traffic_source_keyword_page_view_window
  4. (
  5. `stt` DateTime,
  6. `edt` DateTime,
  7. `source` String,
  8. `keyword` String,
  9. `keyword_count` UInt64,
  10. `ts` UInt64
  11. ) engine = ReplacingMergeTree(ts)
  12. partition by toYYYYMMDD(stt)
  13. order by (stt, edt, source, keyword);

代码编写

(1)IK 分词相关依赖

  1. <dependency>
  2. <groupId>com.janeluo</groupId>
  3. <artifactId>ikanalyzer</artifactId>
  4. <version>2012_u6</version>
  5. </dependency>

(2)IK 分词工具类KeywordUtil

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/util/KeywordUtil.java

(3)FlinkSQL 用户自定义函数 KeywordUDTF

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/app/function/SplitFunction.java

(4)实体类KeywordBean

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/bean/KeywordBean.java

(5)补充ClickHouse 相关依赖

  1. <dependency>
  2. <groupId>ru.yandex.clickhouse</groupId>
  3. <artifactId>clickhouse-jdbc</artifactId>
  4. <version>0.3.0</version>
  5. <exclusions>
  6. <exclusion>
  7. <groupId>com.fasterxml.jackson.core</groupId>
  8. <artifactId>jackson-databind</artifactId>
  9. </exclusion>
  10. <exclusion>
  11. <groupId>com.fasterxml.jackson.core</groupId>
  12. <artifactId>jackson-core</artifactId>
  13. </exclusion>
  14. </exclusions>
  15. </dependency>

(6)在GmallConfig 常量类中补充常量

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/common/GmallConfig.java

(7)ClickHouse 工具类

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/util/MyClickHouseUtil.java

(8)TransientSink 注解

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/bean/TransientSink.java

(9)主程序

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/app/dws/DwsTrafficSourceKeywordPageViewWindow.java

测试

启动Zookeeper,Kafka,Flume (f1)

修改application.yml 日期 (改成当日)

实时数仓(二十四)DWS层-流量域来源关键词粒度页面浏览各窗口汇总表(FlinkSQL) - 图2

修改之后分发

启动 BaseLogApp 和 DwsTrafficSourceKeywordPageViewWindow 类

启动脚本

实时数仓(二十四)DWS层-流量域来源关键词粒度页面浏览各窗口汇总表(FlinkSQL) - 图3

查看数据

实时数仓(二十四)DWS层-流量域来源关键词粒度页面浏览各窗口汇总表(FlinkSQL) - 图4

有如上图数据,则程序成功!