package com.atguigu.mycode;import com.atguigu.bean.HotItem;import com.atguigu.bean.UserBehavior;import com.atguigu.util.AtguiguUtil;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.functions.*;import org.apache.flink.api.common.state.*;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.KeyedProcessFunction;import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.util.Collector;import java.time.Duration;import java.util.Comparator;import java.util.Date;import java.util.List;/** * @Author: High_Project_TopN * @Description: TODO * @Email 365586711@qq.com * @Date:2021/7/2212:52 * @Version 1.0 */public class High_Project_TopN { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); WatermarkStrategy<UserBehavior> wms = WatermarkStrategy // 记得写泛型 .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 指定时间戳来源 .withTimestampAssigner( (element, recordTimestamp) -> element.getTimestamp() * 1000L); env .readTextFile("input/UserBehavior.csv") // 要求:5分钟输出最近一小时内的点击量最多的前N个商品 // 最近一小时:windowsize=1hour // 每5分钟:滑动步长为=5min // 按照商品分组,每种统计点击量-->pv // 再按照时间分组,让处于同一个时间段不同组的商品合并在一次,并进行排序 // .flatMap(new FlatMapFunction<String, UserBehavior>() { @Override public void flatMap(String line, Collector<UserBehavior> out) throws Exception { String[] data = line.split(","); if ("pv".equals(data[3])) { out.collect(new UserBehavior(Long.valueOf(data[0]),Long.valueOf(data[1]),Integer.valueOf(data[2]),data[3],Long.valueOf(data[4]))); } } }) .assignTimestampsAndWatermarks(wms) .keyBy(UserBehavior::getItemId) // 让相同键进入同一个可滑动窗口 .window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(20))) .aggregate(new AggregateFunction<UserBehavior, Long, Long>() { @Override public Long createAccumulator() { return 0L; } @Override public Long add(UserBehavior value, Long accumulator) { return accumulator + 1L; } @Override public Long getResult(Long accumulator) { return accumulator; } @Override public Long merge(Long a, Long b) { return null; } // 上面只有聚合结果,没有额外的窗口信息,此时需要补充窗口信息 // 考虑下游信息,重新封装成其他东西。 }, new ProcessWindowFunction<Long, HotItem, Long, TimeWindow>() { @Override public void process(Long key, Context ctx, Iterable<Long> elements,// 迭代器只有一个聚合值 Collector<HotItem> out) throws Exception { out.collect(new HotItem(key, elements.iterator().next(), ctx.window().getEnd()// 这个时间是窗口关闭时间,而不是数据时间 )); } }) // 将数据进行分组,不管在哪个并行度下 .keyBy(HotItem::getWindowEnd) // 这里没窗口,所以不需要清理状态 .process(new KeyedProcessFunction<Long, HotItem, String>() { // 为什么要存状态,因为数据是一条一条的,要求topN必须存储到状态才能排序 private ListState<HotItem> listState; @Override public void open(Configuration parameters) throws Exception { listState = getRuntimeContext().getListState(new ListStateDescriptor<HotItem>( "topN-state", HotItem.class )); } @Override public void processElement(HotItem hotItem, Context ctx, Collector<String> out) throws Exception { // 每来一条数据,把数据存入一个状态,等所有都来气了,让定时器去计算topN // 说明:第一个key的值肯定是空的。 // 从状态获取值,如果没有值,那么当前就是第一个元素,注意,第一个值什么 if (!listState.get().iterator().hasNext()) { // 注册定时器,通过大于窗口的时间来开始新窗口(关闭旧窗口),一个键注册一个定时器 ctx.timerService().registerEventTimeTimer(hotItem.getWindowEnd()+5000L); } // 如果不为空,那么向状态开始添加值,一个状态存储一个key中的所有数据,这里key是:endtime listState.add(hotItem); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { // 排序取list,排序并输出 List<HotItem> hotItems = AtguiguUtil.toList(listState.get()); hotItems.sort(new Comparator<HotItem>() { @Override public int compare(HotItem o1, HotItem o2) { // 降序排 return o2.getClickCount().compareTo(o1.getClickCount()); } }); StringBuilder sb = new StringBuilder(); sb.append("========================\n"); for (int i = 0; i < Math.min(3,hotItems.size()); i++) { HotItem hotItem = hotItems.get(i);// 相比每条都 out.collect(); 一次性写入一个 sb .append("窗口: ") .append(new Date(hotItem.getWindowEnd())) .append(", 商品: ") .append(hotItem) .append("\n"); } out.collect(sb.toString()); } }) .print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } }}