主要任务

经过分析,订单明细表和取消订单明细表的数据来源、表结构都相同,差别只在业务过程和过滤条件,为了减少重复计算,将两张表公共的关联过程提取出来,形成订单预处理表。

关联订单明细表订单表订单明细活动关联表订单明细优惠券关联表四张事实业务表和字典表(维度业务表)形成订单预处理表,写入Kafka 对应主题。

本节形成的预处理表中要保留订单表的type 和old 字段,用于过滤订单明细数据和取消订单明细数据。

思路分析

1**)知识储备**

(1)left join 实现过程

假设A 表作为主表与B 表做等值左外联。当 A 表数据进入算子,而B 表数据未至时会先生成一条B 表字段均为null 的关联数据ab1,其标记为+I。其后,B 表数据到来,会先将之前的数据撤回,即生成一条与ab1 内容相同,但标记为-D 的数据,再生成一条关联后的数据,标记为+I。这样生成的动态表对应的流称之为回撤流。

(2)Kafka SQL Connector

Kafka SQL Connector 分为Kafka SQL Connector 和 Upsert Kafka SQL Connector

① 功能

Upsert Kafka Connector支持以 upsert 方式从 Kafka topic 中读写数据

Kafka Connector支持从 Kafka topic 中读写数据

② 区别

a)建表语句的主键

i)Kafka Connector 要求表不能有主键,如果设置了主键,报错信息如下

  1. Caused by: org.apache.flink.table.api.ValidationException: The Kafka table 'default_catalog.default_database.normal_sink_topic' with 'json' format doesn't support defining PRIMARY KEY constraint on the table, because it can't guarantee the semantic of primary key.

ii)而 Upsert Kafka Connector 要求表必须有主键,如果没有设置主键,报错信息如下

  1. Caused by: org.apache.flink.table.api.ValidationException: 'upsert-kafka' tables require to define a PRIMARY KEY constraint. The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. The PRIMARY KEY also defines records in the 'upsert-kafka' table should update or delete on which keys.

iii)语法: primary key(id) not enforced

注意:not enforced 表示不对来往数据做约束校验,Flink 并不是数据的主人,因此只支持 not enforced 模式

如果没有not enforced,报错信息如下

  1. Exception in thread "main" org.apache.flink.table.api.ValidationException: Flink doesn't support ENFORCED mode for PRIMARY KEY constaint. ENFORCED/NOT ENFORCED controls if the constraint checks are performed on the incoming/outgoing data. Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode

b)对表中数据操作类型的要求

i)Kafka Connector 不能消费带有 Upsert/Delete 操作类型数据的表,如 left join 生成的动态表。如果对这类表进行消费,报错信息如下

  1. Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.normal_sink_topic' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_9]], fields=[l_id, tag_left, tag_right])

ii)Upsert Kafka Connector 将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,因此同一主键的更新/删除消息将落在同一分区,从而保证同一主键的消息有序。

③ left join 结合Upsert Kafka Connector 使用范例

说明:Kafka 并行度为4

a)表结构

  1. left
  2. id tag
  3. A left
  4. B left
  5. C left
  6. right
  7. id tag
  8. A right
  9. B right
  10. C right

b)查询语句

  1. select
  2. l.id l_id,
  3. l.tag l_tag,
  4. r.tag r_tag
  5. from left l
  6. left join
  7. right r
  8. on l.id = r.id

c)关联结果写入到 Upsert Kafka 表,消费 Kafka 对应主题数据结果展示

  1. {"l_id":"A","tag_left":"left","tag_right":null}
  2. null
  3. {"l_id":"A","tag_left":"left","tag_right":"right"}
  4. {"l_id":"C","tag_left":"left","tag_right":null}
  5. null
  6. {"l_id":"C","tag_left":"left","tag_right":"right"}
  7. {"l_id":"B","tag_left":"left","tag_right":null}
  8. null
  9. {"l_id":"B","tag_left":"left","tag_right":"right"}

④ 参数解读

