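Also called the unique visitor detail table. It extracts unique visitor (UV) records so that later UV requirements can build on them.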

Main task

Filter the unique visitor records out of the page log data.

Approach

**1) Filter out records whose last_page_id is not null**

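A unique visitor record always corresponds to the first page of a session, so its last_page_id must be null. Dropping records where last_page_id != null up front reduces the data volume and improves processing efficiency.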
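The filter in this step can be sketched as a simple predicate. This is a minimal illustration in plain Java, not the project's code: the class `SessionStartFilter` and the method `isSessionStart` are hypothetical names, and the `page` object is modelled as a `Map` instead of parsed JSON.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: keeps only page records that start a session.
class SessionStartFilter {

    // A record is a session start when its "page" object has no last_page_id.
    static boolean isSessionStart(Map<String, String> page) {
        return page.get("last_page_id") == null;
    }

    public static void main(String[] args) {
        Map<String, String> fromCart = new HashMap<>();
        fromCart.put("page_id", "trade");
        fromCart.put("last_page_id", "cart");

        Map<String, String> entry = new HashMap<>();
        entry.put("page_id", "trade");

        System.out.println(isSessionStart(fromCart)); // false: dropped
        System.out.println(isSessionStart(entry));    // true: kept
    }
}
```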

**2) Select the unique visitor records**

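Use Flink state programming: maintain one keyed state per mid that records the date of the last visit. If that date is null or is not today, the current visit is the first visit of the day for this mid, so keep the record and update the state to today's date. Otherwise it is not the first visit of the day, so drop the record.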
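The per-mid decision described above can be sketched in plain Java. This is a simulation under stated assumptions, not the job's implementation: a `HashMap` stands in for Flink keyed state, dates are passed in as `yyyy-MM-dd` strings, and the names `FirstVisitOfDay`, `lastVisitDate`, and `isFirstVisitToday` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simulates keyed state: one "last visit date" value per mid.
class FirstVisitOfDay {

    // Stand-in for Flink keyed state (assumption: mid -> yyyy-MM-dd string).
    private final Map<String, String> lastVisitDate = new HashMap<>();

    // Returns true and updates the state when this is the mid's first visit on curDate.
    boolean isFirstVisitToday(String mid, String curDate) {
        String last = lastVisitDate.get(mid);
        if (last == null || !last.equals(curDate)) {
            lastVisitDate.put(mid, curDate);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        FirstVisitOfDay uv = new FirstVisitOfDay();
        System.out.println(uv.isFirstVisitToday("mid_143740", "2022-06-14")); // true
        System.out.println(uv.isFirstVisitToday("mid_143740", "2022-06-14")); // false
        System.out.println(uv.isFirstVisitToday("mid_143741", "2022-06-14")); // true
        System.out.println(uv.isFirstVisitToday("mid_143741", "2025-08-14")); // true
    }
}
```

In a Flink job the map lookup would presumably become a read of the state scoped to the current key after keying the stream by mid; the branching itself stays the same.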

**3) Set the state time-to-live**

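If the state is kept, a visit by the same mid on the next day is still judged to be the first visit of that day; if the state is cleared, the result is the same. So the state can be cleared as soon as the clock moves into the next day. Set the state TTL to 1 day with update type OnCreateAndWrite, which resets the time-to-live whenever the state is created or updated. For example, with a first visit at 2022-02-21 08:00:00, if no visit on 2022-02-22 updates the state before then, the state is cleared after 2022-02-22 08:00:00.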
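To make the TTL behaviour concrete, the sketch below simulates a value whose expiry is reset on create and write but not on read, which is the OnCreateAndWrite behaviour described above. It is plain Java with hypothetical names (`TtlValue`, `write`, `read`) and timestamps passed in explicitly; Flink's own TTL is configured on the state descriptor and is not reproduced here.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Simulates a single state value with a TTL that is reset only on create/write.
class TtlValue {
    private final Duration ttl;
    private String value;
    private LocalDateTime expiresAt;

    TtlValue(Duration ttl) {
        this.ttl = ttl;
    }

    // Creating or updating the value restarts the TTL clock.
    void write(String newValue, LocalDateTime now) {
        value = newValue;
        expiresAt = now.plus(ttl);
    }

    // Reading does not extend the TTL; an expired value reads as null.
    String read(LocalDateTime now) {
        if (value == null || !now.isBefore(expiresAt)) {
            return null;
        }
        return value;
    }

    public static void main(String[] args) {
        TtlValue state = new TtlValue(Duration.ofDays(1));
        state.write("2022-02-21", LocalDateTime.parse("2022-02-21T08:00:00"));

        // Still present later the same day and just before 24 hours have elapsed.
        System.out.println(state.read(LocalDateTime.parse("2022-02-21T20:00:00"))); // 2022-02-21
        System.out.println(state.read(LocalDateTime.parse("2022-02-22T07:59:59"))); // 2022-02-21

        // No write happened in between, so the value is gone after 24 hours.
        System.out.println(state.read(LocalDateTime.parse("2022-02-22T08:00:01"))); // null
    }
}
```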

Diagram

Figure 1: Real-Time Data Warehouse (13), DWD layer, traffic-domain unique visitor transaction fact table

Code and testing

Code:

https://gitee.com/luan_hao/gmall-flink/blob/master/gmall-realtime/src/main/java/com/apache/gmall/app/dwd/log/DwdTrafficUniqueVisitorDetail.java

Create the Kafka topic dwd_traffic_unique_visitor_detail

    bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --create --replication-factor 1 --partitions 1 --topic dwd_traffic_unique_visitor_detail

Start a producer (dwd_traffic_page_log) and a consumer (dwd_traffic_unique_visitor_detail)

    bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic dwd_traffic_page_log
    bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_unique_visitor_detail

Test data (variations of the same record):

    # Has last_page_id (not output, filtered out)
    {"common":{"ar":"310000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone 8","mid":"mid_143740","os":"iOS 13.3.1","uid":"56","vc":"v2.1.134"},"page":{"during_time":11275,"item":"1,12,29","item_type":"sku_ids","last_page_id":"cart","page_id":"trade"},"ts":1655175591000}
    # No last_page_id (output)
    {"common":{"ar":"310000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone 8","mid":"mid_143740","os":"iOS 13.3.1","uid":"56","vc":"v2.1.134"},"page":{"during_time":11275,"item":"1,12,29","item_type":"sku_ids","page_id":"trade"},"ts":1655175591000}
    # Same as the previous record, unchanged (not output)
    {"common":{"ar":"310000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone 8","mid":"mid_143740","os":"iOS 13.3.1","uid":"56","vc":"v2.1.134"},"page":{"during_time":11275,"item":"1,12,29","item_type":"sku_ids","page_id":"trade"},"ts":1655175591000}
    # mid changed (output)
    {"common":{"ar":"310000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone 8","mid":"mid_143741","os":"iOS 13.3.1","uid":"56","vc":"v2.1.134"},"page":{"during_time":11275,"item":"1,12,29","item_type":"sku_ids","page_id":"trade"},"ts":1655175591000}
    # ts changed (output)
    {"common":{"ar":"310000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone 8","mid":"mid_143741","os":"iOS 13.3.1","uid":"56","vc":"v2.1.134"},"page":{"during_time":11275,"item":"1,12,29","item_type":"sku_ids","page_id":"trade"},"ts":1755175591000}
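The last test record is emitted because its ts falls on a different calendar day from the earlier records for the same mid. The conversion can be checked with java.time; the class name `TsToDate` is hypothetical, and the Asia/Shanghai zone is an assumption about where the job runs.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;

class TsToDate {

    // Converts an epoch-millisecond timestamp to a calendar date in the given zone.
    static LocalDate toDate(long tsMillis, ZoneId zone) {
        return Instant.ofEpochMilli(tsMillis).atZone(zone).toLocalDate();
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("Asia/Shanghai"); // assumed zone, not stated in the source
        System.out.println(toDate(1655175591000L, zone)); // 2022-06-14
        System.out.println(toDate(1755175591000L, zone)); // 2025-08-14
    }
}
```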

Production run

Start ZooKeeper, f1, and Kafka, then start BaseLogApp and DwdTrafficUniqueVisitorDetail.

Start a producer (dwd_traffic_page_log) and a consumer (dwd_traffic_unique_visitor_detail).

Finally, run the log script lg.sh and check the consumer; it is enough that data flows through end to end.