目的：统计最近 1 小时内浏览量（pv）最高的 Top 5 热门商品，每 5 分钟更新一次。
环境：JDK 8；开发工具：IntelliJ IDEA 2019.3

创建maven项目

在 pom.xml 中引入依赖

  1. <flink.version>1.10.1</flink.version>
  2. <scala.binary.version>2.12</scala.binary.version>
  3. <dependency>
  4. <groupId>org.apache.flink</groupId>
  5. <artifactId>flink-java</artifactId>
  6. <version>${flink.version}</version>
  7. </dependency>
  8. <dependency>
  9. <groupId>org.apache.flink</groupId>
  10. <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
  11. <version>${flink.version}</version>
  12. </dependency>

代码演示

用文本文件中的数据模拟实时数据流（每行一条用户行为记录，字段以逗号分隔：userId,itemId,categoryId,behavior,timestamp）：

Bean

  /**
   * Page-view count of one item within one sliding window.
   *
   * <p>Written as a Flink POJO (public no-arg constructor plus public getters/setters) so that
   * the field-expression key {@code keyBy("windowEnd")} can address its fields.
   */
  public class ItemViewCount {
    private Long itemId;    // item id
    private Long windowEnd; // end timestamp (ms) of the window this count belongs to
    private Long count;     // number of "pv" events for the item in that window

    /** No-arg constructor required by Flink's POJO serializer. */
    public ItemViewCount() {
    }

    /**
     * Creates a fully populated count record.
     *
     * @param itemId item id
     * @param windowEnd end timestamp (ms) of the window
     * @param count number of page views in the window
     */
    public ItemViewCount(Long itemId, Long windowEnd, Long count) {
      this.itemId = itemId;
      this.windowEnd = windowEnd;
      this.count = count;
    }

    public Long getItemId() {
      return itemId;
    }

    public void setItemId(Long itemId) {
      this.itemId = itemId;
    }

    public Long getWindowEnd() {
      return windowEnd;
    }

    public void setWindowEnd(Long windowEnd) {
      this.windowEnd = windowEnd;
    }

    public Long getCount() {
      return count;
    }

    public void setCount(Long count) {
      this.count = count;
    }

    @Override
    public String toString() {
      return "ItemViewCount{itemId=" + itemId + ", windowEnd=" + windowEnd + ", count=" + count + "}";
    }
  }
  /**
   * One user-behavior record parsed from a CSV line
   * {@code userId,itemId,categoryId,behavior,timestamp}.
   *
   * <p>Written as a Flink POJO (public no-arg constructor plus public getters/setters) so that
   * Flink can serialize it and resolve field-expression keys such as {@code "itemId"}.
   */
  public class UserBehavior {
    private Long userId;        // user id
    private Long itemId;        // item id
    private Integer categoryId; // item category id
    private String behavior;    // behavior type; the job keeps only "pv"
    private Long timestamp;     // event time; presumably seconds, since the job multiplies it by 1000

    /** No-arg constructor required by Flink's POJO serializer. */
    public UserBehavior() {
    }

    /**
     * Creates a fully populated record, in CSV column order.
     *
     * @param userId user id
     * @param itemId item id
     * @param categoryId item category id
     * @param behavior behavior type, e.g. "pv"
     * @param timestamp event timestamp as stored in the input file
     */
    public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
      this.userId = userId;
      this.itemId = itemId;
      this.categoryId = categoryId;
      this.behavior = behavior;
      this.timestamp = timestamp;
    }

    public Long getUserId() {
      return userId;
    }

    public void setUserId(Long userId) {
      this.userId = userId;
    }

    public Long getItemId() {
      return itemId;
    }

    public void setItemId(Long itemId) {
      this.itemId = itemId;
    }

    public Integer getCategoryId() {
      return categoryId;
    }

    public void setCategoryId(Integer categoryId) {
      this.categoryId = categoryId;
    }

    public String getBehavior() {
      return behavior;
    }

    public void setBehavior(String behavior) {
      this.behavior = behavior;
    }

    public Long getTimestamp() {
      return timestamp;
    }

    public void setTimestamp(Long timestamp) {
      this.timestamp = timestamp;
    }

    @Override
    public String toString() {
      return "UserBehavior{userId=" + userId + ", itemId=" + itemId + ", categoryId=" + categoryId
          + ", behavior=" + behavior + ", timestamp=" + timestamp + "}";
    }
  }

