主要任务

从Kafka 读取用户加购明细数据,统计每日各窗口加购独立用户数,写入ClickHouse。

思路分析

1**)从 Kafka **加购明细主题读取数据

2**)转换数据结构**

将流中数据由String 转换为JSONObject。

3**)设置水位线**

4**)按照用户 id **分组

5**)过滤独立用户加购记录**

运用Flink 状态编程,将用户末次加购日期维护到状态中。

  1. 如果末次登陆日期为null 或者不等于当天日期,则保留数据并更新状态,否则丢弃,不做操作。

6**)开窗、聚合**

统计窗口中数据条数即为加购独立用户数,补充窗口起始时间、关闭时间,将时间戳字段置为当前系统时间,发送到下游。

7)将数据写入 ClickHouse。


图解

实时数仓(二十九)DWS层-交易域加购各窗口汇总表 - 图1

ClickHouse 建表语句

启动ClickHouse

systemctl start clickhouse

systemctl start clickhouse-server

clickhouse-client -m

  1. drop table if exists dws_trade_cart_add_uu_window;
  2. create table if not exists dws_trade_cart_add_uu_window
  3. (
  4. stt DateTime,
  5. edt DateTime,
  6. cart_add_uu_ct UInt64,
  7. ts UInt64
  8. ) engine = ReplacingMergeTree(ts)
  9. partition by toYYYYMMDD(stt)
  10. order by (stt, edt);

代码

(1)实体类 CartAddUuBean

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/bean/CartAddUuBean.java

(2)主程序

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/app/dws/DwsTradeCartAddUuWindow.java

测试

开启zk,kafka,maxwell

依次启动 DwdTradeCartAdd 和 DwsTradeCartAddUuWindow 程序

修改配置文件

实时数仓(二十九)DWS层-交易域加购各窗口汇总表 - 图2

启动脚本 (首次启动后,十秒钟之后再启动一次)

实时数仓(二十九)DWS层-交易域加购各窗口汇总表 - 图3

查看数据

实时数仓(二十九)DWS层-交易域加购各窗口汇总表 - 图4

有如上图数据,则程序成功!