1. package com.atguigu.mycode;
    2. import com.atguigu.bean.HotItem;
    3. import com.atguigu.bean.UserBehavior;
    4. import com.atguigu.util.AtguiguUtil;
    5. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    6. import org.apache.flink.api.common.functions.*;
    7. import org.apache.flink.api.common.state.*;
    8. import org.apache.flink.configuration.Configuration;
    9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    10. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    11. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    12. import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    13. import org.apache.flink.streaming.api.windowing.time.Time;
    14. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    15. import org.apache.flink.util.Collector;
    16. import java.time.Duration;
    17. import java.util.Comparator;
    18. import java.util.Date;
    19. import java.util.List;
    20. /**
    21. * @Author: High_Project_TopN
    22. * @Description: TODO
    23. * @Email 365586711@qq.com
    24. * @Date:2021/7/2212:52
    25. * @Version 1.0
    26. */
    27. public class High_Project_TopN {
    28. public static void main(String[] args) {
    29. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    30. env.setParallelism(2);
    31. WatermarkStrategy<UserBehavior> wms = WatermarkStrategy
    32. // 记得写泛型
    33. .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    34. // 指定时间戳来源
    35. .withTimestampAssigner( (element, recordTimestamp) -> element.getTimestamp() * 1000L);
    36. env
    37. .readTextFile("input/UserBehavior.csv")
    38. // 要求:5分钟输出最近一小时内的点击量最多的前N个商品
    39. // 最近一小时:windowsize=1hour
    40. // 每5分钟:滑动步长为=5min
    41. // 按照商品分组,每种统计点击量-->pv
    42. // 再按照时间分组,让处于同一个时间段不同组的商品合并在一次,并进行排序
    43. //
    44. .flatMap(new FlatMapFunction<String, UserBehavior>() {
    45. @Override
    46. public void flatMap(String line, Collector<UserBehavior> out) throws Exception {
    47. String[] data = line.split(",");
    48. if ("pv".equals(data[3])) {
    49. out.collect(new UserBehavior(Long.valueOf(data[0]),Long.valueOf(data[1]),Integer.valueOf(data[2]),data[3],Long.valueOf(data[4])));
    50. }
    51. }
    52. })
    53. .assignTimestampsAndWatermarks(wms)
    54. .keyBy(UserBehavior::getItemId)
    55. // 让相同键进入同一个可滑动窗口
    56. .window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(20)))
    57. .aggregate(new AggregateFunction<UserBehavior, Long, Long>() {
    58. @Override
    59. public Long createAccumulator() {
    60. return 0L;
    61. }
    62. @Override
    63. public Long add(UserBehavior value, Long accumulator) {
    64. return accumulator + 1L;
    65. }
    66. @Override
    67. public Long getResult(Long accumulator) {
    68. return accumulator;
    69. }
    70. @Override
    71. public Long merge(Long a, Long b) {
    72. return null;
    73. }
    74. // 上面只有聚合结果,没有额外的窗口信息,此时需要补充窗口信息
    75. // 考虑下游信息,重新封装成其他东西。
    76. }, new ProcessWindowFunction<Long, HotItem, Long, TimeWindow>() {
    77. @Override
    78. public void process(Long key,
    79. Context ctx,
    80. Iterable<Long> elements,// 迭代器只有一个聚合值
    81. Collector<HotItem> out) throws Exception {
    82. out.collect(new HotItem(key,
    83. elements.iterator().next(),
    84. ctx.window().getEnd()// 这个时间是窗口关闭时间,而不是数据时间
    85. ));
    86. }
    87. })
    88. // 将数据进行分组,不管在哪个并行度下
    89. .keyBy(HotItem::getWindowEnd)
    90. // 这里没窗口,所以不需要清理状态
    91. .process(new KeyedProcessFunction<Long, HotItem, String>() {
    92. // 为什么要存状态,因为数据是一条一条的,要求topN必须存储到状态才能排序
    93. private ListState<HotItem> listState;
    94. @Override
    95. public void open(Configuration parameters) throws Exception {
    96. listState = getRuntimeContext().getListState(new ListStateDescriptor<HotItem>(
    97. "topN-state",
    98. HotItem.class
    99. ));
    100. }
    101. @Override
    102. public void processElement(HotItem hotItem, Context ctx, Collector<String> out) throws Exception {
    103. // 每来一条数据,把数据存入一个状态,等所有都来气了,让定时器去计算topN
    104. // 说明:第一个key的值肯定是空的。
    105. // 从状态获取值,如果没有值,那么当前就是第一个元素,注意,第一个值什么
    106. if (!listState.get().iterator().hasNext()) {
    107. // 注册定时器,通过大于窗口的时间来开始新窗口(关闭旧窗口),一个键注册一个定时器
    108. ctx.timerService().registerEventTimeTimer(hotItem.getWindowEnd()+5000L);
    109. }
    110. // 如果不为空,那么向状态开始添加值,一个状态存储一个key中的所有数据,这里key是:endtime
    111. listState.add(hotItem);
    112. }
    113. @Override
    114. public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
    115. // 排序取list,排序并输出
    116. List<HotItem> hotItems = AtguiguUtil.toList(listState.get());
    117. hotItems.sort(new Comparator<HotItem>() {
    118. @Override
    119. public int compare(HotItem o1, HotItem o2) {
    120. // 降序排
    121. return o2.getClickCount().compareTo(o1.getClickCount());
    122. }
    123. });
    124. StringBuilder sb = new StringBuilder();
    125. sb.append("========================\n");
    126. for (int i = 0; i < Math.min(3,hotItems.size()); i++) {
    127. HotItem hotItem = hotItems.get(i);
    128. // 相比每条都 out.collect(); 一次性写入一个
    129. sb
    130. .append("窗口: ")
    131. .append(new Date(hotItem.getWindowEnd()))
    132. .append(", 商品: ")
    133. .append(hotItem)
    134. .append("\n");
    135. }
    136. out.collect(sb.toString());
    137. }
    138. })
    139. .print();
    140. try {
    141. env.execute();
    142. } catch (Exception e) {
    143. e.printStackTrace();
    144. }
    145. }
    146. }