主要任务
从 Kafka 订单明细主题读取数据,对数据去重,统计当日下单独立用户数和新增下单用户数,封装为实体类,写入ClickHouse。
思路分析
1**)从 Kafka订单明细主题读取数据**
2**)转换数据结构**
Kafka 订单明细主题的数据是通过Kafka-Connector 从订单预处理主题读取后进行过滤获取的,Kafka-Connector 会过滤掉主题中的null 数据,因此订单明细主题不存在为null 的数据,直接转换数据结构即可。
3**)按照 order_detail_id **分组
order_detail_id 为数据唯一键。
4**)对 order_detail_id **相同的数据去重
按照上文提到的方案对数据去重。
5**)设置水位线**
6**)按照用户id **分组
7**)计算度量字段的值**
(1)当日下单独立用户数和新增下单用户数
运用Flink 状态编程,在状态中维护用户末次下单日期。
若末次下单日期为null,则将首次下单用户数和下单独立用户数均置为 1;否则首次下单用户数置为0,判断末次下单日期是否为当日,如果不是当日则下单独立用户数置为1,否则置为 0。最后将状态中的下单日期更新为当日。
(2)其余度量字段直接取流中数据的对应值即可。
8**)开窗、聚合**
度量字段求和,补充窗口起始时间和结束时间字段,ts 字段置为当前系统时间戳。
9)写出到 ClickHouse。
图解
ClickHouse 建表语句
启动ClickHouse
systemctl start clickhouse
systemctl start clickhouse-server
clickhouse-client -m
drop table if exists dws_trade_order_window;
create table if not exists dws_trade_order_window
(
stt DateTime,
edt DateTime,
order_unique_user_count UInt64,
order_new_user_count UInt64,
order_activity_reduce_amount Decimal(38, 20),
order_coupon_reduce_amount Decimal(38, 20),
order_origin_total_amount Decimal(38, 20),
ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt);
代码
(1)实体类 TradeOrderBean
(2)主程序
测试
开启zk,kafka,maxwell
依次启动 DwdTradeOrderPreProcess,DwdTradeOrderDetail,DwsTradeOrderWindow 程序修改配置文件
查看数据
有如上图数据,则程序成功!