主要任务
从 Kafka 页面日志主题读取数据,统计七日回流用户和当日独立用户数。
思路分析
之前的活跃用户,一段时间未活跃(流失),今日又活跃了,就称为回流用户。此处要求统计回流用户总数。规定当日登陆,且自上次登陆之后至少7 日未登录的用户为回流用户。
1**)读取 Kafka **页面主题数据
2**)转换数据结构**
流中数据由String 转换为JSONObject。
3**)过滤数据**
统计的指标与用户有关,uid 不为null 的数据才是有用的。此外,登陆分为两种情况:(1)用户打开应用后自动登录;(2)用户打开应用后没有登陆,浏览部分页面后跳转到登录页面,中途登陆。对于情况(1),登录操作发生在会话首页,所以保留首页即可;对于情况(2),登陆操作发生在 login 页面,login 页面之后必然会跳转到其它页面,保留login 之后的页面即可记录情况(2)的登陆操作。
综上,我们应保留uid 不为null 且last_page_id 为null 或last_page_id 为 login 的浏览记录。
4**)设置水位线**
5**)按照 uid **分组
不同用户的登陆记录互不相干,各自处理。
6**)统计回流用户数和独立用户数**
运用Flink 状态编程,记录用户末次登陆日期。
(1)若状态中的末次登陆日期不为 null,进一步判断。
① 如果末次登陆日期不等于当天日期则独立用户数 uuCt 记为1,并将状态中的末次登陆日期更新为当日,进一步判断。
a)如果当天日期与末次登陆日期之差大于等于 8 天则回流用户数backCt 置为1。
b)否则 backCt 置为0。
② 若末次登陆日期为当天,则 uuCt 和backCt 均为0,此时本条数据不会影响统计结果,舍弃,不再发往下游。
(2)如果状态中的末次登陆日期为 null,将 uuCt 置为1,backCt 置为0,并将状态中的末次登陆日期更新为当日。
7**)开窗,聚合**
度量字段求和,补充窗口起始和结束时间,时间戳字段置为当前系统时间,用于ClickHouse 数据去重。
8)写入 ClickHouse
图解
ClickHouse 建表语句
启动ClickHouse
systemctl start clickhouse
systemctl start clickhouse-server
clickhouse-client -m
drop table if exists dws_user_user_login_window;
create table if not exists dws_user_user_login_window
(
stt DateTime,
edt DateTime,
back_ct UInt64,
uu_ct UInt64,
ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt);
代码
(1)实体类 UserLoginBean
(2)主程序
测试
开启zk,kafka,f1
依次启动 BaseLogApp,DwsUserUserLoginWindow 程序 启动脚本 (脚本需要启动两次,一次是当日日期,另一次是8天后日期,可在配置文件中修改,修改后记得分发)查看数据
当日日期数据:
8日后日期数据:
有如上图数据,则程序成功!