目的：统计最近 1 小时内浏览量（pv）最高的热门商品 Top N，每 5 分钟更新一次（滑动窗口：窗口长度 1 小时，滑动步长 5 分钟）。
环境：JDK 8，Flink 1.10.1（Scala 2.12）；开发工具：IntelliJ IDEA
创建 Maven 项目
在 pom.xml 中引入依赖（jar 包）：
<!-- The two version properties below belong inside <properties> in pom.xml. -->
<flink.version>1.10.1</flink.version>
<!-- Scala binary version; it is appended as the _${scala.binary.version} suffix of Scala-dependent Flink artifact IDs. -->
<scala.binary.version>2.12</scala.binary.version>
<!-- The dependencies below belong inside <dependencies> in pom.xml. -->
<!-- Flink Java API. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink DataStream API (StreamExecutionEnvironment, windows, KeyedProcessFunction); Scala-suffixed artifact. -->
<!-- NOTE(review): the run output shows SLF4J falling back to a no-operation logger; adding an SLF4J binding
     dependency (e.g. slf4j-log4j12) would make Flink's own log output visible. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
代码演示
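Bean（实体类）
注：代码中通过字段名分组（keyBy("itemId")、keyBy("windowEnd")），Flink 要求这两个类是合法的 POJO：公有类、公有无参构造器，以及各字段的 getter/setter；下文还用到了全参构造器。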
/**
 * Per-window view count of a single item, produced by the windowed aggregation.
 *
 * <p>Written as a Flink POJO (public class, public no-arg constructor, getters/setters) so that
 * the pipeline can key the stream by field name, e.g. {@code keyBy("windowEnd")}.
 */
public class ItemViewCount {
    private Long itemId;    // item id
    private Long windowEnd; // end timestamp (epoch millis) of the window this count belongs to
    private Long count;     // number of "pv" events for the item in that window

    /** No-arg constructor required by Flink's POJO serializer. */
    public ItemViewCount() {
    }

    /**
     * @param itemId    item id
     * @param windowEnd window end timestamp in epoch milliseconds
     * @param count     number of views in the window
     */
    public ItemViewCount(Long itemId, Long windowEnd, Long count) {
        this.itemId = itemId;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    public Long getItemId() {
        return itemId;
    }

    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    public Long getWindowEnd() {
        return windowEnd;
    }

    public void setWindowEnd(Long windowEnd) {
        this.windowEnd = windowEnd;
    }

    public Long getCount() {
        return count;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "ItemViewCount{itemId=" + itemId + ", windowEnd=" + windowEnd + ", count=" + count + "}";
    }
}
/**
 * One user action parsed from a CSV line of the form
 * {@code userId,itemId,categoryId,behavior,timestamp}.
 *
 * <p>Written as a Flink POJO (public class, public no-arg constructor, getters/setters) so that
 * the pipeline can key the stream by field name, e.g. {@code keyBy("itemId")}.
 */
public class UserBehavior {
    private Long userId;        // user id
    private Long itemId;        // item id
    private Integer categoryId; // item category id
    private String behavior;    // action type; the pipeline counts "pv"
    private Long timestamp;     // event time in epoch SECONDS (the pipeline multiplies by 1000)

    /** No-arg constructor required by Flink's POJO serializer. */
    public UserBehavior() {
    }

    /**
     * @param userId     user id
     * @param itemId     item id
     * @param categoryId item category id
     * @param behavior   action type, e.g. "pv"
     * @param timestamp  event time in epoch seconds
     */
    public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public Long getItemId() {
        return itemId;
    }

    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    public Integer getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(Integer categoryId) {
        this.categoryId = categoryId;
    }

    public String getBehavior() {
        return behavior;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "UserBehavior{userId=" + userId + ", itemId=" + itemId + ", categoryId=" + categoryId
                + ", behavior=" + behavior + ", timestamp=" + timestamp + "}";
    }
}
Main（以下语句位于 main 方法中；后面的 CountAgg、WindowResultFunction、TopNHotItems 是同一外部类中的静态内部类）
// Create the streaming execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use event time: windows are driven by the timestamps carried in the records, not by wall-clock time.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Run every operator with a single parallel instance.
env.setParallelism(1);
DataStream<String> inputStream = env.readTextFile("文件的绝对路径");

// Parse each CSV line (userId,itemId,categoryId,behavior,timestamp) into a UserBehavior,
// then assign event-time timestamps and watermarks.
DataStream<UserBehavior> dataStream = inputStream
        .map(line -> {
            String[] fields = line.split(",");
            // trim() so that stray whitespace around a column does not cause NumberFormatException
            return new UserBehavior(
                    Long.valueOf(fields[0].trim()),
                    Long.valueOf(fields[1].trim()),
                    Integer.valueOf(fields[2].trim()),
                    fields[3].trim(),
                    Long.valueOf(fields[4].trim()));
        })
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
            @Override
            public long extractAscendingTimestamp(UserBehavior userBehavior) {
                // The file stores seconds; Flink event time is expressed in milliseconds.
                return userBehavior.getTimestamp() * 1000L;
            }
        });

// Keep only page views, key by item id, and count them in 1-hour windows sliding every 5 minutes.
DataStream<ItemViewCount> countDataStream = dataStream
        .filter(userBehavior -> "pv".equals(userBehavior.getBehavior()))
        .keyBy("itemId")
        .timeWindow(Time.hours(1), Time.minutes(5))
        // CountAgg counts incrementally; WindowResultFunction attaches item id and window end to the count.
        .aggregate(new CountAgg(), new WindowResultFunction());

// Key by window end so that all counts of one window meet in one place. TopNHotItems needs timers,
// which is why the low-level process API is used; when its timer fires, the window's counts are
// complete and the ranking is emitted.
DataStream<String> result = countDataStream
        .keyBy("windowEnd")
        .process(new TopNHotItems(5));
result.print();
env.execute("hotList");
/**
 * Incremental aggregate that counts how many events a key receives in a window.
 *
 * <p>The accumulator is the running count itself; the incoming element's content is not inspected.
 */
public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {

    /** Every window starts counting at zero. */
    @Override
    public Long createAccumulator() {
        return Long.valueOf(0L);
    }

    /** Adds one per incoming event, regardless of its fields. */
    @Override
    public Long add(UserBehavior value, Long accumulator) {
        long incremented = accumulator + 1L;
        return incremented;
    }

    /** The accumulator already holds the final count. */
    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    /** Combines two partial counts (invoked by Flink when windows are merged). */
    @Override
    public Long merge(Long a, Long b) {
        return Long.sum(a, b);
    }
}
/**
 * Turns the single pre-aggregated count of one item in one window into an {@link ItemViewCount}.
 *
 * <p>The key arrives as a {@code Tuple} because the stream is keyed by field name
 * ({@code keyBy("itemId")}); its first field is the item id.
 */
public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Long> iterable, Collector<ItemViewCount> collector) throws Exception {
        // The preceding AggregateFunction hands over exactly one value: the window's count.
        Long viewCount = iterable.iterator().next();
        collector.collect(new ItemViewCount(tuple.<Long>getField(0), timeWindow.getEnd(), viewCount));
    }
}
/**
 * Buffers every {@link ItemViewCount} that shares one window end and, once that window is
 * complete, emits a formatted ranking of its top {@code size} items by view count.
 *
 * <p>Intended to run on a stream keyed by {@code windowEnd}, so each key corresponds to one window.
 */
public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
    /** How many top items to print per window. */
    private Integer size;
    /** Per-key (per-window) buffer of counts, drained when the key's timer fires. */
    ListState<ItemViewCount> listState;

    /**
     * @param size number of top items to print per window; windows with fewer items print all they have
     */
    public TopNHotItems(Integer size) {
        this.size = size;
    }

    @Override
    public void processElement(ItemViewCount itemViewCount, Context context, Collector<String> collector) throws Exception {
        listState.add(itemViewCount);
        // Window results for a given windowEnd are emitted when the watermark reaches that window's end,
        // so a timer at windowEnd + 1 fires only after all of them have been buffered. Registering the
        // same timestamp again for the same key does not create a second timer.
        context.timerService().registerEventTimeTimer(itemViewCount.getWindowEnd() + 1);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // NOTE(review): the state name is empty; a descriptive name would be clearer, but renaming it
        // would break restoring from existing savepoints, so it is left unchanged here.
        listState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("", ItemViewCount.class));
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Copy the buffered counts out of state, then clear it: keys are window-end timestamps, so
        // without clearing, the buffers of every past window would be kept in state indefinitely.
        ArrayList<ItemViewCount> itemViewCounts = new ArrayList<>();
        for (ItemViewCount itemViewCount : listState.get()) {
            itemViewCounts.add(itemViewCount);
        }
        listState.clear();

        // Descending by count. Long.compare avoids the truncation/overflow of subtracting intValue()s;
        // List.sort is stable, so equal counts keep their arrival order.
        itemViewCounts.sort((o1, o2) -> Long.compare(o2.getCount(), o1.getCount()));

        StringBuilder result = new StringBuilder();
        result.append("====================================\n");
        // The timer was registered at windowEnd + 1; subtract 1 to print the window end itself.
        result.append("窗口结束时间: ").append(new Timestamp(timestamp - 1)).append("\n");
        // A window may hold fewer than `size` items; never index past the end of the list.
        int limit = Math.min(size, itemViewCounts.size());
        for (int i = 0; i < limit; i++) {
            ItemViewCount currentItemViewCount = itemViewCounts.get(i);
            result.append("No").append(i + 1).append(":")
                    .append(" 商品 ID=")
                    .append(currentItemViewCount.getItemId())
                    .append(" 浏览量=")
                    .append(currentItemViewCount.getCount())
                    .append("\n");
        }
        result.append("====================================\n\n");
        out.collect(result.toString());
    }
}
运行结果（开头的 SLF4J 警告只表示项目中没有配置日志实现，不影响程序运行）
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
====================================
窗口结束时间: 2017-11-26 09:05:00.0
No1: 商品 ID=5051027 浏览量=3
No2: 商品 ID=3493253 浏览量=3
No3: 商品 ID=4261030 浏览量=3
No4: 商品 ID=4894670 浏览量=2
No5: 商品 ID=3781391 浏览量=2
====================================
====================================
窗口结束时间: 2017-11-26 09:10:00.0
No1: 商品 ID=812879 浏览量=5
No2: 商品 ID=2600165 浏览量=4
No3: 商品 ID=2828948 浏览量=4
No4: 商品 ID=2338453 浏览量=4
No5: 商品 ID=4261030 浏览量=4
====================================