Chapter 1. Introduction to the DWM Layer

The DWM layer (Data Warehouse Middle), usually called the data middle layer, builds on the DWD layer by lightly "aggregating" the data into wide tables.

The DWM layer mainly serves the DWS layer. Some requirements involve a fair amount of computation on the way from DWD to DWS, and those intermediate results are likely to be reused by several DWS subject areas, so part of the DWD data is materialized into a DWM layer.

Main business topics covered:

  1. UV (unique visitor) calculation
  2. Bounce detail calculation
  3. Order wide table
  4. Payment wide table

Chapter 2. DWM Layer: UV Calculation

1. Requirements Analysis

UV stands for Unique Visitor. In real-time computation it can also be called DAU (Daily Active User); in this context UV usually means the number of distinct visitors on the current day.

  1. Question: how do we identify the day's visitors from the user behavior log?
  2. Identify the first page each visitor opens, which marks the visitor entering the application.
  3. Since a visitor can enter the application several times a day, deduplicate within the scope of one day.
  4. Decide how to start counting a user again for UV on the next day.

Implementation approach

  1. Use event-time semantics (to handle out-of-order data)
  2. Key the stream by mid
  3. Add a window
  4. Filter out each device's first visit of the day (deduplication)
    • Use Flink state, and the state only needs to be kept for one day
    • When to clear the state: when the current date differs from the date saved in the state
  5. Write the day's first-visit records to the DWM layer (Kafka)

2. Implementation

package com.atguigu.gmall.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.app.BaseAppV1;
import com.atguigu.gmall.common.Constant;
import com.atguigu.gmall.util.FlinkSinkUtil;
import com.atguigu.gmall.util.IteratorToListUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class DwmUvApp extends BaseAppV1 {
    public static void main(String[] args) {
        new DwmUvApp().init(3001, 1, "DwmUvApp", "DwmUvApp", Constant.TOPIC_DWD_PAGE);
    }

    @Override
    public void run(StreamExecutionEnvironment env, DataStreamSource<String> stream) {
        stream
            .map(new MapFunction<String, JSONObject>() {
                @Override
                public JSONObject map(String value) throws Exception {
                    return JSON.parseObject(value);
                }
            })
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((obj, ts) -> obj.getLong("ts"))
            )
            .keyBy(obj -> obj.getJSONObject("common").getString("mid"))
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .process(new ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
                private ValueState<Long> firstVisitState;

                @Override
                public void open(Configuration parameters) throws Exception {
                    firstVisitState = getRuntimeContext()
                        .getState(new ValueStateDescriptor<Long>("firstVisitState", Long.class));
                }

                @Override
                public void process(String key, Context context, Iterable<JSONObject> elements,
                                    Collector<JSONObject> out) throws Exception {
                    // detect the change of day, then clear the state
                    // date of the current window
                    String today = IteratorToListUtil.toDateString(context.window().getStart());
                    // date of the first visit saved in state (null if none yet)
                    String yesterday = firstVisitState.value() == null
                        ? null
                        : IteratorToListUtil.toDateString(firstVisitState.value());
                    if (!today.equals(yesterday)) {
                        firstVisitState.clear();
                    }
                    if (firstVisitState.value() == null) {
                        // first window of the day for this mid: emit the earliest record
                        List<JSONObject> list = IteratorToListUtil.toList(elements);
                        JSONObject min = Collections.min(list, Comparator.comparing(ele -> ele.getLong("ts")));
                        out.collect(min);
                        firstVisitState.update(min.getLong("ts"));
                    }
                }
            })
            .map(new MapFunction<JSONObject, String>() {
                @Override
                public String map(JSONObject value) throws Exception {
                    return value.toJSONString();
                }
            })
            .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWM_UV));
    }
}

3. Testing

Start hadoop, zookeeper, kafka, gmall-logger, flink-yarn, dwdLogAPP, and dwmUvApp.

Then start a consumer in a terminal to consume dwm_uv, for example with the command below.
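A minimal console-consumer command, assuming Kafka is installed under /opt/module/kafka and a broker is listening on hadoop162:9092 (adjust both to your environment):

/opt/module/kafka » bin/kafka-console-consumer.sh --bootstrap-server hadoop162:9092 --topic dwm_uv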

(Figure 1)

As the output shows, the consumed records contain no duplicate mid values.

Chapter 3. DWM Layer: Bounce Detail

1. Requirements Analysis

  1. What is a bounce?
  A bounce is when a user reaches an entry page of the site (for example the home page) and then leaves without visiting any other page.
  Bounce rate formula: bounce rate = number of visits that leave after a single page / total number of entry visits.
  The bounce rate per keyword shows how well users accept the site's content, i.e. whether the site is attractive to users and whether its content is helpful enough to retain them, so the bounce rate is an important measure of content quality.
  2. How to compute the bounce rate
  First identify the bounce behavior, i.e. find the last (and only) page such a visitor accessed. Two characteristics identify it:
    • The page is the first page of the visitor's recent visit. This can be judged from whether the page has a previous page (last_page_id); if that field is empty, it is the first page of the visit.
    • After this first visit, the user does not access any other page for a fairly long time. This can be implemented with Flink's built-in CEP: a bounce event is essentially the combination of a condition event and a timeout event.

2. Implementation

package com.atguigu.gmall.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.app.BaseAppV1;
import com.atguigu.gmall.common.Constant;
import com.atguigu.gmall.util.FlinkSinkUtil;
import com.atguigu.gmall.util.IteratorToListUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class DwmUserJumpDetailApp extends BaseAppV1 {
    public static void main(String[] args) {
        new DwmUserJumpDetailApp().init(3002, 1, "DwmUserJumpDetailApp", "DwmUserJumpDetailApp", Constant.TOPIC_DWD_PAGE);
    }

    @Override
    public void run(StreamExecutionEnvironment env, DataStreamSource<String> stream) {
        // 1. Build a keyed stream
        KeyedStream<JSONObject, String> keyedStream = stream
            .map(new MapFunction<String, JSONObject>() {
                @Override
                public JSONObject map(String value) throws Exception {
                    return JSON.parseObject(value);
                }
            })
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((obj, ts) -> obj.getLong("ts"))
            )
            .keyBy(obj -> obj.getJSONObject("common").getString("mid"));

        // 2. Define the pattern
        Pattern<JSONObject, JSONObject> pattern = Pattern
            .<JSONObject>begin("entry1")
            .where(new SimpleCondition<JSONObject>() {
                @Override
                public boolean filter(JSONObject value) throws Exception {
                    // entry page: last_page_id is null or empty
                    String lastPageId = value.getJSONObject("page").getString("last_page_id");
                    return lastPageId == null || lastPageId.length() == 0;
                }
            })
            .next("entry2")
            .where(new SimpleCondition<JSONObject>() {
                @Override
                public boolean filter(JSONObject value) throws Exception {
                    // another entry page: the user started a new visit, so the previous entry was a bounce
                    String lastPageId = value.getJSONObject("page").getString("last_page_id");
                    return lastPageId == null || lastPageId.length() == 0;
                }
            })
            .within(Time.seconds(5));

        // 3. Apply the pattern to the stream
        PatternStream<JSONObject> ps = CEP.pattern(keyedStream, pattern);

        // 4. Extract the matched data
        SingleOutputStreamOperator<JSONObject> normal = ps.select(
            new OutputTag<JSONObject>("jump") {
            },
            new PatternTimeoutFunction<JSONObject, JSONObject>() {
                @Override
                public JSONObject timeout(Map<String, List<JSONObject>> map, long l) throws Exception {
                    // timeout: an entry page with no further page within 5 s -> bounce
                    return map.get("entry1").get(0);
                }
            },
            // fully matched: an entry page immediately followed by another entry page,
            // so the first entry is also a bounce
            new PatternSelectFunction<JSONObject, JSONObject>() {
                @Override
                public JSONObject select(Map<String, List<JSONObject>> map) throws Exception {
                    return map.get("entry1").get(0);
                }
            }
        );

        normal
            .union(normal.getSideOutput(new OutputTag<JSONObject>("jump") {}))
            .map(new MapFunction<JSONObject, String>() {
                @Override
                public String map(JSONObject value) throws Exception {
                    return value.toJSONString();
                }
            })
            //.print();
            .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWM_UJ_DETAIL));
    }
}

3. Testing

Start hadoop, zookeeper, kafka, gmall-logger, flink-yarn, dwdLogAPP, dwmUvApp, and DwmUserJumpDetailApp.

Then start a consumer in a terminal to consume dwm_uj_detail.

(Figure 2)

The consumed records all have an empty last_page_id, i.e. they are bounce records.

Chapter 4. DWM Layer: Order Wide Table

Orders are a key object of statistical analysis, and many dimensional statistics revolve around them, such as user, region, product, category, and brand.

To make later aggregations easier and reduce joins between large tables, the real-time pipeline combines the order-related data into a single order wide table.

(Figure 3)

  1. As shown above, earlier steps already split the data into fact data and dimension data: the fact data (order table and order detail table) flows through Kafka (the DWD layer), while the dimension data is stored long-term in HBase. In the DWM layer we need to join the fact data with the dimension data to form the wide table, which requires two kinds of joins: fact-to-fact joins and fact-to-dimension joins.
  2. A fact-to-fact join is a stream-to-stream join.
  3. A fact-to-dimension join is a lookup against an external data source from within the stream.

1. Joining Orders with Order Details

I. Wrapping a BaseApp that can consume multiple topics at once (a short usage sketch follows the class)

package com.atguigu.gmall.app;

import com.atguigu.gmall.util.FlinkSourceUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;

public abstract class BaseAppV2 {
    public abstract void run(StreamExecutionEnvironment env, HashMap<String, DataStreamSource<String>> stream);

    public void init(int port, int p, String ck, String groupId, String topic, String... otherTopics) {
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", port); // port of the local web UI
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(p);
        // checkpoint configuration
        env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop162:8020/gmall/" + ck);
        env.getCheckpointConfig().setCheckpointTimeout(10 * 1000);
        env.getCheckpointConfig()
            .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        ArrayList<String> topics = new ArrayList<>();
        topics.add(topic);
        topics.addAll(Arrays.asList(otherTopics));
        HashMap<String, DataStreamSource<String>> topicAndStreamMap = new HashMap<>();
        for (String t : topics) {
            DataStreamSource<String> stream = env.addSource(FlinkSourceUtil.getKafkaSource(groupId, t));
            topicAndStreamMap.put(t, stream);
        }
        // each app implements its own business logic
        run(env, topicAndStreamMap);
        try {
            env.execute(ck);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
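A minimal sketch of how a subclass uses this base class; the class name and topic names below are illustrative only and not part of the project (the real subclasses appear later in this chapter):

import com.atguigu.gmall.app.BaseAppV2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;

// Hypothetical demo app: consumes two topics through BaseAppV2
public class DemoTwoTopicApp extends BaseAppV2 {
    public static void main(String[] args) {
        // port, parallelism, checkpoint name, consumer group, first topic, additional topics...
        new DemoTwoTopicApp().init(3999, 1, "DemoTwoTopicApp", "DemoTwoTopicApp", "topic_a", "topic_b");
    }

    @Override
    public void run(StreamExecutionEnvironment env, HashMap<String, DataStreamSource<String>> streams) {
        // run() receives one stream per topic, keyed by topic name
        streams.get("topic_a").print("topic_a");
        streams.get("topic_b").print("topic_b");
    }
}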

II. Joining the real-time (fact) tables

To make the data easier to handle, a few POJO classes are wrapped first.

  1. OrderInfo.java

package com.atguigu.gmall.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderInfo {
    private Long id; // order id
    private Long province_id;
    private String order_status;
    private Long user_id;
    private BigDecimal total_amount; // amount actually paid
    private BigDecimal activity_reduce_amount;
    private BigDecimal coupon_reduce_amount;
    private BigDecimal original_total_amount; // amount before discounts and freight
    private BigDecimal feight_fee;
    private String expire_time;
    private String create_time; // yyyy-MM-dd HH:mm:ss
    private String operate_time;
    private String create_date; // derived from create_time
    private String create_hour;
    private Long create_ts;

    // hand-written setter so that create_ts and the derived fields get populated
    public void setCreate_time(String create_time) throws ParseException {
        this.create_time = create_time;
        this.create_date = this.create_time.substring(0, 10); // date part
        this.create_hour = this.create_time.substring(11, 13); // hour part
        final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        this.create_ts = sdf.parse(create_time).getTime();
    }
}
  2. OrderDetail.java

package com.atguigu.gmall.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderDetail {
    private Long id;
    private Long order_id;
    private Long sku_id;
    private BigDecimal order_price;
    private Long sku_num;
    private String sku_name;
    private String create_time;
    private BigDecimal split_total_amount;
    private BigDecimal split_activity_amount;
    private BigDecimal split_coupon_amount;
    private Long create_ts;

    // hand-written setter so that create_ts gets populated
    public void setCreate_time(String create_time) throws ParseException {
        this.create_time = create_time;
        final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        this.create_ts = sdf.parse(create_time).getTime();
    }
}
  3. OrderWide.java

package com.atguigu.gmall.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderWide {
    private Long detail_id;
    private Long order_id;
    private Long sku_id;
    private BigDecimal order_price;
    private Long sku_num;
    private String sku_name;
    private Long province_id;
    private String order_status;
    private Long user_id;
    private BigDecimal total_amount;
    private BigDecimal activity_reduce_amount;
    private BigDecimal coupon_reduce_amount;
    private BigDecimal original_total_amount;
    private BigDecimal feight_fee;
    private BigDecimal split_feight_fee;
    private BigDecimal split_activity_amount;
    private BigDecimal split_coupon_amount;
    private BigDecimal split_total_amount;
    private String expire_time;
    private String create_time;
    private String operate_time;
    private String create_date; // derived from other fields
    private String create_hour;
    private String province_name; // obtained by looking up the dimension tables
    private String province_area_code;
    private String province_iso_code;
    private String province_3166_2_code;
    private Integer user_age;
    private String user_gender;
    private Long spu_id; // dimension attributes to be joined in
    private Long tm_id;
    private Long category3_id;
    private String spu_name;
    private String tm_name;
    private String category3_name;

    public OrderWide(OrderInfo orderInfo, OrderDetail orderDetail) {
        mergeOrderInfo(orderInfo);
        mergeOrderDetail(orderDetail);
    }

    public void mergeOrderInfo(OrderInfo orderInfo) {
        if (orderInfo != null) {
            this.order_id = orderInfo.getId();
            this.order_status = orderInfo.getOrder_status();
            this.create_time = orderInfo.getCreate_time();
            this.create_date = orderInfo.getCreate_date();
            this.create_hour = orderInfo.getCreate_hour();
            this.activity_reduce_amount = orderInfo.getActivity_reduce_amount();
            this.coupon_reduce_amount = orderInfo.getCoupon_reduce_amount();
            this.original_total_amount = orderInfo.getOriginal_total_amount();
            this.feight_fee = orderInfo.getFeight_fee();
            this.total_amount = orderInfo.getTotal_amount();
            this.province_id = orderInfo.getProvince_id();
            this.user_id = orderInfo.getUser_id();
        }
    }

    public void mergeOrderDetail(OrderDetail orderDetail) {
        if (orderDetail != null) {
            this.detail_id = orderDetail.getId();
            this.sku_id = orderDetail.getSku_id();
            this.sku_name = orderDetail.getSku_name();
            this.order_price = orderDetail.getOrder_price();
            this.sku_num = orderDetail.getSku_num();
            this.split_activity_amount = orderDetail.getSplit_activity_amount();
            this.split_coupon_amount = orderDetail.getSplit_coupon_amount();
            this.split_total_amount = orderDetail.getSplit_total_amount();
        }
    }

    public void calcUserAge(String birthday) throws ParseException {
        // approximate age in whole years
        long now = System.currentTimeMillis();
        long birth = new SimpleDateFormat("yyyy-MM-dd").parse(birthday).getTime();
        this.user_age = (int) ((now - birth) / 1000 / 60 / 60 / 24 / 365);
    }
}
  4. The fact-stream join

/**
 * Join of the fact streams (OrderDetail and OrderInfo)
 * @param streams
 */
private SingleOutputStreamOperator<OrderWide> joinFacts(HashMap<String, DataStreamSource<String>> streams) {
    KeyedStream<OrderInfo, Long> orderInfoStream = streams.get(Constant.TOPIC_DWD_ORDER_INFO)
        .map(new MapFunction<String, OrderInfo>() {
            @Override
            public OrderInfo map(String value) throws Exception {
                return JSON.parseObject(value, OrderInfo.class);
            }
        })
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((info, ts) -> info.getCreate_ts())
        )
        .keyBy(orderInfo -> orderInfo.getId());

    KeyedStream<OrderDetail, Long> orderDetailLongKeyedStream = streams.get(Constant.TOPIC_DWD_ORDER_DETAIL)
        .map(new MapFunction<String, OrderDetail>() {
            @Override
            public OrderDetail map(String value) throws Exception {
                return JSON.parseObject(value, OrderDetail.class);
            }
        })
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((info, ts) -> info.getCreate_ts())
        )
        .keyBy(orderDetail -> orderDetail.getOrder_id());

    return orderInfoStream
        .intervalJoin(orderDetailLongKeyedStream)
        .between(Time.seconds(-10), Time.seconds(10))
        .process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
            @Override
            public void processElement(OrderInfo left, OrderDetail right, Context ctx, Collector<OrderWide> out) throws Exception {
                out.collect(new OrderWide(left, right));
            }
        });
}

2. Joining the Dimension Tables

Joining dimensions really just means querying the tables stored in HBase from within the stream.

However, even primary-key lookups against HBase are slower than stream-to-stream joins, and queries to external data sources are often the performance bottleneck of a streaming job, so this baseline can be optimized further.

I. Bootstrapping the dimension data into HBase

HBase, Maxwell, and DwdDbApp need to be running.

Six dimension tables are used in total: user_info, base_province, sku_info, spu_info, base_category3, base_trademark

bin/maxwell-bootstrap --user root --password aaaaaa --host hadoop162 --database gmall2021 --table user_info --client_id maxwell_1
bin/maxwell-bootstrap --user root --password aaaaaa --host hadoop162 --database gmall2021 --table base_province --client_id maxwell_1
bin/maxwell-bootstrap --user root --password aaaaaa --host hadoop162 --database gmall2021 --table sku_info --client_id maxwell_1
bin/maxwell-bootstrap --user root --password aaaaaa --host hadoop162 --database gmall2021 --table spu_info --client_id maxwell_1
bin/maxwell-bootstrap --user root --password aaaaaa --host hadoop162 --database gmall2021 --table base_category3 --client_id maxwell_1
bin/maxwell-bootstrap --user root --password aaaaaa --host hadoop162 --database gmall2021 --table base_trademark --client_id maxwell_1

II. Updating the JdbcUtil utility class

It implements querying data over JDBC.

package com.atguigu.gmall.util;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.common.Constant;
import org.apache.commons.beanutils.BeanUtils;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;

public class JdbcUtil {
    public static Connection getPhoenixConnection(String phoenixUrl) throws SQLException, ClassNotFoundException {
        String phoenixDriver = Constant.PHOENIX_DRIVER;
        return getJdbcConnection(phoenixDriver, phoenixUrl);
    }

    private static Connection getJdbcConnection(String driver, String url) throws ClassNotFoundException, SQLException {
        Class.forName(driver);
        return DriverManager.getConnection(url);
    }

    public static void main(String[] args) throws SQLException, ClassNotFoundException {
        List<JSONObject> list = queryList(getPhoenixConnection(Constant.PHOENIX_URL),
            "select * from dim_user_info where id=?",
            new String[]{"1"},
            JSONObject.class
        );
        for (JSONObject jsonObject : list) {
            System.out.println(jsonObject);
        }
    }

    public static <T> List<T> queryList(Connection conn,
                                        String sql,
                                        Object[] args,
                                        Class<T> tClass) {
        ArrayList<T> result = new ArrayList<>();
        // query the database
        try {
            PreparedStatement ps = conn.prepareStatement(sql);
            // bind the placeholders in the SQL
            for (int i = 0; args != null && i < args.length; i++) {
                ps.setObject(i + 1, args[i]);
            }
            ResultSet resultSet = ps.executeQuery();
            ResultSetMetaData metaData = resultSet.getMetaData();
            while (resultSet.next()) {
                // create a T object and copy every column into it
                T t = tClass.newInstance();
                // JDBC column indexes are 1-based and inclusive of the last column
                for (int i = 1; i <= metaData.getColumnCount(); i++) {
                    String columnName = metaData.getColumnName(i);
                    Object value = resultSet.getObject(columnName);
                    BeanUtils.setProperty(t, columnName, value);
                }
                result.add(t);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }
}

III. Wrapping a DimUtil class for convenient lookups from Phoenix

package com.atguigu.gmall.util;

import com.alibaba.fastjson.JSONObject;

import java.sql.Connection;
import java.util.List;

public class DimUtil {
    public static JSONObject getDimFromPhoenix(Connection phoenixConn,
                                               String table,
                                               Long id) {
        String sql = "select * from " + table + " where id=?";
        Object[] args = {id.toString()};
        List<JSONObject> result = JdbcUtil.queryList(phoenixConn, sql, args, JSONObject.class);
        return result.size() == 1 ? result.get(0) : null;
    }
}

IV. Code for joining the dimension tables

package com.atguigu.gmall.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.app.BaseAppV2;
import com.atguigu.gmall.common.Constant;
import com.atguigu.gmall.pojo.OrderDetail;
import com.atguigu.gmall.pojo.OrderInfo;
import com.atguigu.gmall.pojo.OrderWide;
import com.atguigu.gmall.util.DimUtil;
import com.atguigu.gmall.util.JdbcUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.time.Duration;
import java.util.HashMap;

public class DwmOrderWriteApp extends BaseAppV2 {
    public static void main(String[] args) {
        new DwmOrderWriteApp().init(3003, 1, "DwmOrderWriteApp", "DwmOrderWriteApp", Constant.TOPIC_DWD_ORDER_INFO, Constant.TOPIC_DWD_ORDER_DETAIL);
    }

    @Override
    public void run(StreamExecutionEnvironment env, HashMap<String, DataStreamSource<String>> streams) {
        //streams.get(Constant.TOPIC_DWD_ORDER_INFO).print(Constant.TOPIC_DWD_ORDER_INFO);
        //streams.get(Constant.TOPIC_DWD_ORDER_DETAIL).print(Constant.TOPIC_DWD_ORDER_DETAIL);
        // 1. Join the fact streams: interval join
        SingleOutputStreamOperator<OrderWide> orderWideStreamWithoutDims = joinFacts(streams);
        // 2. Join the dimension data
        joinDims(orderWideStreamWithoutDims);
        // 3. Write the result to Kafka to feed the DWS layer (added later)
    }

    private void joinDims(SingleOutputStreamOperator<OrderWide> orderWideStreamWithoutDims) {
        // join the 6 dimension tables:
        // look up the matching row in each dimension table by its id
        orderWideStreamWithoutDims
            .map(new RichMapFunction<OrderWide, OrderWide>() {
                private Connection phoenixConnection;

                @Override
                public void open(Configuration parameters) throws Exception {
                    phoenixConnection = JdbcUtil.getPhoenixConnection(Constant.PHOENIX_URL);
                }

                @Override
                public OrderWide map(OrderWide orderWide) throws Exception {
                    // 1. user
                    JSONObject userInfo = DimUtil.getDimFromPhoenix(
                        phoenixConnection,
                        "dim_user_info",
                        orderWide.getUser_id());
                    orderWide.setUser_gender(userInfo.getString("GENDER"));
                    orderWide.calcUserAge(userInfo.getString("BIRTHDAY"));
                    // 2. province
                    JSONObject provinceInfo = DimUtil.getDimFromPhoenix(phoenixConnection,
                        "dim_base_province",
                        orderWide.getProvince_id());
                    orderWide.setProvince_name(provinceInfo.getString("NAME"));
                    orderWide.setProvince_iso_code(provinceInfo.getString("ISO_CODE"));
                    orderWide.setProvince_area_code(provinceInfo.getString("AREA_CODE"));
                    orderWide.setProvince_3166_2_code(provinceInfo.getString("ISO_3166_2"));
                    // 3. sku
                    JSONObject skuInfo = DimUtil.getDimFromPhoenix(phoenixConnection,
                        "dim_sku_info",
                        orderWide.getSku_id());
                    orderWide.setSku_name(skuInfo.getString("SKU_NAME"));
                    orderWide.setSpu_id(skuInfo.getLong("SPU_ID"));
                    orderWide.setTm_id(skuInfo.getLong("TM_ID"));
                    orderWide.setCategory3_id(skuInfo.getLong("CATEGORY3_ID"));
                    // 4. spu
                    JSONObject spuInfo = DimUtil.getDimFromPhoenix(phoenixConnection,
                        "dim_spu_info",
                        orderWide.getSpu_id());
                    orderWide.setSpu_name(spuInfo.getString("SPU_NAME"));
                    // 5. trademark
                    JSONObject tmInfo = DimUtil.getDimFromPhoenix(phoenixConnection,
                        "dim_base_trademark",
                        orderWide.getTm_id());
                    orderWide.setTm_name(tmInfo.getString("TM_NAME"));
                    // 6. category3
                    JSONObject c3Info = DimUtil.getDimFromPhoenix(phoenixConnection,
                        "dim_base_category3",
                        orderWide.getCategory3_id());
                    orderWide.setCategory3_name(c3Info.getString("NAME"));
                    return orderWide;
                }
            })
            .print();
    }

    /**
     * Join of the fact streams (OrderDetail and OrderInfo)
     * @param streams
     */
    private SingleOutputStreamOperator<OrderWide> joinFacts(HashMap<String, DataStreamSource<String>> streams) {
        KeyedStream<OrderInfo, Long> orderInfoStream = streams.get(Constant.TOPIC_DWD_ORDER_INFO)
            .map(new MapFunction<String, OrderInfo>() {
                @Override
                public OrderInfo map(String value) throws Exception {
                    return JSON.parseObject(value, OrderInfo.class);
                }
            })
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((info, ts) -> info.getCreate_ts())
            )
            .keyBy(orderInfo -> orderInfo.getId());

        KeyedStream<OrderDetail, Long> orderDetailLongKeyedStream = streams.get(Constant.TOPIC_DWD_ORDER_DETAIL)
            .map(new MapFunction<String, OrderDetail>() {
                @Override
                public OrderDetail map(String value) throws Exception {
                    return JSON.parseObject(value, OrderDetail.class);
                }
            })
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((info, ts) -> info.getCreate_ts())
            )
            .keyBy(orderDetail -> orderDetail.getOrder_id());

        return orderInfoStream
            .intervalJoin(orderDetailLongKeyedStream)
            .between(Time.seconds(-10), Time.seconds(10))
            .process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
                @Override
                public void processElement(OrderInfo left, OrderDetail right, Context ctx, Collector<OrderWide> out) throws Exception {
                    out.collect(new OrderWide(left, right));
                }
            });
    }
}

V. Start the app and test; the dimension fields are now filled in successfully.

(Figure 4)

3. Dimension Optimization 1: Adding a Cache

The previous section queried HBase directly. Queries against external data sources are often the performance bottleneck of a streaming job, so we optimize that implementation by adding a Redis cache.

I. Why use a Redis cache

  1. Goal: read a dimension record from the database only the first time; afterwards read it from a cache (memory).
  2. Option 1: cache the dimension data in Flink state.
  Advantages: local memory, fast, no network round trip, rich data structures.
  Disadvantages:
    • it puts pressure on Flink's memory;
    • when a dimension changes, this cache cannot see the change, because the cache lives in the DWM application while the dimension data is written to HBase by DwdDbApp, which is the only app that knows about the change.
  3. Option 2: cache the dimension data in a dedicated external cache: Redis.
  Advantages: a purpose-built cache with large capacity and good speed; when a dimension changes, DwdDbApp can update the Redis cache directly.
  Disadvantage: it requires a network round trip.
  4. All things considered, Redis is the right choice.

II. Choosing a Redis data structure

| Data structure | key | value | Drawbacks | Advantages |
| --- | --- | --- | --- | --- |
| String | table name + id (dim_user_info:1) | JSON string | many keys, harder to manage | 1. easy to read and write 2. each key can have its own expiration time |
| list | table name (dim_user_info) | one JSON string per record | hard to read a single record | one key per table, easy to write |
| set | table name (dim_user_info) | one JSON string per record | hard to read a single record | one key per table, easy to write |
| hash | table name | field + value | no way to set an expiration per record | one key per table with one field per record, easy to read and write |

All things considered, the String structure is the most suitable choice; a small sketch of the layout follows.
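A small sketch of what the String layout looks like in practice. The key format and the 3-day TTL mirror the DimUtil code shown later, the host/port come from the RedisUtil class below, and the sample JSON value is purely illustrative:

import redis.clients.jedis.Jedis;

public class RedisStringLayoutDemo {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("hadoop162", 6379);
        // one String key per dimension row: <table>:<id>
        String key = "dim_user_info:1";
        // each key carries its own TTL (3 days here)
        jedis.setex(key, 3 * 24 * 60 * 60, "{\"ID\":\"1\",\"NAME\":\"...\"}");
        String cached = jedis.get(key); // null if the key expired or was never written
        System.out.println(cached);
        jedis.close();
    }
}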

III. Wrapping a Redis client utility class

  1. Add the Jedis dependency

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.2.0</version>
</dependency>
  2. Wrap the Redis utility class

package com.atguigu.gmall.util;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtil {
    private static JedisPool pool;

    static {
        JedisPoolConfig conf = new JedisPoolConfig();
        conf.setMaxTotal(300);
        conf.setMaxIdle(10);
        conf.setMaxWaitMillis(10 * 1000);
        conf.setMinIdle(5);
        conf.setTestOnBorrow(true);
        conf.setTestOnCreate(true);
        conf.setTestOnReturn(true);
        pool = new JedisPool(conf, "hadoop162", 6379);
    }

    public static Jedis getRedisClient() {
        return pool.getResource();
    }
}

IV. Reading dimension data through the cache

  1. DimUtil.java

package com.atguigu.gmall.util;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import redis.clients.jedis.Jedis;

import java.sql.Connection;
import java.util.List;

public class DimUtil {
    public static JSONObject getDimFromPhoenix(Connection phoenixConn,
                                               String table,
                                               Long id) {
        String sql = "select * from " + table + " where id=?";
        Object[] args = {id.toString()};
        List<JSONObject> result = JdbcUtil.queryList(phoenixConn, sql, args, JSONObject.class);
        return result.size() == 1 ? result.get(0) : null;
    }

    public static JSONObject getDim(Connection phoenixConnection,
                                    Jedis redisClient,
                                    String table,
                                    Long id) {
        // 1. try Redis first
        JSONObject dim = getDimFromRedis(redisClient, table, id);
        // 2. on a cache miss, read from Phoenix and write the result back to Redis
        if (dim == null) {
            System.out.println(table + " " + id + " read from the database");
            dim = getDimFromPhoenix(phoenixConnection, table, id);
            // cache the dimension row in Redis
            writeToRedis(redisClient, table, id, dim);
        } else {
            System.out.println(table + " " + id + " read from the cache");
        }
        // 3. return the dimension row
        return dim;
    }

    private static void writeToRedis(Jedis redisClient,
                                     String table,
                                     Long id,
                                     JSONObject dim) {
        String key = table + ":" + id;
        String value = dim.toJSONString();
        redisClient.setex(key, 3 * 24 * 60 * 60, value);
    }

    private static JSONObject getDimFromRedis(Jedis redisClient, String table, Long id) {
        String key = table + ":" + id;
        String dim = redisClient.get(key); // null if the key is not cached
        return dim == null ? null : JSON.parseObject(dim);
    }
}
  2. DwmOrderWriteApp_Cache.java

package com.atguigu.gmall.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.app.BaseAppV2;
import com.atguigu.gmall.common.Constant;
import com.atguigu.gmall.pojo.OrderDetail;
import com.atguigu.gmall.pojo.OrderInfo;
import com.atguigu.gmall.pojo.OrderWide;
import com.atguigu.gmall.util.DimUtil;
import com.atguigu.gmall.util.JdbcUtil;
import com.atguigu.gmall.util.RedisUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

import java.sql.Connection;
import java.time.Duration;
import java.util.HashMap;

public class DwmOrderWriteApp_Cache extends BaseAppV2 {
    public static void main(String[] args) {
        new DwmOrderWriteApp_Cache().init(3004, 1, "DwmOrderWriteApp_Cache", "DwmOrderWriteApp_Cache", Constant.TOPIC_DWD_ORDER_INFO, Constant.TOPIC_DWD_ORDER_DETAIL);
    }

    @Override
    public void run(StreamExecutionEnvironment env, HashMap<String, DataStreamSource<String>> streams) {
        //streams.get(Constant.TOPIC_DWD_ORDER_INFO).print(Constant.TOPIC_DWD_ORDER_INFO);
        //streams.get(Constant.TOPIC_DWD_ORDER_DETAIL).print(Constant.TOPIC_DWD_ORDER_DETAIL);
        // 1. Join the fact streams: interval join
        SingleOutputStreamOperator<OrderWide> orderWideStreamWithoutDims = joinFacts(streams);
        // 2. Join the dimension data
        joinDims(orderWideStreamWithoutDims);
        // 3. Write the result to Kafka to feed the DWS layer (added later)
    }

    private void joinDims(SingleOutputStreamOperator<OrderWide> orderWideStreamWithoutDims) {
        // join the 6 dimension tables:
        // look up the matching row in each dimension table by its id
        orderWideStreamWithoutDims
            .map(new RichMapFunction<OrderWide, OrderWide>() {
                private Jedis redisClient;
                private Connection phoenixConnection;

                @Override
                public void open(Configuration parameters) throws Exception {
                    phoenixConnection = JdbcUtil.getPhoenixConnection(Constant.PHOENIX_URL);
                    // get a Redis client
                    redisClient = RedisUtil.getRedisClient();
                }

                @Override
                public OrderWide map(OrderWide orderWide) throws Exception {
                    // 1. user
                    JSONObject userInfo = DimUtil.getDim(
                        phoenixConnection,
                        redisClient,
                        "dim_user_info",
                        orderWide.getUser_id());
                    orderWide.setUser_gender(userInfo.getString("GENDER"));
                    orderWide.calcUserAge(userInfo.getString("BIRTHDAY"));
                    // 2. province
                    JSONObject provinceInfo = DimUtil.getDim(
                        phoenixConnection,
                        redisClient,
                        "dim_base_province",
                        orderWide.getProvince_id());
                    orderWide.setProvince_name(provinceInfo.getString("NAME"));
                    orderWide.setProvince_iso_code(provinceInfo.getString("ISO_CODE"));
                    orderWide.setProvince_area_code(provinceInfo.getString("AREA_CODE"));
                    orderWide.setProvince_3166_2_code(provinceInfo.getString("ISO_3166_2"));
                    // 3. sku
                    JSONObject skuInfo = DimUtil.getDim(
                        phoenixConnection,
                        redisClient,
                        "dim_sku_info",
                        orderWide.getSku_id());
                    orderWide.setSku_name(skuInfo.getString("SKU_NAME"));
                    orderWide.setSpu_id(skuInfo.getLong("SPU_ID"));
                    orderWide.setTm_id(skuInfo.getLong("TM_ID"));
                    orderWide.setCategory3_id(skuInfo.getLong("CATEGORY3_ID"));
                    // 4. spu
                    JSONObject spuInfo = DimUtil.getDim(
                        phoenixConnection,
                        redisClient,
                        "dim_spu_info",
                        orderWide.getSpu_id());
                    orderWide.setSpu_name(spuInfo.getString("SPU_NAME"));
                    // 5. trademark
                    JSONObject tmInfo = DimUtil.getDim(
                        phoenixConnection,
                        redisClient,
                        "dim_base_trademark",
                        orderWide.getTm_id());
                    orderWide.setTm_name(tmInfo.getString("TM_NAME"));
                    // 6. category3
                    JSONObject c3Info = DimUtil.getDim(
                        phoenixConnection,
                        redisClient,
                        "dim_base_category3",
                        orderWide.getCategory3_id());
                    orderWide.setCategory3_name(c3Info.getString("NAME"));
                    return orderWide;
                }

                @Override
                public void close() throws Exception {
                    if (phoenixConnection != null) {
                        phoenixConnection.close();
                    }
                    if (redisClient != null) {
                        // if the client was created with new Jedis(), close() closes it;
                        // if it came from a connection pool, close() returns it to the pool
                        redisClient.close();
                    }
                }
            })
            .print();
    }

    /**
     * Join of the fact streams (OrderDetail and OrderInfo)
     * @param streams
     */
    private SingleOutputStreamOperator<OrderWide> joinFacts(HashMap<String, DataStreamSource<String>> streams) {
        KeyedStream<OrderInfo, Long> orderInfoStream = streams.get(Constant.TOPIC_DWD_ORDER_INFO)
            .map(new MapFunction<String, OrderInfo>() {
                @Override
                public OrderInfo map(String value) throws Exception {
                    return JSON.parseObject(value, OrderInfo.class);
                }
            })
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((info, ts) -> info.getCreate_ts())
            )
            .keyBy(orderInfo -> orderInfo.getId());

        KeyedStream<OrderDetail, Long> orderDetailLongKeyedStream = streams.get(Constant.TOPIC_DWD_ORDER_DETAIL)
            .map(new MapFunction<String, OrderDetail>() {
                @Override
                public OrderDetail map(String value) throws Exception {
                    return JSON.parseObject(value, OrderDetail.class);
                }
            })
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((info, ts) -> info.getCreate_ts())
            )
            .keyBy(orderDetail -> orderDetail.getOrder_id());

        return orderInfoStream
            .intervalJoin(orderDetailLongKeyedStream)
            .between(Time.seconds(-10), Time.seconds(10))
            .process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
                @Override
                public void processElement(OrderInfo left, OrderDetail right, Context ctx, Collector<OrderWide> out) throws Exception {
                    out.collect(new OrderWide(left, right));
                }
            });
    }
}
  3. Start the app and test:

(Figure 5)

(Figure 6)

As shown, the dimension data can now be served from Redis.

V. Updating the cache in DwdDbApp

  1. PhoenixSink.java
package com.atguigu.gmall.sink;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.common.Constant;
import com.atguigu.gmall.pojo.TableProcess;
import com.atguigu.gmall.util.JdbcUtil;
import com.atguigu.gmall.util.RedisUtil;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import redis.clients.jedis.Jedis;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;

public class PhoenixSink extends RichSinkFunction<Tuple2<JSONObject, TableProcess>> {
    private Connection conn;
    private ValueState<Boolean> isFirst;
    private Jedis redisClient;

    /**
     * Open the connection to Phoenix
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        String url = Constant.PHOENIX_URL;
        conn = JdbcUtil.getPhoenixConnection(url);
        isFirst = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("isFirst", Boolean.class));
        redisClient = RedisUtil.getRedisClient();
    }

    /**
     * Close the connections
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        if (conn != null) {
            conn.close();
        }
        if (redisClient != null) {
            // return the Jedis client to the pool as well
            redisClient.close();
        }
    }

    @Override
    public void invoke(Tuple2<JSONObject, TableProcess> value, Context context) throws Exception {
        // 1. create the table if needed
        checkTable(value);
        // 2. write the data
        writeData(value);
        // 3. update the cache
        updateCache(value);
    }

    private void updateCache(Tuple2<JSONObject, TableProcess> value) {
        JSONObject data = value.f0;
        TableProcess tp = value.f1;
        String key = tp.getSink_table() + ":" + data.getLong("id");
        // only refresh Redis when this is an update of a dimension row that is already cached
        if ("update".equals(tp.getOperate_type()) && redisClient.exists(key)) {
            // field names must be upper-cased to match what Phoenix returns
            JSONObject upperDim = new JSONObject();
            for (Map.Entry<String, Object> entry : data.entrySet()) {
                upperDim.put(entry.getKey().toUpperCase(), entry.getValue());
            }
            redisClient.set(key, upperDim.toString());
        }
    }

    private void writeData(Tuple2<JSONObject, TableProcess> value) throws SQLException {
        JSONObject data = value.f0;
        TableProcess tp = value.f1;
        // build the upsert statement:
        // upsert into t(a,b,c) values(?,?,?)
        StringBuilder sql = new StringBuilder();
        sql.append("upsert into ")
            .append(tp.getSink_table())
            .append("(")
            .append(tp.getSink_columns())
            .append(")values(")
            .append(tp.getSink_columns().replaceAll("[^,]+", "?"))
            .append(")");
        PreparedStatement ps = conn.prepareStatement(sql.toString());
        // bind the placeholders, taking each value from data by column name
        String[] cs = tp.getSink_columns().split(",");
        for (int i = 0; i < cs.length; i++) {
            String columnName = cs[i];
            Object v = data.get(columnName);
            ps.setString(i + 1, v == null ? null : v.toString());
        }
        // execute the SQL
        ps.execute();
        conn.commit();
        ps.close();
    }

    /**
     * Create the table in Phoenix (executes the CREATE TABLE statement once per key)
     * @param value
     */
    private void checkTable(Tuple2<JSONObject, TableProcess> value) throws IOException, SQLException {
        if (isFirst.value() == null) {
            TableProcess tp = value.f1;
            StringBuilder sql = new StringBuilder();
            sql
                .append("create table if not exists ")
                .append(tp.getSink_table())
                .append("(")
                .append(tp.getSink_columns().replaceAll(",", " varchar, "))
                .append(" varchar, constraint pk primary key(")
                .append(tp.getSink_pk() == null ? "id" : tp.getSink_pk())
                .append("))")
                .append(tp.getSink_extend() == null ? "" : tp.getSink_extend());
            System.out.println(sql.toString());
            PreparedStatement ps = conn.prepareStatement(sql.toString());
            ps.execute();
            conn.commit();
            ps.close();
            isFirst.update(true);
        }
    }
}
  2. Test: re-package the program, upload it to the Linux cluster, and start it.

The user_info entry in Redis before the business data is changed:

(Figure 7)

Change the corresponding name in the database to 黄蓉:

(Figure 8)

Check Redis again; the name has been updated successfully:

(Figure 9)

4. Dimension Optimization 2: Asynchronous Queries

I. Introduction to async I/O

  1. During Flink stream processing we often need to interact with external systems, for example enriching the fields of a fact record from a dimension table.
  2. In an e-commerce scenario, a product's sku_id is used to look up attributes such as the product's industry, its manufacturer, and details about that manufacturer; in a logistics scenario, a parcel id is used to look up the parcel's industry attributes, shipping information, delivery information, and so on.
  3. By default, a single parallel subtask of a Flink MapFunction can only interact synchronously: send a request to the external store, block on I/O, wait for the response, then send the next request. This synchronous style spends much of its time waiting on the network. Raising the MapFunction's parallelism helps, but more parallelism means more resources, so it is not a great solution.
  4. Flink 1.2 introduced Async I/O. In asynchronous mode, a single parallel subtask can have several requests in flight at once and processes whichever response arrives first, so consecutive requests no longer block on each other, which greatly improves streaming throughput.
  5. Async I/O is a much-requested feature contributed to the community by Alibaba; it addresses the case where network latency to external systems becomes the system bottleneck.
  6. Asynchronous querying effectively hands the dimension lookups to a separate thread pool, so no single query blocks the subtask and multiple requests can run concurrently, improving parallel efficiency.
  7. This approach is aimed specifically at operations involving network I/O and reduces the cost of waiting on requests.

(Figure 10)

Prerequisites for using the async API

  • Correctly implementing asynchronous I/O against a database (or key/value store) requires a database client that supports asynchronous requests. Many mainstream databases provide one.
  • Without such a client, a synchronous client can be turned into a limited-concurrency one by creating several clients and dispatching the synchronous calls through a thread pool, although this is usually less efficient than a proper asynchronous client.
  • Phoenix currently does not provide an asynchronous client, so we have to take exactly that approach: multiple clients plus a thread pool that executes the synchronous calls.

Flink's async I/O API

Flink's async I/O API lets you use an asynchronous request client inside a stream job. The API handles the integration with the data stream, as well as ordering, event time, and fault tolerance.

Given an asynchronous database client, implementing asynchronous I/O between a stream transformation and the database takes three parts (a minimal sketch follows the list):

  1. An AsyncFunction that dispatches the requests
  2. A callback that takes the result of the database interaction and hands it to the ResultFuture
  3. Applying the async operation to a DataStream as a regular transformation
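A minimal sketch of how the three parts map onto Flink's API; the class name and the "-enriched" string are illustrative only, and the real lookup logic of this project lives in DimAsyncFunction below:

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical example: enriches each String element asynchronously
public class DemoAsyncEnrichFunction extends RichAsyncFunction<String, String> {
    // 1. the AsyncFunction dispatches the request without blocking the subtask
    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> input + "-enriched")   // stand-in for the external lookup
            // 2. the callback hands the result to the ResultFuture
            .thenAccept(result -> resultFuture.complete(Collections.singletonList(result)));
    }
}

// 3. apply it to a DataStream<String> named input as a transformation (60 s timeout):
// AsyncDataStream.unorderedWait(input, new DemoAsyncEnrichFunction(), 60, TimeUnit.SECONDS);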

II. Wrapping a thread-pool utility class

package com.atguigu.gmall.util;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Thread-pool utility used for the asynchronous lookups
 */
public class ThreadPoolUtil {
    public static ThreadPoolExecutor getThreadPool() {
        return new ThreadPoolExecutor(
            300, // core pool size
            500, // maximum pool size
            30,  // keep-alive time for idle threads
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(50)
        );
    }
}

III. Implementing the asynchronous code

  1. DimAsyncFunction.java (handles the asynchronous logic)

package com.atguigu.gmall.function;

import com.atguigu.gmall.common.Constant;
import com.atguigu.gmall.util.JdbcUtil;
import com.atguigu.gmall.util.RedisUtil;
import com.atguigu.gmall.util.ThreadPoolUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import redis.clients.jedis.Jedis;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ThreadPoolExecutor;

public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> {
    private ThreadPoolExecutor threadPool;

    @Override
    public void open(Configuration parameters) throws Exception {
        threadPool = ThreadPoolUtil.getThreadPool();
    }

    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
        // read the dimension data with the clients inside a pooled thread
        threadPool.submit(new Runnable() {
            @Override
            public void run() {
                Connection phoenixConnection = null;
                Jedis redisClient = null;
                try {
                    phoenixConnection = JdbcUtil.getPhoenixConnection(Constant.PHOENIX_URL);
                    redisClient = RedisUtil.getRedisClient();
                    addDim(phoenixConnection, redisClient, input, resultFuture);
                } catch (SQLException throwables) {
                    throwables.printStackTrace();
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                } finally {
                    if (phoenixConnection != null) {
                        try {
                            phoenixConnection.close();
                        } catch (SQLException throwables) {
                            throwables.printStackTrace();
                        }
                    }
                    if (redisClient != null) {
                        redisClient.close();
                    }
                }
            }
        });
    }

    protected abstract void addDim(Connection phoenixConnection,
                                   Jedis redisClient,
                                   T input,
                                   ResultFuture<T> resultFuture);
}
  2. The asynchronous join code

private SingleOutputStreamOperator<OrderWide> joinDims(SingleOutputStreamOperator<OrderWide> orderWideStreamWithoutDims) {
    // join the 6 dimension tables:
    // look up the matching row in each dimension table by its id
    return AsyncDataStream
        .unorderedWait(
            orderWideStreamWithoutDims,
            new DimAsyncFunction<OrderWide>() {
                @Override
                protected void addDim(Connection phoenixConnection,
                                      Jedis redisClient,
                                      OrderWide orderWide,
                                      ResultFuture<OrderWide> resultFuture) {
                    // 1. user
                    JSONObject userInfo = DimUtil.getDim(
                        phoenixConnection,
                        redisClient,
                        "dim_user_info",
                        orderWide.getUser_id());
                    orderWide.setUser_gender(userInfo.getString("GENDER"));
                    try {
                        orderWide.calcUserAge(userInfo.getString("BIRTHDAY"));
                    } catch (ParseException e) {
                        e.printStackTrace();
                    }
                    // 2. province
                    JSONObject provinceInfo = DimUtil.getDim(
                        phoenixConnection,
                        redisClient,
                        "dim_base_province",
                        orderWide.getProvince_id());
                    orderWide.setProvince_name(provinceInfo.getString("NAME"));
                    orderWide.setProvince_iso_code(provinceInfo.getString("ISO_CODE"));
                    orderWide.setProvince_area_code(provinceInfo.getString("AREA_CODE"));
                    orderWide.setProvince_3166_2_code(provinceInfo.getString("ISO_3166_2"));
                    // 3. sku
                    JSONObject skuInfo = DimUtil.getDim(
                        phoenixConnection,
                        redisClient,
                        "dim_sku_info",
                        orderWide.getSku_id());
                    orderWide.setSku_name(skuInfo.getString("SKU_NAME"));
                    orderWide.setSpu_id(skuInfo.getLong("SPU_ID"));
                    orderWide.setTm_id(skuInfo.getLong("TM_ID"));
                    orderWide.setCategory3_id(skuInfo.getLong("CATEGORY3_ID"));
                    // 4. spu
                    JSONObject spuInfo = DimUtil.getDim(
                        phoenixConnection,
                        redisClient,
                        "dim_spu_info",
                        orderWide.getSpu_id());
                    orderWide.setSpu_name(spuInfo.getString("SPU_NAME"));
                    // 5. trademark
                    JSONObject tmInfo = DimUtil.getDim(
                        phoenixConnection,
                        redisClient,
                        "dim_base_trademark",
                        orderWide.getTm_id());
                    orderWide.setTm_name(tmInfo.getString("TM_NAME"));
                    // 6. category3
                    JSONObject c3Info = DimUtil.getDim(
                        phoenixConnection,
                        redisClient,
                        "dim_base_category3",
                        orderWide.getCategory3_id());
                    orderWide.setCategory3_name(c3Info.getString("NAME"));
                    resultFuture.complete(Collections.singletonList(orderWide));
                }
            },
            60,
            TimeUnit.SECONDS
        );
}
  3. Start the app and test; the lookups are now processed asynchronously.

(Figure 11)

IV. Writing the data to Kafka

private void write2Kafka(SingleOutputStreamOperator<OrderWide> stream) {
    stream
        .map(new MapFunction<OrderWide, String>() {
            @Override
            public String map(OrderWide value) throws Exception {
                return JSON.toJSONString(value);
            }
        })
        .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWM_ORDER_WIDE));
}

V. Packaging the project and testing on the cluster

(Figure 12)

The job reports async timeouts; the cause is a Phoenix dependency problem in the packaged jar.

The fix is to leave the Phoenix dependency out of the jar and add it to the lib directory of flink-yarn on the cluster instead:

/opt/module/phoenix-5.0.0 » cp phoenix-5.0.0-HBase-2.0-client.jar /opt/module/flink-yarn/lib

Restart the flink-yarn session:

/opt/module/flink-yarn » bin/yarn-session.sh -d

Restart the application:

(Figure 13)

Now it runs normally without async timeouts, and the data is consumed from Kafka as expected.

(Figure 14)

Chapter 5. DWM Layer: Payment Wide Table

1. Requirements Analysis

  1. The main motivation for a payment wide table is that the payment table carries no order detail: the payment amount is not broken down per product, so product-level payment statistics cannot be computed.
  2. The core of this wide table is therefore joining the payment information with the order detail.
  3. There are three possible solutions:
    • Write the order detail to HBase and query HBase when building the payment wide table, which amounts to managing the order detail as a dimension.
    • Consume the order detail as a stream and merge the two streams with a stream-to-stream join. Because there is a time gap between an order being placed and being paid, an interval join must be used to control how long the order state is kept, so that the order is still in state when the payment arrives (see the sketch after this list).
    • Join the payment stream with the order wide table stream instead, which skips the dimension lookups altogether.
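A minimal sketch of the interval-join core used below. The bounds match the full implementation in the next section; presumably the 45-minute lower bound leaves enough slack for the longest expected gap between order creation and payment, while the small upper bound only absorbs slightly out-of-order records:

// for each payment, match the order-wide record created up to 45 minutes before
// (or at most 10 minutes after) the payment, both streams keyed by order_id
paymentInfoStream
    .intervalJoin(orderWideStream)
    .between(Time.minutes(-45), Time.minutes(10))
    .process(new ProcessJoinFunction<PaymentInfo, OrderWide, PaymentWide>() {
        @Override
        public void processElement(PaymentInfo left, OrderWide right, Context ctx,
                                   Collector<PaymentWide> out) {
            out.collect(new PaymentWide(left, right));
        }
    });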

2. Implementation

POJO classes used:

  1. PaymentInfo.java

package com.atguigu.gmall.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class PaymentInfo {
    private Long id;
    private Long order_id;
    private Long user_id;
    private BigDecimal total_amount;
    private String subject;
    private String payment_type;
    private String create_time;
    private String callback_time;
}
  2. PaymentWide.java

package com.atguigu.gmall.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.beanutils.BeanUtils;

import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class PaymentWide {
    private Long payment_id;
    private String subject;
    private String payment_type;
    private String payment_create_time;
    private String callback_time;
    private Long detail_id;
    private Long order_id;
    private Long sku_id;
    private BigDecimal order_price;
    private Long sku_num;
    private String sku_name;
    private Long province_id;
    private String order_status;
    private Long user_id;
    private BigDecimal total_amount;
    private BigDecimal activity_reduce_amount;
    private BigDecimal coupon_reduce_amount;
    private BigDecimal original_total_amount;
    private BigDecimal feight_fee;
    private BigDecimal split_feight_fee;
    private BigDecimal split_activity_amount;
    private BigDecimal split_coupon_amount;
    private BigDecimal split_total_amount;
    private String order_create_time;
    private String province_name; // obtained from the dimension lookups
    private String province_area_code;
    private String province_iso_code;
    private String province_3166_2_code;
    private Integer user_age;
    private String user_gender;
    private Long spu_id; // dimension attributes joined in
    private Long tm_id;
    private Long category3_id;
    private String spu_name;
    private String tm_name;
    private String category3_name;

    public PaymentWide(PaymentInfo paymentInfo, OrderWide orderWide) {
        mergeOrderWide(orderWide);
        mergePaymentInfo(paymentInfo);
    }

    public void mergePaymentInfo(PaymentInfo paymentInfo) {
        if (paymentInfo != null) {
            try {
                BeanUtils.copyProperties(this, paymentInfo);
                payment_create_time = paymentInfo.getCreate_time();
                payment_id = paymentInfo.getId();
            } catch (IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
            }
        }
    }

    public void mergeOrderWide(OrderWide orderWide) {
        if (orderWide != null) {
            try {
                BeanUtils.copyProperties(this, orderWide);
                order_create_time = orderWide.getCreate_time();
            } catch (IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
            }
        }
    }
}

Implementation code:

package com.atguigu.gmall.app.dwm;

import com.alibaba.fastjson.JSON;
import com.atguigu.gmall.app.BaseAppV2;
import com.atguigu.gmall.common.Constant;
import com.atguigu.gmall.pojo.OrderWide;
import com.atguigu.gmall.pojo.PaymentInfo;
import com.atguigu.gmall.pojo.PaymentWide;
import com.atguigu.gmall.util.FlinkSinkUtil;
import com.atguigu.gmall.util.IteratorToListUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.HashMap;

public class DwmPaymentWideApp extends BaseAppV2 {
    public static void main(String[] args) {
        new DwmPaymentWideApp().init(3006, 1, "DwmPaymentWideApp",
            "DwmPaymentWideApp", Constant.TOPIC_DWD_PAYMENT_INFO, Constant.TOPIC_DWM_ORDER_WIDE);
    }

    @Override
    public void run(StreamExecutionEnvironment env, HashMap<String, DataStreamSource<String>> streams) {
        //streams.get(Constant.TOPIC_DWD_PAYMENT_INFO).print(Constant.TOPIC_DWD_PAYMENT_INFO);
        //streams.get(Constant.TOPIC_DWM_ORDER_WIDE).print(Constant.TOPIC_DWM_ORDER_WIDE);
        KeyedStream<PaymentInfo, Long> paymentInfoStream = streams.get(Constant.TOPIC_DWD_PAYMENT_INFO)
            .map(new MapFunction<String, PaymentInfo>() {
                @Override
                public PaymentInfo map(String value) throws Exception {
                    return JSON.parseObject(value, PaymentInfo.class);
                }
            })
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<PaymentInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((info, ts) -> IteratorToListUtil.toTs(info.getCreate_time()))
            )
            .keyBy(info -> info.getOrder_id());

        KeyedStream<OrderWide, Long> orderWideStream = streams.get(Constant.TOPIC_DWM_ORDER_WIDE)
            .map(new MapFunction<String, OrderWide>() {
                @Override
                public OrderWide map(String value) throws Exception {
                    return JSON.parseObject(value, OrderWide.class);
                }
            })
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderWide>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((wide, ts) -> IteratorToListUtil.toTs(wide.getCreate_time()))
            )
            .keyBy(wide -> wide.getOrder_id());

        paymentInfoStream
            .intervalJoin(orderWideStream)
            .between(Time.minutes(-45), Time.minutes(10))
            .process(new ProcessJoinFunction<PaymentInfo, OrderWide, PaymentWide>() {
                @Override
                public void processElement(PaymentInfo left, OrderWide right, Context ctx, Collector<PaymentWide> out) throws Exception {
                    out.collect(new PaymentWide(left, right));
                }
            })
            .map(new MapFunction<PaymentWide, String>() {
                @Override
                public String map(PaymentWide value) throws Exception {
                    return JSON.toJSONString(value);
                }
            })
            .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWM_PAYMENT_WIDE));
            //.print();
    }
}

3. Package the Project and Test on the Cluster

(Figure 15)

The data has been successfully collected into Kafka.