主要任务
从Kafka 读取交易域支付成功主题数据,统计支付成功独立用户数和首次支付成功用户数。
思路分析
我们在DWD 层提到,订单明细表数据生成过程中会形成回撤流。left join 生成的数据集中,相同唯一键的数据可能会有多条。上文已有讲解,不再赘述。回撤数据在Kafka 中以null 值的形式存在,只需要简单判断即可过滤。我们需要考虑的是如何对其余数据去重。
对回撤流数据生成过程进行分析,可以发现,字段内容完整数据的生成一定晚于不完整数据的生成,要确保统计结果的正确性,我们应保留字段内容最全的数据,基于以上论述,内容最全的数据生成时间最晚。要想通过时间筛选这部分数据,首先要获取数据生成时间。
1**)知识储备**
FlinkSQL 提供了几个可以获取当前时间戳的函数
Ø localtimestamp:返回本地时区的当前时间戳,返回类型为TIMESTAMP(3)。在流处理模式下会对每条记录计算一次时间。而在批处理模式下,仅在查询开始时计算一次时间,所有数据使用相同的时间。
Ø current_timestamp:返回本地时区的当前时间戳,返回类型为TIMESTAMP_LTZ(3)。在流处理模式下会对每条记录计算一次时间。而在批处理模式下,仅在查询开始时计算一次时间,所有数据使用相同的时间。
Ø now():与current_timestamp 相同。
Ø current_row_timestamp():返回本地时区的当前时间戳,返回类型为TIMESTAMP_LTZ(3)。无论在流处理模式还是批处理模式下,都会对每行数据计算一次时间。
函数测试。查询语句如下。
tableEnv.sqlQuery("select localtimestamp," +
"current_timestamp," +
"now()," +
"current_row_timestamp()")
.execute()
.print();
查询结果如下。
+----+-------------------------+-------------------------+-------------------------+-------------------------+
| op | localtimestamp | current_timestamp | EXPR$2 | EXPR$3 |
+----+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | 2022-04-13 20:42:28.529 | 2022-04-13 20:42:28.529 | 2022-04-13 20:42:28.529 | 2022-04-13 20:42:28.529Z |
+----+-------------------------+-------------------------+-------------------------+-------------------------+
1 row in set
动态表属于流处理模式,所以四种函数任选其一即可。此处选择current_row_timestamp()。
2**)时间比较工具类**
动态表中获取的数据生成时间精确到毫秒,前文提供的日期格式化工具类无法实现此类日期字符串向时间戳的转化,也就不能通过直接转化为时间戳的方式比较两条数据的生成时间。因此,单独封装工具类用于比较TIME_STAMP(3) 类型的时间。比较逻辑是将时间拆分成两部分:小数点之前和小数点之后的。小数点之前的日期格式为yyyy-MM-dd HH:mm:ss,这部分可以直接转化为时间戳比较,如果这部分时间相同,再比较小数点后面的部分,将小数点后面的部分转换为整型比较,从而实现TIME_STAMP(3) 类型时间的比较。
3**)去重思路**
获取了数据生成时间,接下来要考虑的问题就是如何获取生成时间最晚的数据。此处提供两种思路。
(1)按照唯一键分组,开窗,在窗口闭合前比较窗口中所有数据的时间,将生成时间最晚的数据发送到下游,其它数据舍弃。
(2)按照唯一键分组,对于每一个唯一键,维护状态和定时器,当状态中数据为 null 时注册定时器,把数据维护到状态中。此后每来一条数据都比较它与状态中数据的生成时间,状态中只保留生成最晚的数据。如果两条数据生成时间相同(系统时间精度不足),则保留后进入算子的数据。因为我们的 Flink 程序并行度和Kafka 分区数相同,可以保证数据有序,后来的数据就是最新的数据。
两种方案都可行,此处选择方案二。
本节的数据来源于Kafka dwd_trade_pay_detail_suc 主题,后者的数据由payment_info、dwd_trade_order_detail、base_dic 三张表通过内连接关联获得,这一过程不会产生重复数据,因此,该表的重复数据由订单明细表决定。而dwd_trade_order_detail 表的数据来源于dwd_trade_order_pre_process,后者数据生成过程中使用了left join,因此包含 null 数据和重复数据。订单明细表读取数据使用的Kafka Connector 会过滤掉 null 数据,程序内只做了过滤没有去重,因此该表不存在 null 数据,但对于相同唯一键order_detail_id 存在重复数据。综上,支付成功明细表存在唯一键order_detail_id 相同的数据,但不存在null 数据,因此仅须去重。
4**)实现步骤**
(1)从 Kafka 支付成功明细主题读取数据
(2)转换数据结构
String 转换为JSONObject。
(3)按照唯一键分组
(4)去重
与前文同理。
(5)设置水位线,按照user_id 分组
(6)统计独立支付人数和新增支付人数
运用Flink 状态编程,在状态中维护用户末次支付日期。
若末次支付日期为null,则将首次支付用户数和支付独立用户数均置为 1;否则首次支付用户数置为 0,判断末次支付日期是否为当日,如果不是当日则支付独立用户数置为 1,否则置为 0。最后将状态中的支付日期更新为当日。
(7)开窗、聚合
度量字段求和,补充窗口起始时间和结束时间字段,ts 字段置为当前系统时间戳。
(8)写出到 ClickHouse
图解
ClickHouse 建表语句
启动ClickHouse
systemctl start clickhouse
systemctl start clickhouse-server
clickhouse-client -m
drop table if exists dws_trade_payment_suc_window;
create table if not exists dws_trade_payment_suc_window
(
stt DateTime,
edt DateTime,
payment_suc_unique_user_count UInt64,
payment_new_user_count UInt64,
ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt);
代码
(1)实体类 TradePaymentWindowBean
(2)FlinkSQL 时间数据类型TimestampLtz3 比较工具类TimestampLtz3CompareUtil
(3)主程序
测试
开启zk,kafka,maxwell
依次启动 DwdTradeOrderPreProcess,DwdTradeOrderDetail, DwdTradePayDetailSuc 程序修改配置文件(当日)
查看数据
有如上图数据,则程序成功!