主要任务

从 Kafka 页面日志主题读取数据,统计七日回流用户和当日独立用户数。

思路分析

之前的活跃用户,一段时间未活跃(流失),今日又活跃了,就称为回流用户。此处要求统计回流用户总数。规定当日登陆,且自上次登陆之后至少7 日未登录的用户为回流用户。

1**)读取 Kafka **页面主题数据

2**)转换数据结构**

流中数据由String 转换为JSONObject。

3**)过滤数据**

统计的指标与用户有关,uid 不为null 的数据才是有用的。此外,登陆分为两种情况:(1)用户打开应用后自动登录;(2)用户打开应用后没有登陆,浏览部分页面后跳转到登录页面,中途登陆。对于情况(1),登录操作发生在会话首页,所以保留首页即可;对于情况(2),登陆操作发生在 login 页面,login 页面之后必然会跳转到其它页面,保留login 之后的页面即可记录情况(2)的登陆操作。

综上,我们应保留uid 不为null 且last_page_id 为null 或last_page_id 为 login 的浏览记录。

4**)设置水位线**

5**)按照 uid **分组

不同用户的登陆记录互不相干,各自处理。

6**)统计回流用户数和独立用户数**

运用Flink 状态编程,记录用户末次登陆日期。

(1)若状态中的末次登陆日期不为 null,进一步判断。

① 如果末次登陆日期不等于当天日期则独立用户数 uuCt 记为1,并将状态中的末次登陆日期更新为当日,进一步判断。

a)如果当天日期与末次登陆日期之差大于等于 8 天则回流用户数backCt 置为1。

b)否则 backCt 置为0。

② 若末次登陆日期为当天,则 uuCt 和backCt 均为0,此时本条数据不会影响统计结果,舍弃,不再发往下游。

(2)如果状态中的末次登陆日期为 null,将 uuCt 置为1,backCt 置为0,并将状态中的末次登陆日期更新为当日。

7**)开窗,聚合**

度量字段求和,补充窗口起始和结束时间,时间戳字段置为当前系统时间,用于ClickHouse 数据去重。

8)写入 ClickHouse

图解

实时数仓(二十七)DWS层-用户域用户登陆各窗口汇总表 - 图1

实时数仓(二十七)DWS层-用户域用户登陆各窗口汇总表 - 图2

ClickHouse 建表语句

启动ClickHouse

systemctl start clickhouse

systemctl start clickhouse-server

clickhouse-client -m

  1. drop table if exists dws_user_user_login_window;
  2. create table if not exists dws_user_user_login_window
  3. (
  4. stt DateTime,
  5. edt DateTime,
  6. back_ct UInt64,
  7. uu_ct UInt64,
  8. ts UInt64
  9. ) engine = ReplacingMergeTree(ts)
  10. partition by toYYYYMMDD(stt)
  11. order by (stt, edt);

代码

(1)实体类 UserLoginBean

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/bean/UserLoginBean.java

(2)主程序

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/app/dws/DwsUserUserLoginWindow.java

测试

开启zk,kafka,f1

依次启动 BaseLogApp,DwsUserUserLoginWindow 程序 启动脚本 (脚本需要启动两次,一次是当日日期,另一次是8天后日期,可在配置文件中修改,修改后记得分发)

实时数仓(二十七)DWS层-用户域用户登陆各窗口汇总表 - 图3

查看数据

当日日期数据:

实时数仓(二十七)DWS层-用户域用户登陆各窗口汇总表 - 图4

8日后日期数据:

实时数仓(二十七)DWS层-用户域用户登陆各窗口汇总表 - 图5

有如上图数据,则程序成功!