package com.atguigu.mycode;
import com.atguigu.bean.HotItem;
import com.atguigu.bean.UserBehavior;
import com.atguigu.util.AtguiguUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
/**
* @Author: High_Project_TopN
* @Description: TODO
* @Email 365586711@qq.com
* @Date:2021/7/2212:52
* @Version 1.0
*/
public class High_Project_TopN {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
WatermarkStrategy<UserBehavior> wms = WatermarkStrategy
// 记得写泛型
.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
// 指定时间戳来源
.withTimestampAssigner( (element, recordTimestamp) -> element.getTimestamp() * 1000L);
env
.readTextFile("input/UserBehavior.csv")
// 要求:5分钟输出最近一小时内的点击量最多的前N个商品
// 最近一小时:windowsize=1hour
// 每5分钟:滑动步长为=5min
// 按照商品分组,每种统计点击量-->pv
// 再按照时间分组,让处于同一个时间段不同组的商品合并在一次,并进行排序
//
.flatMap(new FlatMapFunction<String, UserBehavior>() {
@Override
public void flatMap(String line, Collector<UserBehavior> out) throws Exception {
String[] data = line.split(",");
if ("pv".equals(data[3])) {
out.collect(new UserBehavior(Long.valueOf(data[0]),Long.valueOf(data[1]),Integer.valueOf(data[2]),data[3],Long.valueOf(data[4])));
}
}
})
.assignTimestampsAndWatermarks(wms)
.keyBy(UserBehavior::getItemId)
// 让相同键进入同一个可滑动窗口
.window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(20)))
.aggregate(new AggregateFunction<UserBehavior, Long, Long>() {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(UserBehavior value, Long accumulator) {
return accumulator + 1L;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return null;
}
// 上面只有聚合结果,没有额外的窗口信息,此时需要补充窗口信息
// 考虑下游信息,重新封装成其他东西。
}, new ProcessWindowFunction<Long, HotItem, Long, TimeWindow>() {
@Override
public void process(Long key,
Context ctx,
Iterable<Long> elements,// 迭代器只有一个聚合值
Collector<HotItem> out) throws Exception {
out.collect(new HotItem(key,
elements.iterator().next(),
ctx.window().getEnd()// 这个时间是窗口关闭时间,而不是数据时间
));
}
})
// 将数据进行分组,不管在哪个并行度下
.keyBy(HotItem::getWindowEnd)
// 这里没窗口,所以不需要清理状态
.process(new KeyedProcessFunction<Long, HotItem, String>() {
// 为什么要存状态,因为数据是一条一条的,要求topN必须存储到状态才能排序
private ListState<HotItem> listState;
@Override
public void open(Configuration parameters) throws Exception {
listState = getRuntimeContext().getListState(new ListStateDescriptor<HotItem>(
"topN-state",
HotItem.class
));
}
@Override
public void processElement(HotItem hotItem, Context ctx, Collector<String> out) throws Exception {
// 每来一条数据,把数据存入一个状态,等所有都来气了,让定时器去计算topN
// 说明:第一个key的值肯定是空的。
// 从状态获取值,如果没有值,那么当前就是第一个元素,注意,第一个值什么
if (!listState.get().iterator().hasNext()) {
// 注册定时器,通过大于窗口的时间来开始新窗口(关闭旧窗口),一个键注册一个定时器
ctx.timerService().registerEventTimeTimer(hotItem.getWindowEnd()+5000L);
}
// 如果不为空,那么向状态开始添加值,一个状态存储一个key中的所有数据,这里key是:endtime
listState.add(hotItem);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 排序取list,排序并输出
List<HotItem> hotItems = AtguiguUtil.toList(listState.get());
hotItems.sort(new Comparator<HotItem>() {
@Override
public int compare(HotItem o1, HotItem o2) {
// 降序排
return o2.getClickCount().compareTo(o1.getClickCount());
}
});
StringBuilder sb = new StringBuilder();
sb.append("========================\n");
for (int i = 0; i < Math.min(3,hotItems.size()); i++) {
HotItem hotItem = hotItems.get(i);
// 相比每条都 out.collect(); 一次性写入一个
sb
.append("窗口: ")
.append(new Date(hotItem.getWindowEnd()))
.append(", 商品: ")
.append(hotItem)
.append("\n");
}
out.collect(sb.toString());
}
})
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}