Main

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //创建流处理执行环境
  2. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //设置为事件时间
  3. env.setParallelism(1);//并行度为1
  4. DataStream<String> inputStream = env.readTextFile("文件的绝对路径");
  5. //将文本数据转换为UserBehavior对象,并且分配时间戳和watermark
  6. DataStream<UserBehavior> dataStream = inputStream
  7. .map(line -> {
  8. String[] fields = line.split(",");
  9. return new UserBehavior(new Long(fields[0]), new
  10. Long(fields[1]), new Integer(fields[2]), fields[3], new Long(fields[4]));
  11. })
  12. .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
  13. @Override
  14. public long extractAscendingTimestamp(UserBehavior userBehavior) {
  15. return userBehavior.getTimestamp() * 1000L;
  16. }
  17. });
  18. //过滤需要统计的类型,按照商品id分组,开窗长度为一小时,步长5分钟
  19. DataStream<ItemViewCount> countDataStream = dataStream
  20. .filter(line -> "pv".equals(line.getBehavior()))
  21. .keyBy("itemId")
  22. .timeWindow(Time.hours(1), Time.minutes(5))
  23. .aggregate(new CountAgg(), new WindowResultFunction()); //增量聚合,全量聚合
  24. // 根据窗口号分组,因为要用到Timer,所以用到底层API,在定时器执行时意味该窗口下的数据统计结束,进行输出
  25. DataStream<String> result = countDataStream
  26. .keyBy("windowEnd")
  27. .process(new TopNHotItems(5));
  28. result.print();
  29. env.execute("hotList");
  30. public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
  31. @Override
  32. public Long createAccumulator() {
  33. return 0L;
  34. }
  35. @Override
  36. public Long add(UserBehavior value, Long accumulator) {
  37. return accumulator + 1;
  38. }
  39. @Override
  40. public Long getResult(Long accumulator) {
  41. return accumulator;
  42. }
  43. @Override
  44. public Long merge(Long a, Long b) {
  45. return a + b;
  46. }
  47. }
  48. public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
  49. @Override
  50. public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Long> iterable, Collector<ItemViewCount> collector) throws Exception {
  51. Long itemId = tuple.getField(0);
  52. Long windowEnd = timeWindow.getEnd();
  53. Long count = iterable.iterator().next();
  54. collector.collect(new ItemViewCount(itemId, windowEnd, count));
  55. }
  56. }
  57. public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
  58. private Integer size;
  59. ListState<ItemViewCount> listState;
  60. public TopNHotItems(Integer size) {
  61. this.size = size;
  62. }
  63. @Override
  64. public void processElement(ItemViewCount itemViewCount, Context context, Collector<String> collector) throws Exception {
  65. listState.add(itemViewCount);
  66. context.timerService().registerEventTimeTimer(itemViewCount.getWindowEnd() + 1);
  67. }
  68. @Override
  69. public void open(Configuration parameters) throws Exception {
  70. listState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("", ItemViewCount.class));
  71. }
  72. @Override
  73. public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
  74. ArrayList<ItemViewCount> itemViewCounts =
  75. Lists.newArrayList(listState.get().iterator());
  76. itemViewCounts.sort(new Comparator<ItemViewCount>() {
  77. @Override
  78. public int compare(ItemViewCount o1, ItemViewCount o2) {
  79. return o2.getCount().intValue() - o1.getCount().intValue();
  80. }
  81. });
  82. StringBuilder result = new StringBuilder();
  83. result.append("====================================\n");
  84. result.append("窗口结束时间: ").append(new Timestamp(timestamp - 1)).append("\n");
  85. for (int i = 0; i < size; i++) {
  86. ItemViewCount currentItemViewCount = itemViewCounts.get(i);
  87. result.append("No").append(i + 1).append(":")
  88. .append(" 商品 ID=")
  89. .append(currentItemViewCount.getItemId())
  90. .append(" 浏览量=")
  91. .append(currentItemViewCount.getCount())
  92. .append("\n");
  93. }
  94. result.append("====================================\n\n");
  95. Thread.sleep(1000);
  96. out.collect(result.toString());
  97. }
  98. }

热门商品统计-文本版.rar

结果

  1. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  2. SLF4J: Defaulting to no-operation (NOP) logger implementation
  3. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  4. ====================================
  5. 窗口结束时间: 2017-11-26 09:05:00.0
  6. No1: 商品 ID=5051027 浏览量=3
  7. No2: 商品 ID=3493253 浏览量=3
  8. No3: 商品 ID=4261030 浏览量=3
  9. No4: 商品 ID=4894670 浏览量=2
  10. No5: 商品 ID=3781391 浏览量=2
  11. ====================================
  12. ====================================
  13. 窗口结束时间: 2017-11-26 09:10:00.0
  14. No1: 商品 ID=812879 浏览量=5
  15. No2: 商品 ID=2600165 浏览量=4
  16. No3: 商品 ID=2828948 浏览量=4
  17. No4: 商品 ID=2338453 浏览量=4
  18. No5: 商品 ID=4261030 浏览量=4
  19. ====================================