主要任务

从 Kafka 订单明细主题读取数据,对数据去重,统计当日下单独立用户数和新增下单用户数,封装为实体类,写入ClickHouse。

思路分析

1**)从 Kafka订单明细主题读取数据**

2**)转换数据结构**

  1. Kafka 订单明细主题的数据是通过Kafka-Connector 从订单预处理主题读取后进行过滤获取的,Kafka-Connector 会过滤掉主题中的null 数据,因此订单明细主题不存在为null 的数据,直接转换数据结构即可。

3**)按照 order_detail_id **分组

  1. order_detail_id 为数据唯一键。

4**)对 order_detail_id **相同的数据去重

  1. 按照上文提到的方案对数据去重。

5**)设置水位线**

6**)按照用户id **分组

7**)计算度量字段的值**

  1. 1)当日下单独立用户数和新增下单用户数
  2. 运用Flink 状态编程,在状态中维护用户末次下单日期。
  3. 若末次下单日期为null,则将首次下单用户数和下单独立用户数均置为 1;否则首次下单用户数置为0,判断末次下单日期是否为当日,如果不是当日则下单独立用户数置为1,否则置为 0。最后将状态中的下单日期更新为当日。

(2)其余度量字段直接取流中数据的对应值即可。

8**)开窗、聚合**

  1. 度量字段求和,补充窗口起始时间和结束时间字段,ts 字段置为当前系统时间戳。

9)写出到 ClickHouse。

图解

实时数仓(三十一)DWS层-交易域下单各窗口汇总表 - 图1

ClickHouse 建表语句

启动ClickHouse

systemctl start clickhouse

systemctl start clickhouse-server

clickhouse-client -m

  1. drop table if exists dws_trade_order_window;
  2. create table if not exists dws_trade_order_window
  3. (
  4. stt DateTime,
  5. edt DateTime,
  6. order_unique_user_count UInt64,
  7. order_new_user_count UInt64,
  8. order_activity_reduce_amount Decimal(38, 20),
  9. order_coupon_reduce_amount Decimal(38, 20),
  10. order_origin_total_amount Decimal(38, 20),
  11. ts UInt64
  12. ) engine = ReplacingMergeTree(ts)
  13. partition by toYYYYMMDD(stt)
  14. order by (stt, edt);

代码

(1)实体类 TradeOrderBean

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/bean/TradeOrderBean.java

(2)主程序

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/app/dws/DwsTradeOrderWindow.java

测试

开启zk,kafka,maxwell

依次启动 DwdTradeOrderPreProcess,DwdTradeOrderDetail,DwsTradeOrderWindow 程序

修改配置文件

实时数仓(三十一)DWS层-交易域下单各窗口汇总表 - 图2

启动脚本 (首次启动后,十秒钟之后再启动一次)

实时数仓(三十一)DWS层-交易域下单各窗口汇总表 - 图3

查看数据

实时数仓(三十一)DWS层-交易域下单各窗口汇总表 - 图4

有如上图数据,则程序成功!