Chapter 1. Introduction to the DWS Layer

Positioning of the DWS layer

  1. Light aggregation: the ADS layer has to answer many real-time queries, and serving them straight from full detail data would put heavy pressure on those queries.
  2. Group more of the real-time data by subject, which makes it easier to manage and also reduces the number of dimension lookups.

Chapter 2. DWS Layer: Visitor Topic Wide Table

Metric                     | Output                  | Computed from                                        | Source layer
pv                         | visualization dashboard | directly from page_log                               | dwd
uv                         | visualization dashboard | page_log, filtered and deduplicated                  | dwm
bounce rate                | visualization dashboard | judged from the bounce-detail stream and page_log    | dwd/dwm
pages per continuous visit | visualization dashboard | requires identifying the visit-start marker          | dwd
continuous visit duration  | visualization dashboard | requires identifying the visit-start marker          | dwd

Designing a DWS-layer table really comes down to two things: dimensions and measures (fact data).

  • Measures: pv, uv, bounce count, visit (entry) count, and continuous visit duration
  • Dimensions: the fields that matter most in analysis, used as the aggregation key: channel, region, version, and the new/returning-user flag

1. Requirements analysis

  1. Receive the detail data from each source and turn it into data streams.
  2. Union the streams into one stream of objects with a shared format.
  3. Aggregate the unioned stream; the aggregation window size determines how fresh the data is.
  4. Write the aggregated results to the database (ClickHouse). A minimal sketch of these four steps follows this list.
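Taken together, the steps map one-to-one onto the job's run() method shown later in this chapter; a minimal sketch using the helper methods and the streams map implemented below:

// steps 1-2: parse the detail streams and union them into one stream
DataStream<VisitorStats> visitorStatsDataStream = parseStreamUnionOne(streams);
// step 3: windowed aggregation; the window size bounds the latency
SingleOutputStreamOperator<VisitorStats> vsAggregateStream = aggregate(visitorStatsDataStream);
// step 4: write the aggregated results to ClickHouse
writeToClickHouse(vsAggregateStream);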

2. Preparation

  1. POJO for encapsulating the data
package com.atguigu.gmall.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class VisitorStats {
    // statistics window start time
    private String stt;
    // statistics window end time
    private String edt;
    // dimension: version
    private String vc;
    // dimension: channel
    private String ch;
    // dimension: region
    private String ar;
    // dimension: new/returning-user flag
    private String is_new;
    // measure: unique visitor count
    private Long uv_ct = 0L;
    // measure: page view count
    private Long pv_ct = 0L;
    // measure: entry (visit) count
    private Long sv_ct = 0L;
    // measure: bounce count
    private Long uj_ct = 0L;
    // measure: accumulated visit duration
    private Long dur_sum = 0L;
    // statistics timestamp
    private Long ts;
}
  2. Create the database and table in ClickHouse
create database gmall2021;
use gmall2021;
create table visitor_stats_2021 (
   stt DateTime,
   edt DateTime,
   vc String,
   ch String,
   ar String,
   is_new String,
   uv_ct UInt64,
   pv_ct UInt64,
   sv_ct UInt64,
   uj_ct UInt64,
   dur_sum UInt64,
   ts UInt64
) engine = ReplacingMergeTree(ts)
        partition by toYYYYMMDD(stt)
        order by (stt, edt, is_new, vc, ch, ar);
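Note that ReplacingMergeTree deduplicates rows sharing the same sort key only during background merges (keeping the row with the larger ts), so duplicates can still be visible at query time. A hedged example of querying with explicit deduplication (FINAL is heavier but simple):

-- force merge-time semantics at query time; for large tables prefer GROUP BY aggregation
select * from visitor_stats_2021 final
where toYYYYMMDD(stt) = toYYYYMMDD(now());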
  3. Add the jdbc-connector and ClickHouse dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- ClickHouse dependencies: begin -->
<!-- https://mvnrepository.com/artifact/ru.yandex.clickhouse/clickhouse-jdbc -->
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.11.2</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5</version>
</dependency>
<!-- ClickHouse dependencies: end -->

3. Code

  1. Parse the source streams and union them into one
/**
 * @Description: parse the source streams and union them into one
 * @Param: [streams]
 * @return: org.apache.flink.streaming.api.datastream.DataStream<com.atguigu.gmall.pojo.VisitorStats>
 * @Author: jcsune
 * @Date: 2021/11/2 0002
 */
private DataStream<VisitorStats> parseStreamUnionOne(HashMap<String, DataStreamSource<String>> streams) {
    // 1. pv (page views) and continuous visit duration
    SingleOutputStreamOperator<VisitorStats> pvAndDuringTimeStream = streams
            .get(Constant.TOPIC_DWD_PAGE)
            .map(new MapFunction<String, VisitorStats>() {
                @Override
                public VisitorStats map(String value) throws Exception {
                    JSONObject object = JSON.parseObject(value);
                    Long ts = object.getLong("ts");
                    JSONObject common = object.getJSONObject("common");
                    String vc = common.getString("vc");
                    String ch = common.getString("ch");
                    String ar = common.getString("ar");
                    String is_new = common.getString("is_new");
                    Long during_time = object.getJSONObject("page").getLong("during_time");
                    return new VisitorStats("", "", vc, ch, ar, is_new,
                            0L, 1L, 0L, 0L, during_time, ts);
                }
            });
    // 2. uv (unique visitors)
    SingleOutputStreamOperator<VisitorStats> uvStream = streams
            .get(Constant.TOPIC_DWM_UV)
            .map(new MapFunction<String, VisitorStats>() {
                @Override
                public VisitorStats map(String value) throws Exception {
                    JSONObject object = JSON.parseObject(value);
                    Long ts = object.getLong("ts");
                    JSONObject common = object.getJSONObject("common");
                    String vc = common.getString("vc");
                    String ch = common.getString("ch");
                    String ar = common.getString("ar");
                    String is_new = common.getString("is_new");
                    return new VisitorStats("", "", vc, ch, ar, is_new,
                            1L, 0L, 0L, 0L, 0L, ts);
                }
            });
    // 3. uj (bounce behavior)
    SingleOutputStreamOperator<VisitorStats> ujStream = streams
            .get(Constant.TOPIC_DWM_UJ_DETAIL)
            .map(new MapFunction<String, VisitorStats>() {
                @Override
                public VisitorStats map(String value) throws Exception {
                    JSONObject object = JSON.parseObject(value);
                    Long ts = object.getLong("ts");
                    JSONObject common = object.getJSONObject("common");
                    String vc = common.getString("vc");
                    String ch = common.getString("ch");
                    String ar = common.getString("ar");
                    String is_new = common.getString("is_new");
                    return new VisitorStats("", "", vc, ch, ar, is_new,
                            0L, 0L, 0L, 1L, 0L, ts);
                }
            });
    // 4. sv (entry count): a page without last_page_id starts a new visit
    SingleOutputStreamOperator<VisitorStats> svStream = streams
            .get(Constant.TOPIC_DWD_PAGE)
            .flatMap(new FlatMapFunction<String, VisitorStats>() {
                @Override
                public void flatMap(String value, Collector<VisitorStats> out) throws Exception {
                    JSONObject object = JSON.parseObject(value);
                    String lastPageId = object.getJSONObject("page").getString("last_page_id");
                    if (lastPageId == null || lastPageId.length() == 0) {
                        Long ts = object.getLong("ts");
                        JSONObject common = object.getJSONObject("common");
                        String vc = common.getString("vc");
                        String ch = common.getString("ch");
                        String ar = common.getString("ar");
                        String is_new = common.getString("is_new");
                        out.collect(new VisitorStats(
                                "", "", vc, ch, ar, is_new,
                                0L, 0L, 1L, 0L, 0L, ts
                        ));
                    }
                }
            });
    return pvAndDuringTimeStream.union(uvStream, ujStream, svStream);
}
  2. Windowed aggregation
/**
 * @Description: windowed aggregation
 * @Param: [visitorStatsDataStream]
 * @return: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator<com.atguigu.gmall.pojo.VisitorStats>
 * @Author: jcsune
 * @Date: 2021/11/2 0002
 */
private SingleOutputStreamOperator<VisitorStats> aggregate(DataStream<VisitorStats> visitorStatsDataStream) {
    return visitorStatsDataStream
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<VisitorStats>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                            .withTimestampAssigner(new SerializableTimestampAssigner<VisitorStats>() {
                                @Override
                                public long extractTimestamp(VisitorStats element, long recordTimestamp) {
                                    return element.getTs();
                                }
                            })
            )
            .keyBy(new KeySelector<VisitorStats, String>() {
                @Override
                public String getKey(VisitorStats value) throws Exception {
                    return value.getVc() + "_" + value.getCh() + "_" + value.getAr() + "_" + value.getIs_new();
                }
            })
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .sideOutputLateData(new OutputTag<VisitorStats>("late") {})
            .reduce(
                    new ReduceFunction<VisitorStats>() {
                        @Override
                        public VisitorStats reduce(VisitorStats value1, VisitorStats value2) throws Exception {
                            value1.setPv_ct(value1.getPv_ct() + value2.getPv_ct());
                            value1.setUv_ct(value1.getUv_ct() + value2.getUv_ct());
                            value1.setSv_ct(value1.getSv_ct() + value2.getSv_ct());
                            value1.setUj_ct(value1.getUj_ct() + value2.getUj_ct());
                            value1.setDur_sum(value1.getDur_sum() + value2.getDur_sum());
                            return value1;
                        }
                    },
                    new ProcessWindowFunction<VisitorStats, VisitorStats, String, TimeWindow>() {
                        @Override
                        public void process(String s,
                                            Context context,
                                            Iterable<VisitorStats> elements,
                                            Collector<VisitorStats> out) throws Exception {
                            VisitorStats visitorStats = elements.iterator().next();
                            String stt = IteratorToListUtil.toDateString(context.window().getStart(), "yyyy-MM-dd HH:mm:ss");
                            String edt = IteratorToListUtil.toDateString(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss");
                            visitorStats.setStt(stt);
                            visitorStats.setEdt(edt);
                            visitorStats.setTs(System.currentTimeMillis());
                            out.collect(visitorStats);
                        }
                    }
            );
}
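The aggregation routes late records to a side output tagged "late", but nothing above ever reads it. If you want to inspect or separately persist the late data, a minimal sketch (Flink matches side outputs by the OutputTag id; in practice share one tag constant between aggregate() and the caller):

// hypothetical follow-up: read the late-data side output from the windowed result
OutputTag<VisitorStats> lateTag = new OutputTag<VisitorStats>("late") {};
SingleOutputStreamOperator<VisitorStats> vsAggregateStream = aggregate(visitorStatsDataStream);
vsAggregateStream.getSideOutput(lateTag).print("late");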
  3. Update FlinkSinkUtil with a method that writes to ClickHouse
public static <T> SinkFunction<T> getClickHouseSink(String db,
                                                    String table,
                                                    Class<T> tClass) {
    String driver = Constant.CLICK_HOUSE_DRIVER;
    String url = Constant.CLICK_HOUSE_URL_PRE + db;
    StringBuilder sql = new StringBuilder();
    // build the SQL: T's field names must match the table's column names
    sql
            .append("insert into ")
            .append(table)
            .append("(");
    // append the column names
    Field[] fields = tClass.getDeclaredFields();
    for (Field field : fields) {
        sql.append(field.getName()).append(",");
    }
    sql.deleteCharAt(sql.length() - 1);
    sql.append(") values(");
    // append the ? placeholders
    for (Field field : fields) {
        sql.append("?,");
    }
    sql.deleteCharAt(sql.length() - 1);
    sql.append(")");
    //System.out.println(sql.toString());
    return getJdbcSink(driver, url, sql.toString());
}

private static <T> SinkFunction<T> getJdbcSink(String driver, String url, String sql) {
    return JdbcSink.sink(
            sql,
            new JdbcStatementBuilder<T>() {
                @Override
                public void accept(PreparedStatement ps, T t) throws SQLException {
                    // bind values to the placeholders
                    /**
                     * three ways to obtain a Class object:
                     * 1. ClassName.class
                     * 2. object.getClass()
                     * 3. Class.forName("className")
                     */
                    Field[] fields = t.getClass().getDeclaredFields();
                    try {
                        for (int i = 0; i < fields.length; i++) {
                            Field field = fields[i];
                            field.setAccessible(true); // allow access to private fields
                            ps.setObject(i + 1, field.get(t));
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            },
            new JdbcExecutionOptions.Builder()
                    .withBatchIntervalMs(1000)
                    .withBatchSize(1024)
                    .withMaxRetries(3)
                    .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withDriverName(driver)
                    .withUrl(url)
                    .build()
    );
}
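For VisitorStats, the builder above should produce the statement below (note Class.getDeclaredFields gives no ordering guarantee in the JLS, though HotSpot returns declaration order in practice, which here matches the table's columns):

insert into visitor_stats_2021(stt,edt,vc,ch,ar,is_new,uv_ct,pv_ct,sv_ct,uj_ct,dur_sum,ts) values(?,?,?,?,?,?,?,?,?,?,?,?)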
  4. Write the aggregated results to ClickHouse
/**
 * @Description: write the aggregated results to ClickHouse (dws)
 * @Param: [vsAggregateStream]
 * @return: void
 * @Author: jcsune
 * @Date: 2021/11/2 0002
 */
private void writeToClickHouse(SingleOutputStreamOperator<VisitorStats> vsAggregateStream) {
    vsAggregateStream.addSink(FlinkSinkUtil.getClickHouseSink("gmall2021",
            "visitor_stats_2021",
            VisitorStats.class));
}
  5. Complete code
package com.atguigu.gmall.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.app.BaseAppV2;
import com.atguigu.gmall.common.Constant;
import com.atguigu.gmall.pojo.VisitorStats;
import com.atguigu.gmall.util.FlinkSinkUtil;
import com.atguigu.gmall.util.IteratorToListUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.HashMap;

public class DwsVisitorApp extends BaseAppV2 {
    public static void main(String[] args) {
        new DwsVisitorApp().init(4001, 1, "DwsVisitorApp", "DwsVisitorApp",
                Constant.TOPIC_DWD_PAGE, Constant.TOPIC_DWM_UV, Constant.TOPIC_DWM_UJ_DETAIL);
    }

    @Override
    public void run(StreamExecutionEnvironment env, HashMap<String, DataStreamSource<String>> streams) {
        //streams.get(Constant.TOPIC_DWD_PAGE).print(Constant.TOPIC_DWD_PAGE);
        //streams.get(Constant.TOPIC_DWM_UV).print(Constant.TOPIC_DWM_UV);
        //streams.get(Constant.TOPIC_DWM_UJ_DETAIL).print(Constant.TOPIC_DWM_UJ_DETAIL);
        /**
         * 1. parse the source streams and union them into one
         */
        DataStream<VisitorStats> visitorStatsDataStream = parseStreamUnionOne(streams);
        //visitorStatsDataStream.print();
        /**
         * 2. windowed aggregation
         */
        SingleOutputStreamOperator<VisitorStats> vsAggregateStream = aggregate(visitorStatsDataStream);
        //vsAggregateStream.print();
        /**
         * 3. write the aggregated results to dws (ClickHouse);
         *    the sink is a ClickHouse sink built on the JDBC connector
         */
        writeToClickHouse(vsAggregateStream);
    }

    /**
     * @Description: write the aggregated results to ClickHouse (dws)
     * @Param: [vsAggregateStream]
     * @return: void
     * @Author: jcsune
     * @Date: 2021/11/2 0002
     */
    private void writeToClickHouse(SingleOutputStreamOperator<VisitorStats> vsAggregateStream) {
        vsAggregateStream.addSink(FlinkSinkUtil.getClickHouseSink("gmall2021",
                "visitor_stats_2021",
                VisitorStats.class));
    }

    /**
     * @Description: windowed aggregation
     * @Param: [visitorStatsDataStream]
     * @return: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator<com.atguigu.gmall.pojo.VisitorStats>
     * @Author: jcsune
     * @Date: 2021/11/2 0002
     */
    private SingleOutputStreamOperator<VisitorStats> aggregate(DataStream<VisitorStats> visitorStatsDataStream) {
        return visitorStatsDataStream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<VisitorStats>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                                .withTimestampAssigner(new SerializableTimestampAssigner<VisitorStats>() {
                                    @Override
                                    public long extractTimestamp(VisitorStats element, long recordTimestamp) {
                                        return element.getTs();
                                    }
                                })
                )
                .keyBy(new KeySelector<VisitorStats, String>() {
                    @Override
                    public String getKey(VisitorStats value) throws Exception {
                        return value.getVc() + "_" + value.getCh() + "_" + value.getAr() + "_" + value.getIs_new();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sideOutputLateData(new OutputTag<VisitorStats>("late") {})
                .reduce(
                        new ReduceFunction<VisitorStats>() {
                            @Override
                            public VisitorStats reduce(VisitorStats value1, VisitorStats value2) throws Exception {
                                value1.setPv_ct(value1.getPv_ct() + value2.getPv_ct());
                                value1.setUv_ct(value1.getUv_ct() + value2.getUv_ct());
                                value1.setSv_ct(value1.getSv_ct() + value2.getSv_ct());
                                value1.setUj_ct(value1.getUj_ct() + value2.getUj_ct());
                                value1.setDur_sum(value1.getDur_sum() + value2.getDur_sum());
                                return value1;
                            }
                        },
                        new ProcessWindowFunction<VisitorStats, VisitorStats, String, TimeWindow>() {
                            @Override
                            public void process(String s,
                                                Context context,
                                                Iterable<VisitorStats> elements,
                                                Collector<VisitorStats> out) throws Exception {
                                VisitorStats visitorStats = elements.iterator().next();
                                String stt = IteratorToListUtil.toDateString(context.window().getStart(), "yyyy-MM-dd HH:mm:ss");
                                String edt = IteratorToListUtil.toDateString(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss");
                                visitorStats.setStt(stt);
                                visitorStats.setEdt(edt);
                                visitorStats.setTs(System.currentTimeMillis());
                                out.collect(visitorStats);
                            }
                        }
                );
    }

    /**
     * @Description: parse the source streams and union them into one
     * @Param: [streams]
     * @return: org.apache.flink.streaming.api.datastream.DataStream<com.atguigu.gmall.pojo.VisitorStats>
     * @Author: jcsune
     * @Date: 2021/11/2 0002
     */
    private DataStream<VisitorStats> parseStreamUnionOne(HashMap<String, DataStreamSource<String>> streams) {
        // 1. pv (page views) and continuous visit duration
        SingleOutputStreamOperator<VisitorStats> pvAndDuringTimeStream = streams
                .get(Constant.TOPIC_DWD_PAGE)
                .map(new MapFunction<String, VisitorStats>() {
                    @Override
                    public VisitorStats map(String value) throws Exception {
                        JSONObject object = JSON.parseObject(value);
                        Long ts = object.getLong("ts");
                        JSONObject common = object.getJSONObject("common");
                        String vc = common.getString("vc");
                        String ch = common.getString("ch");
                        String ar = common.getString("ar");
                        String is_new = common.getString("is_new");
                        Long during_time = object.getJSONObject("page").getLong("during_time");
                        return new VisitorStats("", "", vc, ch, ar, is_new,
                                0L, 1L, 0L, 0L, during_time, ts);
                    }
                });
        // 2. uv (unique visitors)
        SingleOutputStreamOperator<VisitorStats> uvStream = streams
                .get(Constant.TOPIC_DWM_UV)
                .map(new MapFunction<String, VisitorStats>() {
                    @Override
                    public VisitorStats map(String value) throws Exception {
                        JSONObject object = JSON.parseObject(value);
                        Long ts = object.getLong("ts");
                        JSONObject common = object.getJSONObject("common");
                        String vc = common.getString("vc");
                        String ch = common.getString("ch");
                        String ar = common.getString("ar");
                        String is_new = common.getString("is_new");
                        return new VisitorStats("", "", vc, ch, ar, is_new,
                                1L, 0L, 0L, 0L, 0L, ts);
                    }
                });
        // 3. uj (bounce behavior)
        SingleOutputStreamOperator<VisitorStats> ujStream = streams
                .get(Constant.TOPIC_DWM_UJ_DETAIL)
                .map(new MapFunction<String, VisitorStats>() {
                    @Override
                    public VisitorStats map(String value) throws Exception {
                        JSONObject object = JSON.parseObject(value);
                        Long ts = object.getLong("ts");
                        JSONObject common = object.getJSONObject("common");
                        String vc = common.getString("vc");
                        String ch = common.getString("ch");
                        String ar = common.getString("ar");
                        String is_new = common.getString("is_new");
                        return new VisitorStats("", "", vc, ch, ar, is_new,
                                0L, 0L, 0L, 1L, 0L, ts);
                    }
                });
        // 4. sv (entry count): a page without last_page_id starts a new visit
        SingleOutputStreamOperator<VisitorStats> svStream = streams
                .get(Constant.TOPIC_DWD_PAGE)
                .flatMap(new FlatMapFunction<String, VisitorStats>() {
                    @Override
                    public void flatMap(String value, Collector<VisitorStats> out) throws Exception {
                        JSONObject object = JSON.parseObject(value);
                        String lastPageId = object.getJSONObject("page").getString("last_page_id");
                        if (lastPageId == null || lastPageId.length() == 0) {
                            Long ts = object.getLong("ts");
                            JSONObject common = object.getJSONObject("common");
                            String vc = common.getString("vc");
                            String ch = common.getString("ch");
                            String ar = common.getString("ar");
                            String is_new = common.getString("is_new");
                            out.collect(new VisitorStats(
                                    "", "", vc, ch, ar, is_new,
                                    0L, 0L, 1L, 0L, 0L, ts
                            ));
                        }
                    }
                });
        return pvAndDuringTimeStream.union(uvStream, ujStream, svStream);
    }
}

4. Testing

Package the project and upload it to the Linux server for testing.

Connect to ClickHouse with DataGrip, start the log-data generator, and check whether data is being written into ClickHouse.
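A quick sanity-check query in DataGrip might look like this (assuming the 5-second windows defined above):

-- confirm that windowed visitor stats are arriving
select stt, edt, vc, ch, ar, is_new,
       sum(uv_ct) uv, sum(pv_ct) pv, sum(sv_ct) sv, sum(uj_ct) uj, sum(dur_sum) dur
from visitor_stats_2021
group by stt, edt, vc, ch, ar, is_new
order by stt desc
limit 20;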

[Flink Real-Time Data Warehouse (III) - Figure 1]

Chapter 3. DWS Layer: Product Topic Wide Table

Metric      | Output                    | Computed from             | Source layer
click       | multidimensional analysis | directly from dwd_page    | dwd
display     | multidimensional analysis | directly from dwd_display | dwd
favor       | multidimensional analysis | favor table               | dwd
add to cart | multidimensional analysis | cart table                | dwd
order       | visualization dashboard   | order wide table          | dwm
payment     | multidimensional analysis | payment wide table        | dwm
refund      | multidimensional analysis | refund table              | dwd
comment     | multidimensional analysis | comment table             | dwd

As with the visitor wide table in the DWS layer, detail data from several fact tables is gathered and combined into one wide table.

1. Requirements analysis

  1. Read the data streams from the Kafka topics.
  2. Convert each JSON-string stream into a stream of the unified data object.
  3. Union the unified streams into one stream.
  4. Assign event time and watermarks.
  5. Key by, window, and aggregate.
  6. Write to ClickHouse. (A minimal sketch of the overall flow follows this list.)
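As with the visitor job, the steps map directly onto the run() method shown later in this chapter; a minimal sketch using the helpers implemented below:

// steps 1-3: parse the eight streams and union them into one
DataStream<ProductStats> psStream = parseStreamAndUnionOne(streams);
// steps 4-5: event time and watermarks, keyBy sku_id, window, reduce
SingleOutputStreamOperator<ProductStats> aggregateStream = aggregate(psStream);
// join dimension info, then step 6: write to ClickHouse
writeToClickHouse(joinDims(aggregateStream));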

2. Preparation

  1. Custom annotation
package com.atguigu.gmall.app.annotatiton;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NotSink {
}
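The annotation carries no logic of its own; the upgraded sink later filters fields by checking for it reflectively. A minimal standalone sketch of that mechanism (the NotSinkDemo class is hypothetical, not part of the project):

// hypothetical demo: list only the fields NOT annotated with @NotSink
import com.atguigu.gmall.app.annotatiton.NotSink;

import java.lang.reflect.Field;

public class NotSinkDemo {
    static class Example {
        String keep;
        @NotSink
        String skip;
    }

    public static void main(String[] args) {
        for (Field f : Example.class.getDeclaredFields()) {
            if (f.getAnnotation(NotSink.class) == null) { // null means no @NotSink present
                System.out.println(f.getName());          // prints: keep
            }
        }
    }
}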
  2. POJO for encapsulating the data
package com.atguigu.gmall.pojo;


import com.atguigu.gmall.app.annotatiton.NotSink;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;
import java.util.HashSet;
import java.util.Set;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class ProductStats {

    private String stt;            // window start time
    private String edt;            // window end time
    private Long sku_id;           // sku id
    private String sku_name;       // sku name
    private BigDecimal sku_price;  // sku unit price
    private Long spu_id;           // spu id
    private String spu_name;       // spu name
    private Long tm_id;            // trademark (brand) id
    private String tm_name;        // trademark (brand) name
    private Long category3_id;     // level-3 category id
    private String category3_name; // level-3 category name

    private Long display_ct = 0L; // display (exposure) count

    private Long click_ct = 0L;   // click count

    private Long favor_ct = 0L;   // favor count

    private Long cart_ct = 0L;    // add-to-cart count

    private Long order_sku_num = 0L; // number of SKUs ordered

    // ordered amount for this SKU, not the amount of the whole order
    private BigDecimal order_amount = BigDecimal.ZERO;

    private Long order_ct = 0L; // order count

    // payment amount
    private BigDecimal payment_amount = BigDecimal.ZERO;

    private Long paid_order_ct = 0L;   // paid-order count

    private Long refund_order_ct = 0L; // refunded-order count

    private BigDecimal refund_amount = BigDecimal.ZERO;

    private Long comment_ct = 0L;      // comment count

    private Long good_comment_ct = 0L; // positive-comment count

    @NotSink
    private Set<Long> orderIdSet = new HashSet<>();       // order ids, used to count distinct orders
    @NotSink
    private Set<Long> paidOrderIdSet = new HashSet<>();   // used to count distinct paid orders
    @NotSink
    private Set<Long> refundOrderIdSet = new HashSet<>(); // used to count distinct refunded orders
    private Long ts; // statistics timestamp

}
  3. Create the database and table in ClickHouse
use gmall2021;
create table product_stats_2021 (
   stt DateTime,
   edt DateTime,
   sku_id  UInt64,
   sku_name String,
   sku_price Decimal64(2),
   spu_id UInt64,
   spu_name String ,
   tm_id UInt64,
   tm_name String,
   category3_id UInt64,
   category3_name String ,
   display_ct UInt64,
   click_ct UInt64,
   favor_ct UInt64,
   cart_ct UInt64,
   order_sku_num UInt64,
   order_amount Decimal64(2),
   order_ct UInt64 ,
   payment_amount Decimal64(2),
   paid_order_ct UInt64,
   refund_order_ct UInt64,
   refund_amount Decimal64(2),
   comment_ct UInt64,
   good_comment_ct UInt64 ,
   ts UInt64
) engine = ReplacingMergeTree(ts)
        partition by toYYYYMMDD(stt)
        order by (stt, edt, sku_id);

3. Code

  1. Parse the eight streams and union them into one
/**
    * @Description: parse the eight streams and union them into one
    * @Param: [streams]
    * @return: org.apache.flink.streaming.api.datastream.DataStream<com.atguigu.gmall.pojo.ProductStats>
    * @Author: jcsune
    * @Date: 2021/11/3 0003
    */
    private DataStream<ProductStats> parseStreamAndUnionOne(HashMap<String, DataStreamSource<String>> streams) {
        // 1. clicks
        SingleOutputStreamOperator<ProductStats> clickStream = streams
                .get(Constant.TOPIC_DWD_PAGE)
                .flatMap(new FlatMapFunction<String, ProductStats>() {
                    @Override
                    public void flatMap(String value, Collector<ProductStats> out) throws Exception {
                        JSONObject object = JSON.parseObject(value);
                        JSONObject page = object.getJSONObject("page");
                        String page_id = page.getString("page_id");
                        String item_type = page.getString("item_type");
                        if ("good_detail".equals(page_id) && "sku_id".equals(item_type)) {
                            Long skuId = page.getLong("item");
                            ProductStats ps = new ProductStats();
                            ps.setSku_id(skuId);
                            ps.setClick_ct(1L);
                            ps.setTs(object.getLong("ts"));
                            out.collect(ps);
                        }
                    }
                });
        // 2. displays (exposures)
        SingleOutputStreamOperator<ProductStats> displayStream = streams
                .get(Constant.TOPIC_DWD_DISPLAY)
                .filter(value -> {
                    JSONObject object = JSON.parseObject(value);
                    // keep only records whose item_type is sku_id
                    String item_type = object.getString("item_type");
                    return "sku_id".equals(item_type);
                })
                .map(value -> {
                    JSONObject object = JSON.parseObject(value);
                    ProductStats ps = new ProductStats();
                    ps.setSku_id(object.getLong("item"));
                    ps.setDisplay_ct(1L);
                    ps.setTs(object.getLong("ts"));
                    return ps;

                });

        // 3. favors
        SingleOutputStreamOperator<ProductStats> favorStream = streams
                .get(Constant.TOPIC_DWD_FAVOR_INFO)
                .map(value -> {
                    JSONObject object = JSON.parseObject(value);
                    ProductStats ps = new ProductStats();
                    ps.setSku_id(object.getLong("sku_id"));
                    ps.setFavor_ct(1L);
                    ps.setTs(IteratorToListUtil.toTs(object.getString("create_time")));
                    return ps;

                });

        // 4. cart additions
        SingleOutputStreamOperator<ProductStats> cartStream = streams
                .get(Constant.TOPIC_DWD_CART_INFO)
                .map(value -> {
                    JSONObject object = JSON.parseObject(value);
                    ProductStats ps = new ProductStats();
                    ps.setSku_id(object.getLong("sku_id"));
                    ps.setCart_ct(1L);
                    ps.setTs(IteratorToListUtil.toTs(object.getString("create_time")));
                    return ps;

                });

        // 5. orders
        SingleOutputStreamOperator<ProductStats> orderStream = streams
                .get(Constant.TOPIC_DWM_ORDER_WIDE)
                .map(json -> {
                    OrderWide orderWide = JSON.parseObject(json, OrderWide.class);
                    ProductStats ps = new ProductStats();
                    ps.setSku_id(orderWide.getSku_id());
                    ps.getOrderIdSet().add(orderWide.getOrder_id()); // only this one set is used here: add the order id directly into it
                    ps.setOrder_amount(orderWide.getSplit_total_amount()); // use the SKU's apportioned (split) amount
                    ps.setOrder_sku_num(orderWide.getSku_num());
                    ps.setTs(IteratorToListUtil.toTs(orderWide.getCreate_time()));
                    return ps;


                });

        // 6. payments
        SingleOutputStreamOperator<ProductStats> paymentStream = streams
                .get(Constant.TOPIC_DWM_PAYMENT_WIDE)
                .map(json -> {
                    PaymentWide paymentWide = JSON.parseObject(json, PaymentWide.class);
                    ProductStats ps = new ProductStats();
                    ps.setSku_id(paymentWide.getSku_id());
                    ps.setPayment_amount(paymentWide.getSplit_total_amount());
                    ps.getPaidOrderIdSet().add(paymentWide.getOrder_id());
                    ps.setTs(IteratorToListUtil.toTs(paymentWide.getPayment_create_time()));
                    return ps;
                });

        // 7. refunds
        SingleOutputStreamOperator<ProductStats> refundStream = streams
                .get(Constant.TOPIC_DWD_REFUND_PAYMENT)
                .map(value -> {
                    JSONObject object = JSON.parseObject(value);
                    ProductStats ps = new ProductStats();
                    ps.setSku_id(object.getLong("sku_id"));
                    ps.setRefund_amount(object.getBigDecimal("total_amount"));
                    ps.getRefundOrderIdSet().add(object.getLong("order_id"));
                    ps.setTs(IteratorToListUtil.toTs(object.getString("create_time")));
                    return ps;
                });
        // 8. comments
        SingleOutputStreamOperator<ProductStats> commentStream = streams
                .get(Constant.TOPIC_DWD_COMMENT_INFO)
                .map(value -> {
                    JSONObject object = JSON.parseObject(value);
                    ProductStats ps = new ProductStats();
                    ps.setSku_id(object.getLong("sku_id"));
                    ps.setComment_ct(1L);
                    String appraise = object.getString("appraise");
                    if (Constant.FIVE_STAR_COMMENT.equals(appraise) || Constant.FOUR_STAR_COMMENT.equals(appraise)) {
                        ps.setGood_comment_ct(1L);
                    }
                    ps.setTs(IteratorToListUtil.toTs(object.getString("create_time")));
                    return ps;
                });


        return clickStream.union(displayStream,
                favorStream,
                cartStream,
                orderStream,
                paymentStream,
                refundStream,
                commentStream);


    }
  2. Windowed aggregation
 /** 
    * @Description: windowed aggregation
    * @Param: [psStream] 
    * @return: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator<com.atguigu.gmall.pojo.ProductStats> 
    * @Author: jcsune
    * @Date: 2021/11/3 0003 
    */ 
    private SingleOutputStreamOperator<ProductStats> aggregate(DataStream<ProductStats> psStream) {
        return psStream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<ProductStats>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                        .withTimestampAssigner((ps,ts)->(ps.getTs()))

                )
                .keyBy(ps->ps.getSku_id())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(
                        new ReduceFunction<ProductStats>() {
                            @Override
                            public ProductStats reduce(ProductStats value1, ProductStats value2) throws Exception {
                                value1.setClick_ct(value1.getClick_ct() + value2.getClick_ct());
                                value1.setDisplay_ct(value1.getDisplay_ct() + value2.getDisplay_ct());
                                value1.setFavor_ct(value1.getFavor_ct() + value2.getFavor_ct());
                                value1.setCart_ct(value1.getCart_ct() + value2.getCart_ct());
                                value1.setComment_ct(value1.getComment_ct() + value2.getComment_ct());
                                value1.setGood_comment_ct(value1.getGood_comment_ct() + value2.getGood_comment_ct());
                                value1.setOrder_sku_num(value1.getOrder_sku_num() + value2.getOrder_sku_num());
                                value1.setOrder_amount(value1.getOrder_amount().add(value2.getOrder_amount()));
                                value1.setPayment_amount(value1.getPayment_amount().add(value2.getPayment_amount()));
                                value1.setRefund_amount(value1.getRefund_amount().add(value2.getRefund_amount()));
                                value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                                value1.getPaidOrderIdSet().addAll(value2.getPaidOrderIdSet());
                                value1.getRefundOrderIdSet().addAll(value2.getRefundOrderIdSet());
                                return value1;
                            }
                        },
                        new ProcessWindowFunction<ProductStats, ProductStats, Long, TimeWindow>() {
                            @Override
                            public void process(Long key, Context context, Iterable<ProductStats> elements, Collector<ProductStats> out) throws Exception {
                                ProductStats ps = elements.iterator().next();
                                ps.setStt(IteratorToListUtil.toDateString(context.window().getStart(),"yyyy-MM-dd HH:mm:ss"));
                                ps.setEdt(IteratorToListUtil.toDateString(context.window().getEnd(),"yyyy-MM-dd HH:mm:ss"));
                                ps.setTs(System.currentTimeMillis());

                                ps.setOrder_ct((long)ps.getOrderIdSet().size());
                                ps.setPaid_order_ct((long)ps.getPaidOrderIdSet().size());
                                ps.setRefund_order_ct((long)ps.getRefundOrderIdSet().size());
                                out.collect(ps);
                            }
                        }
                );

    }
  3. Join dimension info
 /**
    * @Description: join dimension info
    * @Param: [aggregateStream]
    * @return: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator<com.atguigu.gmall.pojo.ProductStats>
    * @Author: jcsune
    * @Date: 2021/11/3 0003
    */
    private SingleOutputStreamOperator<ProductStats> joinDims(SingleOutputStreamOperator<ProductStats> aggregateStream) {
        return AsyncDataStream.unorderedWait(
                aggregateStream,
                new DimAsyncFunction<ProductStats>(){
                    @Override
                    protected void addDim(Connection phoenixConnection,
                                          Jedis redisClient,
                                          ProductStats ps,
                                          ResultFuture<ProductStats> resultFuture) {
                        // 1. sku info
                        JSONObject skuInfo = DimUtil.getDim(
                                phoenixConnection,
                                redisClient,
                                "dim_sku_info",
                                ps.getSku_id());
                        ps.setSku_name(skuInfo.getString("SKU_NAME"));
                        ps.setSku_price(skuInfo.getBigDecimal("PRICE"));

                        ps.setSpu_id(skuInfo.getLong("SPU_ID"));
                        ps.setTm_id(skuInfo.getLong("TM_ID"));
                        ps.setCategory3_id(skuInfo.getLong("CATEGORY3_ID"));

                        // 2. spu info
                        JSONObject spuInfo = DimUtil.getDim(
                                phoenixConnection,
                                redisClient,
                                "dim_spu_info",
                                ps.getSpu_id());
                        ps.setSpu_name(spuInfo.getString("SPU_NAME"));


                        // 3. trademark info
                        JSONObject tmInfo =DimUtil.getDim(
                                phoenixConnection,
                                redisClient,
                                "dim_base_trademark",
                                ps.getTm_id());
                        ps.setTm_name(tmInfo.getString("TM_NAME"));

                        // 4. level-3 category info
                        JSONObject c3Info =DimUtil.getDim(
                                phoenixConnection,
                                redisClient,
                                "dim_base_category3",
                                ps.getCategory3_id());
                        ps.setCategory3_name(c3Info.getString("NAME"));
                        resultFuture.complete(Collections.singletonList(ps));

                    }
                },
                60,
                TimeUnit.SECONDS
        );
    }
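DimAsyncFunction is a project helper that does not appear in this section. A rough sketch of what such a wrapper might look like, assuming it extends Flink's RichAsyncFunction, opens the Phoenix and Redis clients in open(), and runs addDim on a private thread pool; the connection URL, host, port, and pool size here are all assumptions:

// a minimal sketch, NOT the project's actual implementation
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import redis.clients.jedis.Jedis;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> {
    private transient ExecutorService executor;
    private transient Connection phoenixConnection; // java.sql.Connection to Phoenix
    private transient Jedis redisClient;            // Redis client used as a dim cache

    @Override
    public void open(Configuration parameters) throws Exception {
        executor = Executors.newFixedThreadPool(8);                                     // assumed pool size
        phoenixConnection = DriverManager.getConnection("jdbc:phoenix:hadoop162:2181"); // assumed URL
        redisClient = new Jedis("hadoop162", 6379);                                     // assumed host/port
    }

    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) {
        // hand the blocking lookups to the pool so the operator thread is not blocked
        executor.submit(() -> addDim(phoenixConnection, redisClient, input, resultFuture));
    }

    protected abstract void addDim(Connection phoenixConnection,
                                   Jedis redisClient,
                                   T input,
                                   ResultFuture<T> resultFuture);
}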
  4. Upgrade the ClickHouse sink in FlinkSinkUtil to honor the @NotSink annotation
public static<T> SinkFunction<T> getClickHouseSink(String db,
                                                       String table,
                                                       Class<T> tClass) {
        String driver = Constant.CLICK_HOUSE_DRIVER;
        String url = Constant.CLICK_HOUSE_URL_PRE + db;
        StringBuilder sql = new StringBuilder();
        // build the SQL: T's field names must match the table's column names
        sql
                .append("insert into ")
                .append(table)
                .append("(");
        // append the column names
        Field[] fields = tClass.getDeclaredFields();
        for (Field field : fields) {
            // skip fields that carry the @NotSink annotation
            NotSink notSink = field.getAnnotation(NotSink.class);
            if (notSink == null) { // no annotation present: include this field in the SQL
                sql.append(field.getName()).append(",");
            }
        }
        sql.deleteCharAt(sql.length()-1);
        sql.append(") values(");
        // append the ? placeholders
        for (Field field : fields) {
            NotSink notSink = field.getAnnotation(NotSink.class);
            if (notSink == null) { // no annotation present: include a placeholder for this field
                sql.append("?,");
            }

        }
        sql.deleteCharAt(sql.length()-1);
        sql.append(")");
        //System.out.println(sql.toString());

        return getJdbcSink(driver,url,sql.toString());

    }

    private static <T> SinkFunction<T> getJdbcSink(String driver, String url, String sql) {
        return JdbcSink.sink(
                sql,
                new JdbcStatementBuilder<T>() {
                    @Override
                    public void accept(PreparedStatement ps, T t) throws SQLException {
                        // bind values to the placeholders
                        /**
                         * three ways to obtain a Class object:
                         * 1. ClassName.class
                         * 2. object.getClass()
                         * 3. Class.forName("className")
                         */
                        Field[] fields = t.getClass().getDeclaredFields();
                        try{
                            for (int i = 0,position = 1; i < fields.length; i++) {
                                Field field = fields[i];
                                NotSink notSink = field.getAnnotation(NotSink.class);
                                if(notSink == null){
                                    field.setAccessible(true); // allow access to private fields
                                    ps.setObject(position,field.get(t));
                                    position++;
                                }


                            }

                        }catch(Exception e){
                            e.printStackTrace();

                        }
                    }
                },
                new JdbcExecutionOptions.Builder()
                .withBatchIntervalMs(1000)
                .withBatchSize(1024)
                .withMaxRetries(3)
                .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withDriverName(driver)
                .withUrl(url)
                .build()

        );
    }
  5. Write to ClickHouse
 /**
    * @Description: write the results to ClickHouse
    * @Param: [psStreamWithDim]
    * @return: void
    * @Author: jcsune
    * @Date: 2021/11/3 0003
    */
    private void writeToClickHouse(SingleOutputStreamOperator<ProductStats> psStreamWithDim) {
        psStreamWithDim
                .addSink(FlinkSinkUtil.getClickHouseSink("gmall2021","product_stats_2021",ProductStats.class));
    }
  6. Complete code
package com.atguigu.gmall.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.app.BaseAppV2;
import com.atguigu.gmall.common.Constant;
import com.atguigu.gmall.function.DimAsyncFunction;
import com.atguigu.gmall.pojo.OrderWide;
import com.atguigu.gmall.pojo.PaymentWide;
import com.atguigu.gmall.pojo.ProductStats;
import com.atguigu.gmall.util.DimUtil;
import com.atguigu.gmall.util.FlinkSinkUtil;
import com.atguigu.gmall.util.IteratorToListUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

import java.sql.Connection;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

public class DwsProductApp extends BaseAppV2 {
    public static void main(String[] args) {
        new DwsProductApp().init(4002,1,"DwsProductApp","DwsProductApp",
                Constant.TOPIC_DWD_PAGE,Constant.TOPIC_DWD_DISPLAY,
                Constant.TOPIC_DWD_FAVOR_INFO,Constant.TOPIC_DWD_CART_INFO,
                Constant.TOPIC_DWM_ORDER_WIDE, Constant.TOPIC_DWM_PAYMENT_WIDE,
                Constant.TOPIC_DWD_REFUND_PAYMENT,Constant.TOPIC_DWD_COMMENT_INFO);
    }
    @Override
    public void run(StreamExecutionEnvironment env, HashMap<String, DataStreamSource<String>> streams) {

        /**
         * 1. parse the eight streams and union them into one
         */
        DataStream<ProductStats> psStream = parseStreamAndUnionOne(streams);
        //psStream.print();
        /**
         * 2. windowed aggregation
         */
        SingleOutputStreamOperator<ProductStats> aggregateStream = aggregate(psStream);
        //aggregateStream.print();
        /**
         * 3. join dimension info
         */
        SingleOutputStreamOperator<ProductStats> psStreamWithDim = joinDims(aggregateStream);
        //psStreamWithDim.print();
        /**
         * 4. write to ClickHouse
         */
        writeToClickHouse(psStreamWithDim);


    }
    /**
    * @Description: write the results to ClickHouse
    * @Param: [psStreamWithDim]
    * @return: void
    * @Author: jcsune
    * @Date: 2021/11/3 0003
    */
    private void writeToClickHouse(SingleOutputStreamOperator<ProductStats> psStreamWithDim) {
        psStreamWithDim
                .addSink(FlinkSinkUtil.getClickHouseSink("gmall2021","product_stats_2021",ProductStats.class));
    }
    /**
    * @Description: join dimension info
    * @Param: [aggregateStream]
    * @return: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator<com.atguigu.gmall.pojo.ProductStats>
    * @Author: jcsune
    * @Date: 2021/11/3 0003
    */
    private SingleOutputStreamOperator<ProductStats> joinDims(SingleOutputStreamOperator<ProductStats> aggregateStream) {
        return AsyncDataStream.unorderedWait(
                aggregateStream,
                new DimAsyncFunction<ProductStats>(){
                    @Override
                    protected void addDim(Connection phoenixConnection,
                                          Jedis redisClient,
                                          ProductStats ps,
                                          ResultFuture<ProductStats> resultFuture) {
                        // 1. sku info
                        JSONObject skuInfo = DimUtil.getDim(
                                phoenixConnection,
                                redisClient,
                                "dim_sku_info",
                                ps.getSku_id());
                        ps.setSku_name(skuInfo.getString("SKU_NAME"));
                        ps.setSku_price(skuInfo.getBigDecimal("PRICE"));

                        ps.setSpu_id(skuInfo.getLong("SPU_ID"));
                        ps.setTm_id(skuInfo.getLong("TM_ID"));
                        ps.setCategory3_id(skuInfo.getLong("CATEGORY3_ID"));

                        // 2. spu info
                        JSONObject spuInfo = DimUtil.getDim(
                                phoenixConnection,
                                redisClient,
                                "dim_spu_info",
                                ps.getSpu_id());
                        ps.setSpu_name(spuInfo.getString("SPU_NAME"));


                        // 3. trademark info
                        JSONObject tmInfo =DimUtil.getDim(
                                phoenixConnection,
                                redisClient,
                                "dim_base_trademark",
                                ps.getTm_id());
                        ps.setTm_name(tmInfo.getString("TM_NAME"));

                        // 4. level-3 category info
                        JSONObject c3Info =DimUtil.getDim(
                                phoenixConnection,
                                redisClient,
                                "dim_base_category3",
                                ps.getCategory3_id());
                        ps.setCategory3_name(c3Info.getString("NAME"));
                        resultFuture.complete(Collections.singletonList(ps));

                    }
                },
                60,
                TimeUnit.SECONDS
        );
    }

    /** 
    * @Description: windowed aggregation
    * @Param: [psStream] 
    * @return: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator<com.atguigu.gmall.pojo.ProductStats> 
    * @Author: jcsune
    * @Date: 2021/11/3 0003 
    */ 
    private SingleOutputStreamOperator<ProductStats> aggregate(DataStream<ProductStats> psStream) {
        return psStream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<ProductStats>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                        .withTimestampAssigner((ps,ts)->(ps.getTs()))

                )
                .keyBy(ps->ps.getSku_id())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(
                        new ReduceFunction<ProductStats>() {
                            @Override
                            public ProductStats reduce(ProductStats value1, ProductStats value2) throws Exception {
                                value1.setClick_ct(value1.getClick_ct() + value2.getClick_ct());
                                value1.setDisplay_ct(value1.getDisplay_ct() + value2.getDisplay_ct());
                                value1.setFavor_ct(value1.getFavor_ct() + value2.getFavor_ct());
                                value1.setCart_ct(value1.getCart_ct() + value2.getCart_ct());
                                value1.setComment_ct(value1.getComment_ct() + value2.getComment_ct());
                                value1.setGood_comment_ct(value1.getGood_comment_ct() + value2.getGood_comment_ct());
                                value1.setOrder_sku_num(value1.getOrder_sku_num() + value2.getOrder_sku_num());
                                value1.setOrder_amount(value1.getOrder_amount().add(value2.getOrder_amount()));
                                value1.setPayment_amount(value1.getPayment_amount().add(value2.getPayment_amount()));
                                value1.setRefund_amount(value1.getRefund_amount().add(value2.getRefund_amount()));
                                value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                                value1.getPaidOrderIdSet().addAll(value2.getPaidOrderIdSet());
                                value1.getRefundOrderIdSet().addAll(value2.getRefundOrderIdSet());
                                return value1;
                            }
                        },
                        new ProcessWindowFunction<ProductStats, ProductStats, Long, TimeWindow>() {
                            @Override
                            public void process(Long key, Context context, Iterable<ProductStats> elements, Collector<ProductStats> out) throws Exception {
                                ProductStats ps = elements.iterator().next();
                                ps.setStt(IteratorToListUtil.toDateString(context.window().getStart(),"yyyy-MM-dd HH:mm:ss"));
                                ps.setEdt(IteratorToListUtil.toDateString(context.window().getEnd(),"yyyy-MM-dd HH:mm:ss"));
                                ps.setTs(System.currentTimeMillis());

                                ps.setOrder_ct((long)ps.getOrderIdSet().size());
                                ps.setPaid_order_ct((long)ps.getPaidOrderIdSet().size());
                                ps.setRefund_order_ct((long)ps.getRefundOrderIdSet().size());
                                out.collect(ps);
                            }
                        }
                );

    }

    /**
    * @Description: parse the eight streams and union them into one
    * @Param: [streams]
    * @return: org.apache.flink.streaming.api.datastream.DataStream<com.atguigu.gmall.pojo.ProductStats>
    * @Author: jcsune
    * @Date: 2021/11/3 0003
    */
    private DataStream<ProductStats> parseStreamAndUnionOne(HashMap<String, DataStreamSource<String>> streams) {
        // 1. clicks
        SingleOutputStreamOperator<ProductStats> clickStream = streams
                .get(Constant.TOPIC_DWD_PAGE)
                .flatMap(new FlatMapFunction<String, ProductStats>() {
                    @Override
                    public void flatMap(String value, Collector<ProductStats> out) throws Exception {
                        JSONObject object = JSON.parseObject(value);
                        JSONObject page = object.getJSONObject("page");
                        String page_id = page.getString("page_id");
                        String item_type = page.getString("item_type");
                        if ("good_detail".equals(page_id) && "sku_id".equals(item_type)) {
                            Long skuId = page.getLong("item");
                            ProductStats ps = new ProductStats();
                            ps.setSku_id(skuId);
                            ps.setClick_ct(1L);
                            ps.setTs(object.getLong("ts"));
                            out.collect(ps);
                        }
                    }
                });
        // 2. displays (exposures)
        SingleOutputStreamOperator<ProductStats> displayStream = streams
                .get(Constant.TOPIC_DWD_DISPLAY)
                .filter(value -> {
                    JSONObject object = JSON.parseObject(value);
                    // keep only records whose item_type is sku_id
                    String item_type = object.getString("item_type");
                    return "sku_id".equals(item_type);
                })
                .map(value -> {
                    JSONObject object = JSON.parseObject(value);
                    ProductStats ps = new ProductStats();
                    ps.setSku_id(object.getLong("item"));
                    ps.setDisplay_ct(1L);
                    ps.setTs(object.getLong("ts"));
                    return ps;

                });

        // 3. favors
        SingleOutputStreamOperator<ProductStats> favorStream = streams
                .get(Constant.TOPIC_DWD_FAVOR_INFO)
                .map(value -> {
                    JSONObject object = JSON.parseObject(value);
                    ProductStats ps = new ProductStats();
                    ps.setSku_id(object.getLong("sku_id"));
                    ps.setFavor_ct(1L);
                    ps.setTs(IteratorToListUtil.toTs(object.getString("create_time")));
                    return ps;

                });

        // 4. cart additions
        SingleOutputStreamOperator<ProductStats> cartStream = streams
                .get(Constant.TOPIC_DWD_CART_INFO)
                .map(value -> {
                    JSONObject object = JSON.parseObject(value);
                    ProductStats ps = new ProductStats();
                    ps.setSku_id(object.getLong("sku_id"));
                    ps.setCart_ct(1L);
                    ps.setTs(IteratorToListUtil.toTs(object.getString("create_time")));
                    return ps;

                });

        // 5. orders
        SingleOutputStreamOperator<ProductStats> orderStream = streams
                .get(Constant.TOPIC_DWM_ORDER_WIDE)
                .map(json -> {
                    OrderWide orderWide = JSON.parseObject(json, OrderWide.class);
                    ProductStats ps = new ProductStats();
                    ps.setSku_id(orderWide.getSku_id());
                    ps.getOrderIdSet().add(orderWide.getOrder_id()); // only this one set is used here: add the order id directly into it
                    ps.setOrder_amount(orderWide.getSplit_total_amount()); // use the SKU's apportioned (split) amount
                    ps.setOrder_sku_num(orderWide.getSku_num());
                    ps.setTs(IteratorToListUtil.toTs(orderWide.getCreate_time()));
                    return ps;


                });

        // 6. payments
        SingleOutputStreamOperator<ProductStats> paymentStream = streams
                .get(Constant.TOPIC_DWM_PAYMENT_WIDE)
                .map(json -> {
                    PaymentWide paymentWide = JSON.parseObject(json, PaymentWide.class);
                    ProductStats ps = new ProductStats();
                    ps.setSku_id(paymentWide.getSku_id());
                    ps.setPayment_amount(paymentWide.getSplit_total_amount());
                    ps.getPaidOrderIdSet().add(paymentWide.getOrder_id());
                    ps.setTs(IteratorToListUtil.toTs(paymentWide.getPayment_create_time()));
                    return ps;
                });

        // 7. refunds
        SingleOutputStreamOperator<ProductStats> refundStream = streams
                .get(Constant.TOPIC_DWD_REFUND_PAYMENT)
                .map(value -> {
                    JSONObject object = JSON.parseObject(value);
                    ProductStats ps = new ProductStats();
                    ps.setSku_id(object.getLong("sku_id"));
                    ps.setRefund_amount(object.getBigDecimal("total_amount"));
                    ps.getRefundOrderIdSet().add(object.getLong("order_id"));
                    ps.setTs(IteratorToListUtil.toTs(object.getString("create_time")));
                    return ps;
                });
        // 8. comments
        SingleOutputStreamOperator<ProductStats> commentStream = streams
                .get(Constant.TOPIC_DWD_COMMENT_INFO)
                .map(value -> {
                    JSONObject object = JSON.parseObject(value);
                    ProductStats ps = new ProductStats();
                    ps.setSku_id(object.getLong("sku_id"));
                    ps.setComment_ct(1L);
                    String appraise = object.getString("appraise");
                    if (Constant.FIVE_STAR_COMMENT.equals(appraise) || Constant.FOUR_STAR_COMMENT.equals(appraise)) {
                        ps.setGood_comment_ct(1L);
                    }
                    ps.setTs(IteratorToListUtil.toTs(object.getString("create_time")));
                    return ps;
                });


        return clickStream.union(displayStream,
                favorStream,
                cartStream,
                orderStream,
                paymentStream,
                refundStream,
                commentStream);


    }


}

4. Testing

Package the project and upload it to the Linux server.

Connect to ClickHouse with DataGrip, start both the log-data generator and the business-data generator, and check whether data is being written into ClickHouse.
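A hedged example of the kind of multidimensional query this table is meant to serve:

-- top 10 SKUs by order amount today
select sku_id, any(sku_name) sku_name,
       sum(order_amount) order_amount, sum(order_ct) order_ct
from product_stats_2021
where toYYYYMMDD(stt) = toYYYYMMDD(now())
group by sku_id
order by order_amount desc
limit 10;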

[Flink Real-Time Data Warehouse (III) - Figure 2]

Chapter 4. DWS Layer: Region Topic Wide Table

Everything so far has been implemented with the DataStream API; this chapter uses Flink SQL instead.

1. Requirements analysis

  1. Create a table (streaming) environment.
  2. Define the data source as a dynamic table (the source is dwm_order_wide).
  3. Compute the result table with a SQL query.
  4. Write the result table to ClickHouse. (The code below covers steps 1-3; a sketch of step 4 follows this list.)
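One way step 4 could be wired up, reusing the existing ClickHouse sink; a sketch under the assumption that the query's column order matches the ProvinceStats field order (result, tEnv, and the POJO are defined later in this chapter):

// sketch: convert the append-only result Table back to a DataStream, then reuse the JDBC sink
tEnv.toAppendStream(result, ProvinceStats.class)
    .addSink(FlinkSinkUtil.getClickHouseSink("gmall2021", "province_stats_2021", ProvinceStats.class));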

2. Preparation

  1. JavaBean for encapsulating the data
package com.atguigu.gmall.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class ProvinceStats {

    private String stt; // window start time
    private String edt; // window end time
    private Long province_id;
    private String province_name;
    private String area_code;
    private String iso_code;
    private String iso_3166_2;
    private BigDecimal order_amount;
    private Long order_count;
    private Long ts; // statistics timestamp

}
  2. Create the database and table in ClickHouse
use gmall2021;
create table province_stats_2021 (
   stt DateTime,
   edt DateTime,
   province_id  UInt64,
   province_name String,
   area_code String ,
   iso_code String,
   iso_3166_2 String , 
   order_amount Decimal64(2),
   order_count UInt64, 
   ts UInt64
) engine = ReplacingMergeTree(ts)
        partition by toYYYYMMDD(stt)
        order by (stt, edt, province_id);
  3. A BaseSqlApp that encapsulates the table execution environment
package com.atguigu.gmall.app;


import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;



public abstract class BaseSqlApp {
    public abstract void run(StreamTableEnvironment tEnv);


    public void init(int port,int p,String ck){
        System.setProperty("HADOOP_USER_NAME","atguigu");
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", port); // port of the Flink web UI
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(p);
        // configure checkpointing
        env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop162:8020/gmall/" + ck);
        env.getCheckpointConfig().setCheckpointTimeout(10 * 1000);
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // each app implements its own business logic in run()
        run(tEnv);
        try {
            env.execute(ck);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. Implementation

package com.atguigu.gmall.app.dws;

import com.atguigu.gmall.app.BaseSqlApp;
import com.atguigu.gmall.common.Constant;
import com.atguigu.gmall.pojo.ProvinceStats;
import com.atguigu.gmall.util.FlinkSinkUtil;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DwsProvinceApp extends BaseSqlApp {
    public static void main(String[] args) {
        new DwsProvinceApp().init(4003,1,"DwsProvinceApp");

    }
    @Override
    public void run(StreamTableEnvironment tEnv) {
        /**
         *  1. Use DDL to create a table bound to the Kafka topic dwm_order_wide
         */
        tEnv.executeSql("create table ow(" +
                " province_id bigint, " +
                " province_name string, " +
                " province_area_code string, " +
                " province_iso_code string, " +
                " province_3166_2_code string, " +
                " order_id bigint, " +
                " split_total_amount decimal(20,2), " +
                " create_time string, " +
                " et as to_timestamp(create_time), " +
                " watermark for et as et - interval '3' second " +
                ")with(" +
                " 'connector'='kafka', " +
                " 'properties.bootstrap.servers'='hadoop162:9092,hadoop163:9092,hadoop164:9092', " +
                " 'properties.group.id'='DwsProvinceApp', " +
                " 'topic'='" + Constant.TOPIC_DWM_ORDER_WIDE + "', " +
                " 'scan.startup.mode'='earliest-offset', " +
                " 'format'='json' " +
                ")"


        );
        //tEnv.sqlQuery("select * from ow").execute().print();

        /**
         * 2. Run the SQL query: aggregation over 5-second tumbling windows
         */
        Table result = tEnv.sqlQuery("select" +
                " date_format(tumble_start(et, interval '5' second), 'yyyy-MM-dd HH:mm:ss') stt, " +
                " date_format(tumble_end(et, interval '5' second), 'yyyy-MM-dd HH:mm:ss') edt, " +
                " province_id, " +
                " province_name, " +
                " province_area_code area_code, " +
                " province_iso_code iso_code, " +
                " province_3166_2_code iso_3166_2, " +
                " sum(split_total_amount) order_amount, " +
                " count(distinct(order_id)) order_count, " +
                " unix_timestamp() * 1000 ts " +
                "from ow " +
                "group by " +
                " province_id, " +
                " province_name, " +
                " province_area_code, " +
                " province_iso_code, " +
                " province_3166_2_code, " +
                " tumble(et, interval '5' second) " +
                ""


        );
        //result.execute().print();
        // 3. Convert the result to a stream and write it to ClickHouse;
        //    filter(t -> t.f0) keeps only the insert (accumulate) records
        tEnv
                .toRetractStream(result, ProvinceStats.class)
                .filter(t -> t.f0)
                .map(t -> t.f1)
                .addSink(FlinkSinkUtil.getClickHouseSink("gmall2021","province_stats_2021",ProvinceStats.class));

    }
}
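The sink above comes from FlinkSinkUtil.getClickHouseSink, which was written in an earlier chapter. For reference, here is a minimal sketch of what such a generic ClickHouse sink can look like on top of flink-connector-jdbc; the batch settings, the URL (host hadoop162, HTTP port 8123) and the reflection-based field mapping are assumptions, and the project's real utility may differ.

package com.atguigu.gmall.util;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.lang.reflect.Field;
import java.util.StringJoiner;

// hypothetical stand-in for the project's FlinkSinkUtil.getClickHouseSink
public class ClickHouseSinkSketch {
    public static <T> SinkFunction<T> getClickHouseSink(String db, String table, Class<T> clazz) {
        // build "insert into db.table(f1,f2,...) values(?,?,...)" from the POJO fields
        Field[] fields = clazz.getDeclaredFields();
        StringJoiner names = new StringJoiner(",", "(", ")");
        StringJoiner marks = new StringJoiner(",", "(", ")");
        for (Field f : fields) {
            names.add(f.getName());
            marks.add("?");
        }
        String sql = "insert into " + db + "." + table + names + " values" + marks;

        return JdbcSink.sink(
                sql,
                (ps, value) -> {
                    // one placeholder per POJO field, in declaration order (an assumption)
                    try {
                        for (int i = 0; i < fields.length; i++) {
                            fields[i].setAccessible(true);
                            ps.setObject(i + 1, fields[i].get(value));
                        }
                    } catch (IllegalAccessException e) {
                        throw new RuntimeException(e);
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1024)
                        .withBatchIntervalMs(2000)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:clickhouse://hadoop162:8123/" + db) // assumed host and port
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .build());
    }
}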

4. Testing

Package the project and upload it to the Linux server.

Connect to ClickHouse with DataGrip, start the business data generator, and check whether data is being written into ClickHouse.


Chapter 5. DWS Layer: Search Keyword Topic Wide Table

1. Requirement Analysis

The keyword topic mainly serves the word-cloud display on the dashboard: it gives viewers an intuitive sense of which products and keywords users currently care about most.

The keyword display is itself the result of a dimensional aggregation; the size of each keyword is driven by the size of its aggregated count.

The most important source of keywords is what users type into the search bar; the other source is the keywords extracted in the product-topic statistics.

    About word segmentation
    Text from the search bar or from a product name can be fairly long and is usually composed of several keywords,
    so we need to split long text into individual words. This kind of segmentation is what search engines use; for Chinese, today's search engines almost all rely on a third-party tokenizer, and in our computation we can use the same tokenizer as the search engines: IK

2. Preparation

  1. Add the IK analyzer (tokenizer) dependency
<dependency>
    <groupId>com.janeluo</groupId>
    <artifactId>ikanalyzer</artifactId>
    <version>2012_u6</version>
</dependency>
  2. Encapsulate a tokenizer utility class
package com.atguigu.gmall.util;

import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.StringReader;
import java.util.HashSet;
import java.util.Set;

public class IkUtil {
    public static Set<String> split(String keyword){
        HashSet<String> words = new HashSet<>();
        // wrap the keyword in an in-memory reader
        StringReader reader = new StringReader(keyword);
        IKSegmenter seg = new IKSegmenter(reader, true); // true = smart (coarse-grained) mode
        try {
            Lexeme next = seg.next();
            while(next!=null){
                String word = next.getLexemeText();
                words.add(word);
                next = seg.next();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(split("我是中国人"));
    }

}
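Running main is a quick sanity check: with the second constructor argument set to true, IK runs in smart (coarse-grained) mode, so the sample sentence should come out as a handful of whole words (roughly 我 / 是 / 中国人) rather than every possible fragment, which is what a word cloud wants.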
  3. Custom function

To use the tokenizer in Flink SQL we need to define our own user-defined table function (UDTF); a quick way to smoke-test it follows the class.

package com.atguigu.gmall.function;

import com.atguigu.gmall.util.IkUtil;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.Set;
// declares the row type this table function emits: row<word string>
@FunctionHint(output = @DataTypeHint("row<word string>"))
public class IkAnalyzer extends TableFunction<Row> {
    public void eval(String keyword){
        // split the keyword into individual words
        Set<String> words = IkUtil.split(keyword);
        for (String word : words) {
            collect(Row.of(word));
        }
    }
}
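A quick way to smoke-test the function outside the full job is to run it against an inline VALUES row instead of a Kafka topic. This is only a sketch of such a test, not part of the original project:

import com.atguigu.gmall.function.IkAnalyzer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IkAnalyzerDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.createTemporaryFunction("ik_analyzer", IkAnalyzer.class);
        // explode one hard-coded product name into words
        tEnv.executeSql(
                "select word " +
                "from (values ('Apple iPhone 12 手机')) as t(name), " +
                "lateral table(ik_analyzer(name))").print();
    }
}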
  4. POJO for encapsulating the data
package com.atguigu.gmall.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class KeyWordStats {

    private String stt; // window start time
    private String edt; // window end time
    private String keyword;
    private String source;
    private Long ct;

    private Long ts; // statistics timestamp

}
  5. Create the database and table in ClickHouse
use gmall2021;
create table keyword_stats_2021 (
    stt DateTime,
    edt DateTime,
    keyword String ,
    source String ,
    ct UInt64 ,
    ts UInt64
)engine =ReplacingMergeTree( ts)
        partition by  toYYYYMMDD(stt)
        order by  ( stt,edt,keyword,source );

3. Implementation

package com.atguigu.gmall.app.dws;

import com.atguigu.gmall.app.BaseSqlApp;
import com.atguigu.gmall.common.Constant;
import com.atguigu.gmall.function.IkAnalyzer;
import com.atguigu.gmall.pojo.KeyWordStats;
import com.atguigu.gmall.util.FlinkSinkUtil;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DwsSearchKeyWordApp extends BaseSqlApp {
    public static void main(String[] args) {
        new DwsSearchKeyWordApp().init(4004,1,"DwsSearchKeyWordApp");
    }
    @Override
    public void run(StreamTableEnvironment tEnv) {
        // 1. Create a dynamic table bound to the dwd_page topic
        tEnv.executeSql("create table page(" +
                " page map<string,string>, " +
                " ts bigint," +
                " et as to_timestamp(from_unixtime(ts/1000)), " +
                " watermark for et as et - interval '3' second " +
                ")with(" +
                " 'connector'='kafka', " +
                " 'properties.bootstrap.servers'='hadoop162:9092,hadoop163:9092,hadoop164:9092', " +
                " 'properties.group,id'='DwsSearchKeyWordApp', " +
                " 'topic'=' "+ Constant.TOPIC_DWD_PAGE + "', " +
                " 'scan.startup.mode'='earliest-offset', " +
                " 'format'='json' " +
                ")"
        );
        //tEnv.sqlQuery("select * from page").execute().print();
        // 2. Filter out the keyword-search records on the goods list page
        Table t1 = tEnv.sqlQuery("select" +
                " page['item'] keyword, " +
                " et " +
                "from page " +
                "where page['page_id'] = 'good_list' " +
                "and page['item'] is not null " +
                "and page['item_type'] = 'keyword'"

        );
        tEnv.createTemporaryView("t1",t1);
        //t1.execute().print();
        // 3. Tokenize the keywords
        // 3.1 register the custom function
        tEnv.createTemporaryFunction("ik_analyzer", IkAnalyzer.class);
        // 3.2 use it in SQL
        Table t2 = tEnv.sqlQuery("select" +
                " word, " +
                " et " +
                "from t1 " +
                "join lateral table(ik_analyzer(keyword)) on true"
        );
        //t2.execute().print();
        tEnv.createTemporaryView("t2", t2);

        // 4. Windowed aggregation
        Table result = tEnv.sqlQuery("select" +
                " date_format(tumble_start(et, interval '5' second),'yyyy-MM-dd HH:mm:ss') stt, " +
                " date_format(tumble_end(et, interval '5' second),'yyyy-MM-dd HH:mm:ss') edt, " +
                " word keyword, " +
                " 'search' source, " +
                " count(word) ct," +
                " unix_timestamp() * 1000 ts " +
                " from t2 " +
                " group by " +
                " word, " +
                " tumble(et, interval '5' second)"
        );
        // 5. Write out to ClickHouse
        tEnv
                .toRetractStream(result, KeyWordStats.class)
                .filter(t -> t.f0)
                .map(t -> t.f1)
                .addSink(FlinkSinkUtil.getClickHouseSink("gmall2021","keyword_stats_2021",KeyWordStats.class));


    }
}

4. Testing

Package the project and upload it to the Linux server.

Connect to ClickHouse with DataGrip, start the log data generator, and check whether data is being written into ClickHouse.


Chapter 6. DWS Layer: Product Behavior Keyword Wide Table

1. Requirement Analysis

From the product topic, derive a statistics table that relates product keywords to click counts, order counts, and add-to-cart counts.

2. Preparation

  1. To obtain product click, order, and add-to-cart counts, refactor DwsProductStatsApp and add code that writes its final stats to Kafka (a sketch of the Kafka sink helper follows the method below)
/**
    * @Description: write the final product stats to Kafka
    * @Param: [psStreamWithDim]
    * @return: void
    * @Author: jcsune
    * @Date: 2021/11/5 0005
    */
    private void writeToKafka(SingleOutputStreamOperator<ProductStats> psStreamWithDim) {
        psStreamWithDim
                .map(new MapFunction<ProductStats, String>() {
                    @Override
                    public String map(ProductStats value) throws Exception {
                        return JSON.toJSONString(value);
                    }
                })
                .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWS_PRODUCT_STATS));
    }
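FlinkSinkUtil.getKafkaSink is likewise defined elsewhere in the project. A minimal sketch of what it may look like; the broker list is copied from the Kafka connector options used in these chapters, and the simple at-least-once FlinkKafkaProducer constructor is an assumption:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

// hypothetical stand-in for the project's FlinkSinkUtil.getKafkaSink
public class KafkaSinkSketch {
    public static SinkFunction<String> getKafkaSink(String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers",
                "hadoop162:9092,hadoop163:9092,hadoop164:9092");
        // at-least-once producer writing JSON strings to the given topic
        return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), props);
    }
}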
  2. POJO for encapsulating a behavior source and its count
package com.atguigu.gmall.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/11/5 14:43
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SourceCount {
    private String source;
    private Long ct;
}
  3. Custom UDTF

Explodes the click, order, and add-to-cart counts of one row into separate (source, count) rows.

package com.atguigu.gmall.function;

import com.atguigu.gmall.pojo.SourceCount;
import org.apache.flink.table.functions.TableFunction;

public class KwProduct extends TableFunction<SourceCount> {
    public void eval(Long click_ct,Long cart_ct,Long order_ct){
        if(click_ct > 0){
            collect(new SourceCount("click",click_ct));
        }
        if(cart_ct > 0){
            collect(new SourceCount("cart",cart_ct));
        }
        if(order_ct > 0){
            collect(new SourceCount("order",order_ct));
        }

    }

}

3. Implementation

package com.atguigu.gmall.app.dws;

import com.atguigu.gmall.app.BaseSqlApp;
import com.atguigu.gmall.common.Constant;
import com.atguigu.gmall.function.IkAnalyzer;
import com.atguigu.gmall.function.KwProduct;
import com.atguigu.gmall.pojo.KeyWordStats;
import com.atguigu.gmall.util.FlinkSinkUtil;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DwsProductKeyWordApp extends BaseSqlApp {
    public static void main(String[] args) {
        new DwsProductKeyWordApp().init(4005,1,"DwsProductKeyWordApp");
    }
    @Override
    public void run(StreamTableEnvironment tEnv) {
        // 1. Create a dynamic table bound to the dws_product_stats topic
        tEnv.executeSql("create table product_stats(" +
                " stt string, " +
                " edt string, " +
                " sku_name string, " +
                " click_ct bigint, " +
                " cart_ct bigint, " +
                " order_ct bigint " +
                ")with(" +
                " 'connector'='kafka', " +
                " 'properties.bootstrap.servers'='hadoop162:9092,hadoop163:9092,hadoop164:9092', " +
                " 'properties.group.id'='DwsProductKeyWordApp', " +
                " 'topic'=' " + Constant.TOPIC_DWS_PRODUCT_STATS + "', " +
                " 'scan.startup.mode'='latest-offset', " +
                " 'format'='json' " +
                ")"
        );
        //tEnv.sqlQuery("select * from product_stats").execute().print();

        // 2. Keep only rows where at least one of the three counts is non-zero
        Table t1 = tEnv.sqlQuery("select " +
                " * " +
                " from product_stats " +
                " where click_ct >0 " +
                " or cart_ct > 0 " +
                " or order_ct >0 "
        );
        //t1.execute().print();
        tEnv.createTemporaryView("t1",t1);

        // 3. Tokenize sku_name
        tEnv.createTemporaryFunction("ik_analyzer", IkAnalyzer.class);
        Table t2 = tEnv.sqlQuery("select" +
                " stt," +
                " edt, " +
                " word, " +
                " click_ct, " +
                " cart_ct, " +
                " order_ct " +
                " from t1 " +
                " , lateral table(ik_analyzer(sku_name))"
        );
        //t2.execute().print();
        tEnv.createTemporaryView("t2",t2);

        // 4. Aggregate by window (stt, edt) and keyword
        Table t3 = tEnv.sqlQuery("select" +
                " stt, edt, word," +
                " sum(click_ct) click_ct, " +
                " sum(cart_ct) cart_ct, " +
                " sum(order_ct) order_ct " +
                " from t2 " +
                " group by stt, edt, word"
        );
        //t3.execute().print();
        tEnv.createTemporaryView("t3",t3);
        tEnv.createTemporaryFunction("kw_product", KwProduct.class);

        // 5. Explode the three counts of each row into three (source, ct) rows
        Table result = tEnv.sqlQuery("select " +
                " stt, edt, word keyword," +
                " source, " +
                " ct, " +
                " unix_timestamp()*1000 ts " +
                " from t3 " +
                " join lateral table(kw_product(click_ct, cart_ct, order_ct)) on true"
        );
        //result.execute().print();

        // 6. Write out to ClickHouse
        tEnv
                .toRetractStream(result, KeyWordStats.class)
                .filter(t -> t.f0)
                .map(t -> t.f1)
                .addSink(FlinkSinkUtil.getClickHouseSink("gmall2021","keyword_stats_2021",KeyWordStats.class));


    }
}

4. Testing

Package the project and upload it to the Linux server.

Connect to ClickHouse with DataGrip, start the log data generator and the business data generator, and check whether data is being written into ClickHouse.
