目的：统计最近 1 小时内浏览量最高的热门商品（Top N），每 5 分钟更新一次。
环境：JDK 8；开发工具：IntelliJ IDEA
创建 Maven 项目
在 pom.xml 中引入依赖
<properties>
    <flink.version>1.10.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
代码演示
Bean
public class ItemViewCount {private Long itemId; //商品Idprivate Long windowEnd;//窗口号private Long count; //统计}public class UserBehavior {private Long userId;//用户Idprivate Long itemId; //商品Idprivate Integer categoryId; //类型private String behavior; //行为private Long timestamp; //时间戳}
Main
// Job wiring. In the complete program these statements live inside the job class's
// `public static void main(String[] args) throws Exception` method (wrapper not shown here).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // create the stream execution environment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // windows are driven by event time
env.setParallelism(1); // a single parallel instance keeps the printed reports in order

// Replace the placeholder with the absolute path of the input file.
DataStream<String> inputStream = env.readTextFile("文件的绝对路径");

// Parse each comma-separated line into a UserBehavior (column order: userId, itemId,
// categoryId, behavior, timestamp) and assign event-time timestamps. The raw timestamp
// is multiplied by 1000 to obtain milliseconds. AscendingTimestampExtractor assumes
// the timestamps arrive in ascending order.
DataStream<UserBehavior> dataStream = inputStream
        .map(line -> {
            String[] fields = line.split(",");
            return new UserBehavior(
                    Long.valueOf(fields[0]),
                    Long.valueOf(fields[1]),
                    Integer.valueOf(fields[2]),
                    fields[3],
                    Long.valueOf(fields[4]));
        })
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
            @Override
            public long extractAscendingTimestamp(UserBehavior userBehavior) {
                return userBehavior.getTimestamp() * 1000L;
            }
        });

// Keep only page views ("pv"), key by item id, and count views in 1-hour windows that
// slide every 5 minutes. CountAgg aggregates incrementally; WindowResultFunction then
// attaches the item id and window end to the final count.
DataStream<ItemViewCount> countDataStream = dataStream
        .filter(userBehavior -> "pv".equals(userBehavior.getBehavior()))
        .keyBy("itemId")
        .timeWindow(Time.hours(1), Time.minutes(5))
        .aggregate(new CountAgg(), new WindowResultFunction());

// Group the per-item counts by window. A KeyedProcessFunction is used because it needs an
// event-time timer: when the timer fires, all counts for that window have arrived and the
// Top-N report can be emitted.
DataStream<String> result = countDataStream
        .keyBy("windowEnd")
        .process(new TopNHotItems(5));
result.print();
env.execute("hotList");

/** Incrementally counts the records of one key within one window. */
public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(UserBehavior value, Long accumulator) {
        return accumulator + 1;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    /** Combines two partial counts. */
    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}

/**
 * Wraps the pre-aggregated count produced by {@link CountAgg} for one (itemId, window)
 * pair into an {@link ItemViewCount}.
 */
public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Long> iterable, Collector<ItemViewCount> collector) throws Exception {
        // keyBy("itemId") yields a Tuple key whose field 0 is the item id.
        Long itemId = tuple.getField(0);
        Long windowEnd = timeWindow.getEnd();
        // With aggregate(agg, windowFunction) the iterable holds exactly one pre-aggregated value.
        Long count = iterable.iterator().next();
        collector.collect(new ItemViewCount(itemId, windowEnd, count));
    }
}

/**
 * Buffers all {@link ItemViewCount}s of one window (the key is {@code windowEnd}). It emits a
 * text report of the {@code size} most-viewed items once the window's event-time timer fires.
 */
public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
    /** Maximum number of items listed per report. */
    private final int size;
    /** Counts received so far for the current key's window; initialized in open(). */
    private transient ListState<ItemViewCount> listState;

    /**
     * @param size maximum number of items to list per window; must be non-null and non-negative
     * @throws IllegalArgumentException if {@code size} is null or negative
     */
    public TopNHotItems(Integer size) {
        if (size == null || size < 0) {
            throw new IllegalArgumentException("size must be a non-negative number, got " + size);
        }
        this.size = size;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        listState = getRuntimeContext().getListState(
                new ListStateDescriptor<ItemViewCount>("itemViewCount-list", ItemViewCount.class));
    }

    @Override
    public void processElement(ItemViewCount itemViewCount, Context context, Collector<String> collector) throws Exception {
        listState.add(itemViewCount);
        // Fire 1 ms after the window end. The watermark passes that point only after every
        // count for this window has been emitted. Re-registering the same timestamp for the
        // same key does not create duplicate timers.
        context.timerService().registerEventTimeTimer(itemViewCount.getWindowEnd() + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Copy the buffered counts out of state so they can be sorted.
        ArrayList<ItemViewCount> itemViewCounts = new ArrayList<>();
        for (ItemViewCount itemViewCount : listState.get()) {
            itemViewCounts.add(itemViewCount);
        }
        // This window is complete; release its state so it does not accumulate forever.
        listState.clear();

        // Highest count first. Long.compare avoids the truncation and overflow of subtracting intValue()s.
        itemViewCounts.sort((o1, o2) -> Long.compare(o2.getCount(), o1.getCount()));

        // A window may contain fewer distinct items than `size`.
        int limit = Math.min(size, itemViewCounts.size());

        StringBuilder result = new StringBuilder();
        result.append("====================================\n");
        // The timer was registered at windowEnd + 1, so timestamp - 1 is the window end.
        result.append("窗口结束时间: ").append(new Timestamp(timestamp - 1)).append("\n");
        for (int i = 0; i < limit; i++) {
            ItemViewCount current = itemViewCounts.get(i);
            result.append("No").append(i + 1).append(":")
                    .append(" 商品 ID=").append(current.getItemId())
                    .append(" 浏览量=").append(current.getCount())
                    .append("\n");
        }
        result.append("====================================\n\n");

        // Demo-only throttle so each report can be read on the console. It blocks the operator
        // thread, so remove it in a real job.
        Thread.sleep(1000);
        out.collect(result.toString());
    }
}
结果
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
====================================
窗口结束时间: 2017-11-26 09:05:00.0
No1: 商品 ID=5051027 浏览量=3
No2: 商品 ID=3493253 浏览量=3
No3: 商品 ID=4261030 浏览量=3
No4: 商品 ID=4894670 浏览量=2
No5: 商品 ID=3781391 浏览量=2
====================================

====================================
窗口结束时间: 2017-11-26 09:10:00.0
No1: 商品 ID=812879 浏览量=5
No2: 商品 ID=2600165 浏览量=4
No3: 商品 ID=2828948 浏览量=4
No4: 商品 ID=2338453 浏览量=4
No5: 商品 ID=4261030 浏览量=4
====================================
