目前仅 Blink 计划器支持 Top-N 。
Flink 使用 OVER 窗口条件和过滤条件相结合以进行 Top-N 查询。利用 OVER 窗口的 PARTITION BY 子句的功能,Flink 还支持逐组 Top-N 。 例如,每个类别中实时销量最高的前五种产品。批处理表和流处理表都支持基于SQL的 Top-N 查询。
流处理模式需注意: TopN 查询的结果会带有更新。 Flink SQL 会根据排序键对输入的流进行排序;若 top N 的记录发生了变化,变化的部分会以撤销、更新记录的形式发送到下游。 推荐使用一个支持更新的存储作为 Top-N 查询的 sink 。另外,若 top N 记录需要存储到外部存储,则结果表需要拥有与 Top-N 查询相同的唯一键。
flink 专门为topN作出了优化,
本来desc是无法实现的,只有通过过滤后前N个才能实现desc
topN只能这么写,只有一种写法
主键:每个窗口都只有一个第一名,所以窗口结束时间和排名可以作为主键
需求描述
每隔10min 统计最近1hour的热门商品top3, 并把统计的结果写入到mysql中
思路:
1. 统计每个商品的点击量, 开窗
2. 分组窗口分组,
3. over窗口
数据源
在mysql中创建表
CREATE DATABASE flink_sql; USE flink_sql; DROP TABLE IF EXISTS hot_item ;CREATE TABLE hot_item (w_end timestamp NOT NULL,item_id bigint(20) NOT NULL,item_count bigint(20) NOT NULL,rk bigint(20) NOT NULL,— 因为是流数据,item_id在同一个窗口会有很多重复的,必须从窗口角度触发,而排名是不重复的 PRIMARY KEY ( w_end ,rk )) ENGINE=InnoDB DEFAULT CHARSET=utf8; |
---|
导入JDBC Connector依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
具体实现代码
| public class Flink01HotItem_TopN **{
public static void main(String[] args) {
**StreamExecutionEnvironment env = StreamExecutionEnvironment._getExecutionEnvironment();
env.setParallelism(2);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
_// 使用sql从文件读取数据<br /> _tenv.executeSql**_(<br />__ _"create table user_behavior(" **+<br /> **" user_id bigint, " **+<br /> **" item_id bigint, " **+<br /> **" category_id int, " **+<br /> **" behavior string, " **+<br /> **" ts bigint, " **+<br /> **" event_time as to_timestamp(from_unixtime(ts, 'yyyy-MM-dd HH:mm:ss')), " **+<br /> **" watermark for event_time as event_time - interval '5' second " **+<br /> **")with(" **+<br /> **" 'connector'='filesystem', " **+<br /> **" 'path'='input/UserBehavior.csv', " **+<br /> **" 'format'='csv')"<br /> _)_**;
_// 每隔 10m 统计一次最近 1h 的热门商品 top
// 1. 计算每每个窗口内每个商品的点击量<br /> _Table t1 = tenv<br /> .sqlQuery**_(<br />__ _"select " **+<br /> **" item_id, " **+<br /> **" hop_end(event_time, interval '10' minute, interval '1' hour) w_end," **+<br /> **" count(*) item_count " **+<br /> **"from user_behavior " **+<br /> **"where behavior='pv' " **+<br /> **"group by hop(event_time, interval '10' minute, interval '1' hour), item_id"<br /> _)_**;<br /> tenv.createTemporaryView**_(_"t1"**, t1**_)_**;<br /> _// 2. 按照窗口开窗, 对商品点击量进行排名<br /> _Table t2 = tenv.sqlQuery**_(<br />__ _"select " **+<br /> **" *," **+<br /> **" row_number() over(partition by w_end order by item_count desc) rk " **+<br /> **"from t1"<br /> _)_**;<br /> tenv.createTemporaryView**_(_"t2"**, t2**_)_**;
_// 3. 取 top3<br /> _Table t3 = tenv.sqlQuery**_(<br />__ _"select " **+<br /> **" item_id, w_end, item_count, rk " **+<br /> **"from t2 " **+<br /> **"where rk<=3"<br /> _)_**;
_// 4. 数据写入到mysql<br /> // 4.1 创建输出表<br /> _tenv.executeSql**_(_"create table hot_item(" **+<br /> **" item_id bigint, " **+<br /> **" w_end timestamp(3), " **+<br /> **" item_count bigint, " **+<br /> **" rk bigint, " **+<br /> **" PRIMARY KEY (w_end, rk) NOT ENFORCED)" **+<br /> **"with(" **+<br /> **" 'connector' = 'jdbc', " **+<br /> **" 'url' = 'jdbc:mysql://hadoop162:3306/flink_sql?useSSL=false', " **+<br /> **" 'table-name' = 'hot_item', " **+<br /> **" 'username' = 'root', " **+<br /> **" 'password' = 'aaaaaa' " **+<br /> **")"_)_**;<br /> _// 4.2 写入到输出表<br /> _t3.executeInsert**_(_"hot_item"_)_**;<br /> **_}<br />__}_** |
| —- |
_