主要任务

DWS 层是为ADS 层服务的,通过对指标体系的分析,本节汇总表中需要有会话数、页面浏览数、浏览总时长、独立访客数、跳出会话数五个度量字段。本节的任务是统计这五个指标,并将维度和度量数据写入ClickHouse 汇总表。

思路分析

任务可以分为两部分:统计指标的计算和数据写出,数据写出在上节已有介绍,不再赘述。此处仅对统计指标计算进行分析。

会话数、页面浏览数和浏览总时长三个指标均与页面浏览有关,可以由DWD 层页面浏览明细表获得。独立访客数可以由DWD 层的独立访客明细表获得,跳出会话数可以由DWD 层的用户跳出明细表获得。

三个主题读取的数据会在程序中被封装为三条流。处理后的数据要写入ClickHouse 的同一张表,那么三条流的数据结构必须完全一致,这个问题很好解决,只要定义与表结构对应的实体类,然后将流中数据结构转换为实体类即可。除此之外,还有个问题需要考虑,三条流是否需要合并?ClickHouse 表的字段将按照窗口+ 表中所有维度做order by,排序键是 ClickHouse 中的唯一键。如果三条流分别将数据写出到ClickHouse,则对于唯一键相同的数据,不考虑重复写入的情况下会存在三条需要保留的数据(度量数据分别存在于三条数据中)。我们使用了 ReplacingMergeTree,在分区合并时会按照排序键去重,排序字段相同的数据仅保留一条,将造成数据丢失。显然,这种方案是不可行的。此处将三条流合并为一条,对于每一排序键只生成一条数据。

1**)知识储备**

常见的多流合并算子及应用场景如下。

Ø union():用于两条及多条流之间的合并,对流的数量没有限制,但是要求所有流中的数据结构完全一致。

Ø connect():用于两条流的合并,其后紧邻的process 算子中可以使用的CoProcessFunction 是双流处理最底层的API,可以通过键控状态和定时器的运用实现join、广播join、段join等各种关联。connect() 只能对两条流做关联,且对两条流的数据结构没有要求。

Ø intervalJoin:段 join,两条流的每一条数据都可以与另一条流某个时间范围内的数据做关联。底层实现原理:以 A.intervalJoin(B) 为例,A 流中的数据进入算子后,会被保存到键控状态中,同时注册一个定时器,定时器触发时清空A 流状态中的数据。在定时器触发之前,B 流中的每一条数据都可以与状态中保存的A 流数据关联。同理,B 流中也维护了状态定时器。由此实现了段 join。假定A流中的定时器存在时长为3s,B流中的定时器存在时长为5s,A 流中某条数据抵达时间为tA,可与tA – 5s ~ tA + 3s 时间范围内抵达的B 流数据关联;B 流中某条数据抵达时间为tB,可与tB – 3s ~ tB + 5s 时间范围内抵达的A 流数据关联。

实时数仓(二十五)DWS层-流量域版本-渠道-地区-访客类别粒度页面浏览各窗口汇总表 - 图1

Ø join():该算子的功能可以被其它算子替代,目前基本不用。

connect()、intervalJoin()、join() 都是双流合并算子,本节对三条流进行合并,且流中数据结构一致,选择union() 更为合理。

2**)执行步骤**

(1)读取页面主题数据,封装为流

(2)统计页面浏览时长、页面浏览数、会话数,转换数据结构

创建实体类,将独立访客数、跳出会话数置为0,将页面浏览数置为1(只要有一条页面浏览日志,则页面浏览数加一),获取日志中的页面浏览时长,赋值给实体类的同名字段,最后判断last_page_id 是否为null,如果是,说明页面是首页,开启了一个新的会话,将会话数置为 1,否则置为0。补充维度字段,窗口起始和结束时间置为空字符串。下游要根据水位线开窗,所以要补充事件时间字段,此处将日志生成时间ts 作为事件时间字段即可。最后将实体类对象发往下游。

(3)读取用户跳出明细数据

(4)转换用户跳出流数据结构

封装实体类,维度字段和时间戳处理与页面流相同,跳出数置为1,其余度量字段置为 0。将数据发往下游。

(5)读取独立访客明细数据

(6)转换独立访客流数据结构

处理过程与跳出流同理。

(7)union 合并三条流

(8)设置水位线;

(9)按照维度字段分组;

(10)开窗

  1. 跳出行为判定的超时时间为10s,假设某条日志属于跳出数据,如果它对应的事件时间为 15s,要判定是否跳出需要在水位线达到25s 时才能做到,若窗口大小为 10s,这条数据应进入10~20s 窗口,但是拿到这条数据时水位线已达到25s,所属窗口已被销毁。这样就导致跳出会话数永远为 0,显然是有问题的。要避免这种情况,必须设置窗口延迟关闭,延迟关闭时间大于等于跳出判定的超时时间才能保证跳出数据不会被漏掉。但是这样会严重影响时效性,如果企业要求延迟时间设置为半小时,那么窗口就要延迟半小时关闭。要统计跳出行为相关的指标,就必须接受它对时效性带来的负面影响。

(11)聚合计算

度量字段求和,每个窗口数据聚合完毕之后补充窗口起始时间和结束时间字段。

在ClickHouse 中,ts 将作为版本字段用于去重,ReplacingMergeTree 会在分区合并时对比排序字段相同数据的ts,保留 ts 最大的数据。此处将时间戳字段置为当前系统时间,这样可以保证数据重复计算时保留的是最后一次计算的结果。

(12)将数据写入ClickHouse。

图解

实时数仓(二十五)DWS层-流量域版本-渠道-地区-访客类别粒度页面浏览各窗口汇总表 - 图2

ClickHouse建表语句

启动ClickHouse

systemctl start clickhouse

systemctl start clickhouse-server

clickhouse-client -m

  1. drop table if exists dws_traffic_vc_ch_ar_is_new_page_view_window;
  2. create table if not exists dws_traffic_vc_ch_ar_is_new_page_view_window
  3. (
  4. stt DateTime,
  5. edt DateTime,
  6. vc String,
  7. ch String,
  8. ar String,
  9. is_new String,
  10. uv_ct UInt64,
  11. sv_ct UInt64,
  12. pv_ct UInt64,
  13. dur_sum UInt64,
  14. uj_ct UInt64,
  15. ts UInt64
  16. ) engine = ReplacingMergeTree(ts)
  17. partition by toYYYYMMDD(stt)
  18. order by (stt, edt, vc, ch, ar, is_new);

代码编写

1)实体类TrafficPageViewBean

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/bean/TrafficPageViewBean.java


2)主程序


https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/app/dws/DwsTrafficVcChArIsNewPageViewWindow.java


测试

开启zk,kafka,f1

依次启动 BaseLogApp,DwdTrafficUniqueVisitorDetail ,DwdTrafficUserJumpDetail 和DwsTrafficVcChArIsNewPageViewWindow 程序 启动脚本

实时数仓(二十五)DWS层-流量域版本-渠道-地区-访客类别粒度页面浏览各窗口汇总表 - 图3

查看数据

实时数仓(二十五)DWS层-流量域版本-渠道-地区-访客类别粒度页面浏览各窗口汇总表 - 图4

有如上图数据,则程序成功!