主要任务
从Kafka 读取用户加购明细数据,统计每日各窗口加购独立用户数,写入ClickHouse。
思路分析
1**)从 Kafka **加购明细主题读取数据
2**)转换数据结构**
将流中数据由String 转换为JSONObject。
3**)设置水位线**
4**)按照用户 id **分组
5**)过滤独立用户加购记录**
运用Flink 状态编程,将用户末次加购日期维护到状态中。
如果末次登陆日期为null 或者不等于当天日期,则保留数据并更新状态,否则丢弃,不做操作。
6**)开窗、聚合**
统计窗口中数据条数即为加购独立用户数,补充窗口起始时间、关闭时间,将时间戳字段置为当前系统时间,发送到下游。
7)将数据写入 ClickHouse。
图解
ClickHouse 建表语句
启动ClickHouse
systemctl start clickhouse
systemctl start clickhouse-server
clickhouse-client -m
drop table if exists dws_trade_cart_add_uu_window;
create table if not exists dws_trade_cart_add_uu_window
(
stt DateTime,
edt DateTime,
cart_add_uu_ct UInt64,
ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt);
代码
(1)实体类 CartAddUuBean
(2)主程序
测试
开启zk,kafka,maxwell
依次启动 DwdTradeCartAdd 和 DwsTradeCartAddUuWindow 程序修改配置文件
查看数据
有如上图数据,则程序成功!