本节需要用到Kafka 连接器的明细表数据来源于topic_db 主题,于Kafka 而言,该主题的数据的操作类型均为INSERT,所以读取数据使用 Kafka Connector 即可。而由于left join 的存在,流中存在修改数据,所以写出数据使用Upsert Kafka Connector。

Upsert Kafka Connector 参数

Ø connector:指定使用的连接器,对于 Upsert Kafka,使用 ‘upsert-kafka’

Ø topic:主题

Ø properties.bootstrap.servers:以逗号分隔的 Kafka broker 列表

Ø key.format:key 的序列化和反序列化格式

Ø value.format:value 的序列化和反序列化格式

2**)执行步骤**

预处理表与订单明细事务事实表的区别只在于前者不会对订单数据进行筛选,且在表中增加了type 和old 字段。二者的粒度、聚合逻辑都相同,因此按照订单明细表的思路对预处理表进行分析即可。

(1)设置 ttl;

订单明细表、订单表、订单明细优惠券管理表和订单明细活动关联表不存在业务上的滞后问题,只考虑可能的数据乱序即可,此处将ttl 设置为5s。

  1. 要注意:前文提到,本项目保证了同一分区、同一并行度的数据有序。此处的乱序与之并不冲突,以下单业务过程为例,用户完成下单操作时,订单表中会插入一条数据,订单明细表中会插入与之对应的多条数据,本项目业务数据是按照主键分区进入Kafka 的,虽然同分区数据有序,但是同一张业务表的数据可能进入多个分区,会乱序。这样一来,订单表数据与对应的订单明细数据可能被属于其它订单的数据“插队”,因而导致主表或从表数据迟到,可能 join 不上,为了应对这种情况,设置乱序程度,让状态中的数据等待一段时间。

(2)从Kafka topic_db 主题读取业务数据;

这一步要调用PROCTIME() 函数获取系统时间作为与字典表做Lookup Join 的处理时间字段。

(3)筛选订单明细表数据;

应尽可能保证事实表的粒度为最细粒度,在下单业务过程中,最细粒度的事件为一个订单的一个SKU 的下单操作,订单明细表的粒度与最细粒度相同,将其作为主表

(4)筛选订单表数据;

通过该表获取user_id 和province_id。保留 type 字段和old 字段用于筛选订单明细数据和取消订单明细数据。

(5)筛选订单明细活动关联表数据;

通过该表获取活动id 和活动规则id。

(6)筛选订单明细优惠券关联表数据;

通过该表获取优惠券id。

(7)建立MySQL-Lookup 字典表;

通过字典表获取订单来源类型名称。

(8)关联上述五张表获得订单宽表,写入Kafka 主题

事实表的粒度应为最细粒度,在下单和取消订单业务过程中,最细粒度为一个sku 的下单或取消订单操作,与订单明细表粒度相同,将其作为主表。

① 订单明细表和订单表的所有记录在另一张表中都有对应数据,内连接即可。

② 订单明细数据未必参加了活动也未必使用了优惠券,因此要保留订单明细独有数据,所以与订单明细活动关联表和订单明细优惠券关联表的关联使用 left join。

③ 与字典表的关联是为了获取 source_type 对应的source_type_name,订单明细数据在字典表中一定有对应,内连接即可。

图解

实时数仓(十六)DWD层-交易域订单预处理表 - 图1

代码编写

1)在KafkaUtil 中补充getUpsertKafkaDDL 方法

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/util/MyKafkaUtil.java


2)主程序

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/app/dwd/db/DwdTradeOrderPreProcess.java

测试

创建 dwd_trade_order_pre_process 主题

  1. bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --create --replication-factor 1 --partitions 1 --topic dwd_trade_order_pre_process

开启dwd_trade_order_pre_process消费者

  1. bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_trade_order_pre_process

运行DwdTradeOrderPreProcess

设置一下配置文件,将重置设置为1

实时数仓(十六)DWD层-交易域订单预处理表 - 图2

开始启动

实时数仓(十六)DWD层-交易域订单预处理表 - 图3

观察消费者数据

实时数仓(十六)DWD层-交易域订单预处理表 - 图4

有如上图数据,则测试成功