十一部分 Flink CEP
CEP 即Complex Event Processing - 复杂事件处理,Flink CEP 是在 Flink 中实现的复杂时间处理(CEP)库。处理事
件的规则,被叫做“模式”(Pattern),Flink CEP 提供了 Pattern API,用于对输入流数据进行复杂事件规则定义,用
来提取符合规则的事件序列。 Pattern API 大致分为三种:个体模式,组合模式,模式组。
Flink CEP 应用场景:
CEP 在互联网各个行业都有应用,例如金融、物流、电商、智能交通、物联网行业等行业:
实时监控:
在网站的访问日志中寻找那些使用脚本或者工具“爆破”登录的用户;
我们需要在大量的订单交易中发现那些虚假交易(超时未支付)或发现交易活跃用户;
或者在快递运输中发现那些滞留很久没有签收的包裹等。
风险控制:
比如金融行业可以用来进行风险控制和欺诈识别,从交易信息中寻找那些可能存在的危险交易和非法交易。
营销广告:
跟踪用户的实时行为,指定对应的推广策略进行推送,提高广告的转化率。
第 1 节 基础
( 1 )定义 复合事件处理(Complex Event Processing,CEP)是一种基于动态环境中事件流的分析技术,事件在
这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和
聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终分析得到更复杂的复合事件。 ( 2 )
特征 CEP的特征如下: 目标:从有序的简单事件流中发现一些高阶特征; 输入:一个或多个简单事件构成的事件
流; 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件; 输出:满足规则的复杂
事件。
( 3 )功能 CEP用于分析低延迟、频繁产生的不同来源的事件流。CEP可以帮助在复杂的、不相关的时间流中找出
有意义的模式和复杂的关系,以接近实时或准实时的获得通知或组织一些行为。 CEP支持在流上进行模式匹配,根
据模式的条件不同,分为连续的条件或不连续的条件;模式的条件允许有时间的限制,当条件范围内没有达到满足
的条件时,会导致模式匹配超时。 看起来很简单,但是它有很多不同的功能: ① 输入的流数据,尽快产生结果;
② 在 2 个事件流上,基于时间进行聚合类的计算; ③ 提供实时/准实时的警告和通知; ④ 在多样的数据源中产生关
联分析模式; ⑤ 高吞吐、低延迟的处理 市场上有多种CEP的解决方案,例如Spark、Samza、Beam等,但他们都
没有提供专门的库支持。然而,Flink提供了专门的CEP库。 ( 4 )主要组件 Flink为CEP提供了专门的Flink CEP
library,它包含如下组件:Event Stream、Pattern定义、Pattern检测和生成Alert。 首先,开发人员要在
DataStream流上定义出模式条件,之后Flink CEP引擎进行模式检测,必要时生成警告。
第 2 节 Pattern API
处理事件的规则,被叫作模式(Pattern)。 Flink CEP提供了Pattern API用于对输入流数据进行复杂事件规则定
义,用来提取符合规则的事件序列。 模式大致分为三类: ① 个体模式(Individual Patterns) 组成复杂规则的每
一个单独的模式定义,就是个体模式。
② 组合模式(Combining Patterns,也叫模式序列) 很多个体模式组合起来,就形成了整个的模式序列。 模式序
列必须以一个初始模式开始:
③ 模式组(Group of Pattern) 将一个模式序列作为条件嵌套在个体模式里,成为一组模式。
2.1 个体模式
个体模式包括单例模式和循环模式。单例模式只接收一个事件,而循环模式可以接收多个事件。
( 1 )量词 可以在一个个体模式后追加量词,也就是指定循环次数。
start.times( 3 ).where(_.behavior.startsWith(‘fav’))
val start = Pattern.begin(‘start’)
( 2 )条件 每个模式都需要指定触发条件,作为模式是否接受事件进入的判断依据。CEP中的个体模式主要通过调
用.where()、.or()和.until()来指定条件。按不同的调用方式,可以分成以下几类: ① 简单条件 通过.where()方法对
事件中的字段进行判断筛选,决定是否接收该事件
② 组合条件 将简单的条件进行合并;or()方法表示或逻辑相连,where的直接组合就相当于与and。
③ 终止条件 如果使用了oneOrMore或者oneOrMore.optional,建议使用.until()作为终止条件,以便清理状态。
④ 迭代条件 能够对模式之前所有接收的事件进行处理;调用.where((value,ctx) => {…}),可以调用
ctx.getEventForPattern(“name”)
2.2 模式序列
不同的近邻模式如下图:
// 匹配出现 4 次
start.time( 4 )
// 匹配出现 0 次或 4 次
start.time( 4 ).optional
// 匹配出现 2 、 3 或 4 次
start.time( 2 , 4 )
// 匹配出现 2 、 3 或 4 次,并且尽可能多地重复匹配
start.time( 2 , 4 ).greedy
// 匹配出现 1 次或多次
start.oneOrMore
// 匹配出现 0 、 2 或多次,并且尽可能多地重复匹配
start.timesOrMore( 2 ).optional.greedy
start.where(event=>event.getName.startsWith(“foo”))
Pattern.where(event => .../*some condition*/).or(event => /*or condition*/)
( 1 )严格近邻 所有事件按照严格的顺序出现,中间没有任何不匹配的事件,由.next()指定。例如对于模式“a next
b”,事件序列“a,c,b1,b2”没有匹配。 ( 2 )宽松近邻 允许中间出现不匹配的事件,由.followedBy()指定。例如对于
模式“a followedBy b”,事件序列“a,c,b1,b2”匹配为{a,b1}。 ( 3 )非确定性宽松近邻 进一步放宽条件,之前已经匹
配过的事件也可以再次使用,由.followedByAny()指定。例如对于模式“a followedByAny b”,事件序
列“a,c,b1,b2”匹配为{ab1},{a,b2}。 除了以上模式序列外,还可以定义“不希望出现某种近邻关系”: .notNext():
不想让某个事件严格紧邻前一个事件发生。 .notFollowedBy():不想让某个事件在两个事件之间发生。 需要注意:
①所有模式序列必须以.begin()开始;
②模式序列不能以.notFollowedBy()结束;
③“not”类型的模式不能被optional所修饰;
④可以为模式指定时间约束,用来要求在多长时间内匹配有效。
2.3 模式的检测
指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配。调用CEP.pattern(),给定输入流和模式,就
能得到一个PatternStream。
2.4 匹配事件的提取
next.within(Time.seconds( 10 ))
val input:DataStream[Event] = ...
val pattern:Pattern[Event,_] = ...
val patternStream:PatternStream[Event]=CEP.pattern(input,pattern)
创建PatternStream之后,就可以应用select或者flatSelect方法,从检测到的事件序列中提取事件了。 select()方法
需要输入一个select function作为参数,每个成功匹配的事件序列都会调用它。 select()以一个
Map[String,Iterable[IN]]来接收匹配到的事件序列,其中key就是每个模式的名称,而value就是所有接收到的事件
的Iterable类型。
flatSelect通过实现PatternFlatSelectFunction实现与select相似的功能。唯一的区别就是flatSelect方法可以返回多
条记录,它通过一个Collector[OUT]类型的参数来将要输出的数据传递到下游。
process
select
2.5超时事件的提取
当一个模式通过within关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃;为了能够处
理这些超时的部分匹配,select和flatSelect API调用允许指定超时处理程序。
Flink CEP 开发流程:
1. DataSource 中的数据转换为 DataStream;
2. 定义 Pattern,并将 DataStream 和 Pattern 组合转换为 PatternStream;
3. PatternStream 经过 select、process 等算子转换为 DataStraem;
4. 再次转换的 DataStream 经过处理后,sink 到目标库。
select方法:
对检测到的模式序列应用选择函数。对于每个模式序列,调用提供的{@link PatternSelectFunction}。模式选择函
数只能产生一个结果元素。
def selectFn(pattern : Map[String,Iterable[IN]]):OUT={
val startEvent = pattern.get(“start”).get.next
val endEvent = pattern.get(“end”).get.next
OUT(startEvent, endEvent)
}
SingleOutputStreamOperator<PayEvent> result = patternStream.select(orderTimeoutOutput, new
PatternTimeoutFunction<PayEvent, PayEvent>() {
@Override
public PayEvent timeout(Map<String, List<PayEvent>> map, long l) throws Exception {
return map.get("begin").get( 0 );
}
}, new PatternSelectFunction<PayEvent, PayEvent>() {
@Override
public PayEvent select(Map<String, List<PayEvent>> map) throws Exception {
return map.get("pay").get( 0 );
}
});
对超时的部分模式序列应用超时函数。对于每个部分模式序列,调用提供的{@link PatternTimeoutFunction}。模
式超时函数只能产生一个结果元素。
您可以在使用相同的{@link OutputTag}进行select操作的{@link SingleOutputStreamOperator}上获得由{@link
SingleOutputStreamOperator}生成的{@link SingleOutputStreamOperator}生成的超时数据流。
@param timedOutPartialMatchesTag 标识端输出超时模式的@link OutputTag}
@param patternTimeoutFunction 为超时的每个部分模式序列调用的模式超时函数。
@param patternSelectFunction 为每个检测到的模式序列调用的模式选择函数。
@param 产生的超时元素的类型
@param 结果元素的类型
return {@link DataStream},其中包含产生的元素和在边输出中产生的超时元素。
获取{@link DataStream},该{@link DataStream}包含由操作发出到指定{@link OutputTag}的边输出的元素。
第 3 节 NFA:非确定有限自动机
FlinkCEP在运行时会将用户的逻辑转化成这样的一个NFA Graph (nfa对象)
所以有限状态机的工作过程,就是从开始状态,根据不同的输入,自动进行状态转换的过程。
上图中的状态机的功能,是检测二进制数是否含有偶数个 0 。从图上可以看出,输入只有 1 和 0 两种。从 S1 状态
开始,只有输入 0 才会转换到 S2 状态,同样 S2 状态下只有输入 0 才会转换到 S1。所以,二进制数输入完毕,如
果满足最终状态,也就是最后停在 S1 状态,那么输入的二进制数就含有偶数个 0 。
第 4 节 案例
Flink CEP 开发流程:
1. DataSource 中的数据转换为 DataStream;watermark、keyby
2. 定义 Pattern,并将 DataStream 和 Pattern 组合转换为 PatternStream;
3. PatternStream 经过 select、process 等算子转换为 DataStream;
DataStream<PayEvent> sideOutput = result.getSideOutput(orderTimeoutOutput);
4. 再次转换的 DataStream 经过处理后,sink 到目标库。
案例 1 :恶意登录检测
需求:找出 5 秒内,连续登录失败的账号
思路:
1 、数据源
new CEPLoginBean(1L, “fail”, 1597905234000L), new CEPLoginBean(1L, “success”, 1597905235000L), new
CEPLoginBean(2L, “fail”, 1597905236000L), new CEPLoginBean(2L, “fail”, 1597905237000L), new
CEPLoginBean(2L, “fail”, 1597905238000L), new CEPLoginBean(3L, “fail”, 1597905239000L), new
CEPLoginBean(3L, “success”, 1597905240000L)
2 、在数据源上做出watermark
3 、在watermark上根据id分组keyby
4 、做出模式pattern
5 、在数据流上进行模式匹配
6 、提取匹配成功的数据
代码:
依赖:
Pattern<CEPLoginBean, CEPLoginBean> pattern = Pattern.<CEPLoginBean>begin("start").where(new
IterativeCondition<CEPLoginBean>() {
@Override
public boolean filter(CEPLoginBean value, Context<CEPLoginBean> ctx) throws
Exception {
return value.getLogresult().equals("fail");
}
})
.next("next").where(new IterativeCondition<CEPLoginBean>() {
@Override
public boolean filter(CEPLoginBean value, Context<CEPLoginBean> ctx) throws
Exception {
return value.getLogresult().equals("fail");
}
})
.within(Time.seconds( 5 ));
package com.lagou.bak;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.Map;
public class CEPLoginTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource data = env.fromElements(
new CEPLoginBean(1L, “fail”, 1597905234000L),
new CEPLoginBean(1L, “success”, 1597905235000L),
new CEPLoginBean(2L, “fail”, 1597905236000L),
new CEPLoginBean(2L, “fail”, 1597905237000L),
new CEPLoginBean(2L, “fail”, 1597905238000L),
new CEPLoginBean(3L, “fail”, 1597905239000L),
new CEPLoginBean(3L, “success”, 1597905240000L)
);
SingleOutputStreamOperator watermarks =
data.assignTimestampsAndWatermarks(new WatermarkStrategy() {
@Override
public WatermarkGenerator
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator() {
long maxTimeStamp = Long.MIN_VALUE;
public void onEvent(CEPLoginBean event, long eventTimestamp, WatermarkOutput
output) {
maxTimeStamp = Math.max(maxTimeStamp, event.getTs());
}
long maxOutOfOrderness = 500L;
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimeStamp - maxOutOfOrderness));
}
};
}
}.withTimestampAssigner(((element, recordTimestamp) -> element.getTs())));
KeyedStream
Pattern
begin(“start”).where(new IterativeCondition() {
@Override
public boolean filter(CEPLoginBean value, Context ctx) throws Exception
{
return value.getLogresult().equals(“fail”);
}
})
.next(“next”).where(new IterativeCondition() {
@Override
public boolean filter(CEPLoginBean value, Context ctx) throws
Exception {
return value.getLogresult().equals(“fail”);
}
})
.within(Time.seconds(5));
PatternStream patternStream = CEP.pattern(keyed, pattern);
SingleOutputStreamOperator process = patternStream.process(new
PatternProcessFunction
@Override
public void processMatch(Map
Collector out) throws Exception {
System.out.println(match);
List start = match.get(“start”);
List next = match.get(“next”);
String res = “start:” + start + “…next:” + next;
out.collect(res + start.get(0).getId());
}
});
process.print();
env.execute();
}
}
案例 2 :检测交易活跃用户
需求:找出 24 小时内,至少 5 次有效交易的用户:
思路:
1 、数据源:
new ActiveUserBean("100XX", 0.0D, 1597905234000L),
new ActiveUserBean("100XX", 100.0D, 1597905235000L),
new ActiveUserBean("100XX", 200.0D, 1597905236000L),
new ActiveUserBean("100XX", 300.0D, 1597905237000L),
new ActiveUserBean("100XX", 400.0D, 1597905238000L),
new ActiveUserBean("100XX", 500.0D, 1597905239000L),
new ActiveUserBean("101XX", 0.0D, 1597905240000L),
new ActiveUserBean("101XX", 100.0D, 1597905241000L)
2 、watermark转化
3 、keyby转化
4 、做出pattern
至少 5 次:timesOrMore(5)
24 小时之内:within(Time.hours(24))
Pattern<ActiveUserBean, ActiveUserBean> pattern = Pattern.
<ActiveUserBean>begin("start").where(new SimpleCondition<ActiveUserBean>() {
@Override
public boolean filter(ActiveUserBean value) throws Exception {
return value.getMoney() > 0;
}
}).timesOrMore(5).within(Time.hours(24));;
5 、模式匹配
6 、提取匹配成功数据
代码:
package com.lagou.bak;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.Map;
public class CEPActiveUser {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource data = env.fromElements(
new ActiveUserBean(“100XX”, 0.0D, 1597905234000L),
new ActiveUserBean(“100XX”, 100.0D, 1597905235000L),
new ActiveUserBean(“100XX”, 200.0D, 1597905236000L),
new ActiveUserBean(“100XX”, 300.0D, 1597905237000L),
new ActiveUserBean(“100XX”, 400.0D, 1597905238000L),
new ActiveUserBean(“100XX”, 500.0D, 1597905239000L),
new ActiveUserBean(“101XX”, 0.0D, 1597905240000L),
new ActiveUserBean(“101XX”, 100.0D, 1597905241000L)
);
SingleOutputStreamOperator watermark =
data.assignTimestampsAndWatermarks(new WatermarkStrategy() {
@Override
public WatermarkGenerator
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator() {
long maxTimeStamp = Long.MIN_VALUE;
@Override
public void onEvent(ActiveUserBean event, long eventTimestamp,
WatermarkOutput output) {
maxTimeStamp = Math.max(maxTimeStamp, event.getTs());
}
long maxOutOfOrderness = 500l;
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimeStamp - maxOutOfOrderness));
}
};
}
}.withTimestampAssigner(((element, recordTimestamp) -> element.getTs())));
KeyedStream
Pattern<ActiveUserBean, ActiveUserBean> pattern = Pattern.
<ActiveUserBean>begin("start").where(new SimpleCondition<ActiveUserBean>() {
@Override
public boolean filter(ActiveUserBean value) throws Exception {
return value.getMoney() > 0;
}
}).timesOrMore(5).within(Time.hours(24));
PatternStream<ActiveUserBean> patternStream = CEP.pattern(keyed, pattern);
SingleOutputStreamOperator<ActiveUserBean> process = patternStream.process(new
PatternProcessFunction<ActiveUserBean, ActiveUserBean>() {
@Override
public void processMatch(Map<String, List<ActiveUserBean>> match, Context ctx,
Collector<ActiveUserBean> out) throws Exception {
System.out.println(match);
}
});
process.print();
env.execute();
}
}
案例 3 :超时未支付
需求:找出下单后 10 分钟没有支付的订单
思路:
1 、数据源:
new PayEvent(1L, "create", 1597905234000L),
new PayEvent(1L, "pay", 1597905235000L),
new PayEvent(2L, "create", 1597905236000L),
new PayEvent(2L, "pay", 1597905237000L),
new PayEvent(3L, "create", 1597905239000L)
2 、转化watermark
3 、keyby转化
4 、做出Pattern(下单以后 10 分钟内未支付)
注意:下单为create 支付为pay ,create和pay之间不需要是严格临近,所以选择followedBy
Pattern<PayEvent, PayEvent> pattern = Pattern.<PayEvent>
begin("begin")
.where(new IterativeCondition<PayEvent>() {
@Override
public boolean filter(PayEvent payEvent, Context context) throws Exception {
return payEvent.getName().equals("create");
}
})
.followedBy("pay")
.where(new IterativeCondition<PayEvent>() {
@Override
public boolean filter(PayEvent payEvent, Context context) throws Exception {
return payEvent.getName().equals("pay");
}
})
.within(Time.seconds(600));
5 、模式匹配
6 、取出匹配成功的数据
( 1 )采用测输出的方式
OutputTag<PayEvent> orderTimeoutOutput = new OutputTag<PayEvent>("orderTimeout") {};
( 2 )采用select方法
SingleOutputStreamOperator<PayEvent> result = patternStream.select(orderTimeoutOutput, new
PatternTimeoutFunction<PayEvent, PayEvent>() {
@Override
public PayEvent timeout(Map<String, List<PayEvent>> map, long l) throws Exception {
return map.get("begin").get(0);
}
}, new PatternSelectFunction<PayEvent, PayEvent>() {
@Override
public PayEvent select(Map<String, List<PayEvent>> map) throws Exception {
return map.get("pay").get(0);
}
});
//result.print();
DataStream<PayEvent> sideOutput = result.getSideOutput(orderTimeoutOutput);
sideOutput.print();
代码:
package com.lagou.bak;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.util.List;
import java.util.Map;
public class TimeOutPayCEPMain {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream source = env.fromElements(
new PayEvent(1L, “create”, 1597905234000L),
new PayEvent(1L, “pay”, 1597905235000L),
new PayEvent(2L, “create”, 1597905236000L),
new PayEvent(2L, “pay”, 1597905237000L),
new PayEvent(3L, “create”, 1597905239000L)
)
// .assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(500L)) {
// @Override
// public long extractTimestamp(PayEvent payEvent) {
// return payEvent.getTs();
// }
// })
.assignTimestampsAndWatermarks(new WatermarkStrategy() {
@Override
public WatermarkGenerator
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator() {
long maxTimestamp = Long.MIN_VALUE;
@Override
public void onEvent(PayEvent event, long eventTimestamp,
WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, event.getTs());
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
long maxOutofOrderness = 500l;
output.emitWatermark(new Watermark(maxTimestamp -
maxOutofOrderness));
}
};
}
}.withTimestampAssigner(((element, recordTimestamp) -> element.getTs())))
/.keyBy(new KeySelector
@Override
public Object getKey(PayEvent value) throws Exception {
return value.getId();
}
}
.keyBy(value -> value.getId()
);
// 逻辑处理代码
OutputTag orderTimeoutOutput = new OutputTag(“orderTimeout”) {
};
Pattern
begin(“begin”)
.where(new IterativeCondition() {
@Override
public boolean filter(PayEvent payEvent, Context context) throws Exception {
return payEvent.getName().equals(“create”);
}
})
.followedBy(“pay”)
.where(new IterativeCondition() {
@Override
public boolean filter(PayEvent payEvent, Context context) throws Exception {
return payEvent.getName().equals(“pay”);
}
})
.within(Time.seconds(600));
PatternStream patternStream = CEP.pattern(source, pattern);
SingleOutputStreamOperator result = patternStream.select(orderTimeoutOutput,
new PatternTimeoutFunction
@Override
public PayEvent timeout(Map
return map.get(“begin”).get(0);
}
}, new PatternSelectFunction
@Override
public PayEvent select(Map
return map.get(“pay”).get(0);
}
});
//result.print();
DataStream sideOutput = result.getSideOutput(orderTimeoutOutput);
sideOutput.print();
env.execute(“execute cep”);
}
}
十二部分 FlinkSQL
第 1 节 什么是 Table API 和 Flink SQL
Flink 本身是批流统一的处理框架,所以 Table API 和 SQL,就是批流统一的上层处理 API。
Table API 是一套内嵌在 Java 和 Scala 语言中的查询 API,它允许我们以非常直观的方式,
组合来自一些关系运算符的查询(比如 select、filter 和 join)。而对于 Flink SQL,就是直接可
以在代码中写 SQL,来实现一些查询(Query)操作。Flink 的 SQL 支持,基于实现了 SQL 标
准的 Apache Calcite(Apache 开源 SQL 解析工具)。
无论输入是批输入还是流式输入,在这两套 API 中,指定的查询都具有相同的语义,得到相同的结果
第 2 节 入门代码:
依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>1.11.1</version>
<type>pom</type>
<scope>provided</scope>
</dependency>
<!-- Either... -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.11.1</version>
<scope>provided</scope>
</dependency>
<!-- or... -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.11.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.11.1</version>
<scope>provided</scope>
</dependency>
依赖说明:
flink-table-api-java-bridge_2.1:桥接器,主要负责 table API 和 DataStream/DataSet API
的连接支持,按照语言分 java 和 scala。
flink-table-planner-blink_2.12:计划器,是 table API 最主要的部分,提供了运行时环境和生
成程序执行计划的 planner;
如果是生产环境,lib 目录下默认已 经有了 planner,就只需要有 bridge 就可以了
flink-table:flinktable的基础依赖
代码:
1 、Flink执行环境env
2 、用env,做出Table环境tEnv
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
基于 blink 版本的流处理环境(Blink-Streaming-Query)或者,基于 blink 版本的批处理环境(Blink-Batch-
Query):
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
// .inBatchMode()
.inStreamingMode()
.build();
3 、获取流式数据源
DataStreamSource<Tuple2<String, Integer>> data = env.addSource(new SourceFunction<Tuple2<String,
Integer>>() {
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
while (true) {
ctx.collect(new Tuple2<>("name", 10));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
}
});
4 、将流式数据源做成Table
( 1 )table方式:
Table table = tEnv.fromDataStream(data, $("name"), $("age"));
( 2 )sql方式:
tEnv.createTemporaryView("userss",data, $("name"), $("age"));
String s = "select name from userss";
Table table = tEnv.sqlQuery(s);
5 、对Table中的数据做查询
( 1 )table方式:
Table name = table.select($("name"));
( 2 )sql方式:
tEnv.createTemporaryView("userss",data, $("name"), $("age"));
String s = "select name from userss";
Table table = tEnv.sqlQuery(s);
6 、将Table转成数据流:
DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(name, Row.class);
package com.lagou.table;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import static org.apache.flink.table.api.Expressions.$;
public class TableApiDemo {
public static void main(String[] args) throws Exception {
第 3 节 外部链接
3.1 Connectors
//Flink执行环境env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//用env,做出Table环境tEnv
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//获取流式数据源
DataStreamSource<Tuple2<String, Integer>> data = env.addSource(new
SourceFunction<Tuple2<String, Integer>>() {
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
while (true) {
ctx.collect(new Tuple2<>("name", 10 ));
Thread.sleep( 1000 );
}
}
@Override
public void cancel() {
}
});
//Table方式
//将流式数据源做成Table
Table table = tEnv.fromDataStream(data, $("name"), $("age"));
//对Table中的数据做查询
Table name = table.select($("name"));
//将处理结果输出到控制台
DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(name, Row.class);
//SQL方式:
/*tEnv.createTemporaryView("userss",data, $("name"), $("age"));
String s = "select name from userss";
Table table = tEnv.sqlQuery(s);
DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(table, Row.class);*/
result.print();
env.execute();
}
}
Name Version Maven dependency SQL Client JAR
Filesystem Built-in Built-in
Elasticsearch 6 flink-connector-elasticsearch6 Download
Elasticsearch 7 flink-connector-elasticsearch7 Download
Apache Kafka 0.10 flink-connector-kafka-0.10 Download
Apache Kafka 0.11 flink-connector-kafka-0.11 Download
Apache Kafka 0.11+ (universal) flink-connector-kafka Download
Apache HBase 1.4.3 flink-connector-hbase Download
JDBC flink-connector-jdbc Download
Name Maven dependency SQL Client JAR
Old CSV (for files) Built-in Built-in
CSV (for Kafka) flink-csv Built-in
JSON flink-json Built-in
Apache Avro flink-avro Download
3.2 Formats
1. 数据查询语言DQL 数据查询语言DQL基本结构是由SELECT子句,FROM子句,WHERE 子句组成的查询块:
SELECT <字段名表> FROM <表或视图名> WHERE <查询条件>
2 .数据操纵语言DML 数据操纵语言DML主要有三种形式: 1) 插入:INSERT 2) 更新:UPDATE 3) 删除:DELETE
3. 数据定义语言DDL 数据定义语言DDL用来创建数据库中的各种对象——-表、视图、 索引、同义词、聚簇等如:
CREATE TABLE/VIEW/INDEX/SYN/CLUSTER 表 视图 索引 同义词 簇
DDL操作是隐性提交的!不能rollback
- 数据控制语言DCL 数据控制语言DCL用来授予或回收访问数据库的某种特权,并控制 数据库操纵事务发生的时
间及效果,对数据库实行监视等。如:
连接外部系统在 Catalog 中注册表,直接调用 tableEnv.connect()就可以,里面参数要传
入一个 ConnectorDescriptor,也就是 connector 描述器。对于文件系统的 connector 而言,
flink 内部已经提供了,就叫做 FileSystem()。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.11.1</version>
</dependency>
tEnv.connect(new FileSystem().path("sensor.txt"))// 定义表数据来源,外部连接
.withFormat(new Csv()) // 定义从外部系统读取数据之后的格式化方法
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())) // 定义表结构
.createTemporaryTable("inputTable"); // 创建临时表
连接Kafka:
ConnectTableDescriptor descriptor = tEnv.connect(
// declare the external system to connect to
new Kafka()
.version("universal")
.topic("animal")
.startFromEarliest()
.property("bootstrap.servers", "hdp-2:9092")
)
// declare a format for this system
.withFormat(
// new Json()
new Csv()
)
// declare the schema of the table
.withSchema(
new Schema()
// .field("rowtime", DataTypes.TIMESTAMP(3))
// .rowtime(new Rowtime()
// .timestampsFromField("timestamp")
// .watermarksPeriodicBounded(60000)
// )
// .field("user", DataTypes.BIGINT())
.field("message", DataTypes.STRING())
);
// create a table with given name
descriptor.createTemporaryTable("MyUserTable");
Table table1 = tEnv.sqlQuery("select * from MyUserTable");
DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tEnv.toRetractStream(table1,
Row.class);
tuple2DataStream.print();
第 4 节 查询数据
4.1 Table API
官网:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html
select/filter/as
Table filtered = table.select($("name"), $("age")).filter($("age").mod(2).isEqual(0));
//将处理结果输出到控制台
DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(filtered, Row.class);
Table mingzi = table.select($("name").as("mingzi"));
DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(mingzi, Row.class);
4.2 SQL
tEnv.createTemporaryView("userss",data, $("name"), $("age"));
String s = "select name,age from userss where mod(age,2)=0";
Table table = tEnv.sqlQuery(s);
DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(table, Row.class);
第 5 节 输出表
5.1 输出到文件:
代码:
tEnv.connect(new FileSystem().path("D:\\data\\out.txt"))
.withFormat(new Csv())
.withSchema(new Schema().field("name",
DataTypes.STRING()).field("age",DataTypes.INT()))
.createTemporaryTable("outputTable");
filtered.executeInsert("outputTable");
hive支持的输出到orc
package com.lagou.bak;
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.OrcSplitReaderUtil;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.orc.TypeDescription;
import java.util.Properties;
public class StreamingWriteFileOrc {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
env.setParallelism(1);
DataStream dataStream = env.addSource(
new MySource());
//写入orc格式的属性
final Properties writerProps = new Properties();
writerProps.setProperty(“orc.compress”, “LZ4”);
//定义类型和字段名
LogicalType[] orcTypes = new LogicalType[]{
new IntType(), new DoubleType(), new VarCharType()};
String[] fields = new String[]{“a1”, “b2”, “c3”};
TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(RowType.of(
orcTypes,
fields));
//构造工厂类OrcBulkWriterFactory
final OrcBulkWriterFactory factory = new OrcBulkWriterFactory(
new RowDataVectorizer(typeDescription.toString(), orcTypes),
writerProps,
new Configuration());
StreamingFileSink orcSink = StreamingFileSink
.forBulkFormat(new Path(“d:\data\out”), factory)// file:///tmp/aaaa
.build();
dataStream.addSink(orcSink);
env.execute();
}
public static class MySource implements SourceFunction {
@Override
public void run(SourceContext sourceContext) throws Exception{
while (true){
GenericRowData rowData = new GenericRowData(3);
rowData.setField(0, (int) (Math.random() 100));
rowData.setField(1, Math.random() 100);
rowData.setField(2,
org.apache.flink.table.data.StringData.fromString(String.valueOf(Math.random() * 100)));
sourceContext.collect(rowData);
Thread.sleep(10);
}
}
@Override
public void cancel(){
}
}
}
5.2 输出到Kafka
定义
//往kafka上输出表
DataStreamSource<String> data = env.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
int num = 0;
while (true) {
num++;
ctx.collect("name"+num);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
}
});
Table name = tEnv.fromDataStream(data, $("name"));
ConnectTableDescriptor descriptor = tEnv.connect(
// declare the external system to connect to
new Kafka()
.version("universal")
.topic("animal")
.startFromEarliest()
.property("bootstrap.servers", "hdp-2:9092")
)
// declare a format for this system
.withFormat(
// new Json()
new Csv()
)
// declare the schema of the table
.withSchema(
new Schema()
// .field("rowtime", DataTypes.TIMESTAMP(3))
// .rowtime(new Rowtime()
// .timestampsFromField("timestamp")
// .watermarksPeriodicBounded(60000)
// )
// .field("user", DataTypes.BIGINT())
.field("message", DataTypes.STRING())
);
// create a table with given name
descriptor.createTemporaryTable("MyUserTable");
name.executeInsert("MyUserTable");
5.3 输出到mysql (了解)
CREATE TABLE MyUserTable (
...
) WITH (
'connector.type' = 'jdbc', -- required: specify this table type is jdbc
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- required: JDBC DB url
'connector.table' = 'jdbc_table_name', -- required: jdbc table name
-- optional: the class name of the JDBC driver to use to connect to this URL.
-- If not set, it will automatically be derived from the URL.
'connector.driver' = 'com.mysql.jdbc.Driver',
-- optional: jdbc user name and password
'connector.username' = 'name',
'connector.password' = 'password',
-- **followings are scan options, optional, used when reading from a table**
-- optional: SQL query / prepared statement.
-- If set, this will take precedence over the 'connector.table' setting
'connector.read.query' = 'SELECT * FROM sometable',
-- These options must all be specified if any of them is specified. In addition,
-- partition.num must be specified. They describe how to partition the table when
-- reading in parallel from multiple tasks. partition.column must be a numeric,
-- date, or timestamp column from the table in question. Notice that lowerBound and
-- upperBound are just used to decide the partition stride, not for filtering the
-- rows in table. So all rows in the table will be partitioned and returned.
'connector.read.partition.column' = 'column_name', -- optional: the column name used for
partitioning the input.
'connector.read.partition.num' = '50', -- optional: the number of partitions.
'connector.read.partition.lower-bound' = '500', -- optional: the smallest value of the first
partition.
代码:
第十三分 作业提交
Flink的jar文件并不是Flink集群的可执行文件,需要经过转换之后提交给集群
转换过程:
1 、在Flink Client中,通过反射启动jar中的main函数,生成Flink StreamGraph和JobGraph。将JobGraph提交给
Flink集群。
'connector.read.partition.upper-bound' = '1000', -- optional: the largest value of the last
partition.
-- optional, Gives the reader a hint as to the number of rows that should be fetched
-- from the database when reading per round trip. If the value specified is zero, then
-- the hint is ignored. The default value is zero.
'connector.read.fetch-size' = '100',
-- **followings are lookup options, optional, used in temporary join**
-- optional, max number of rows of lookup cache, over this value, the oldest rows will
-- be eliminated. "cache.max-rows" and "cache.ttl" options must all be specified if any
-- of them is specified. Cache is not enabled as default.
'connector.lookup.cache.max-rows' = '5000',
-- optional, the max time to live for each rows in lookup cache, over this time, the oldest
rows
-- will be expired. "cache.max-rows" and "cache.ttl" options must all be specified if any of
-- them is specified. Cache is not enabled as default.
'connector.lookup.cache.ttl' = '10s',
'connector.lookup.max-retries' = '3', -- optional, max retry times if lookup database failed
-- **followings are sink options, optional, used when writing into table**
-- optional, flush max size (includes all append, upsert and delete records),
-- over this number of records, will flush data. The default value is "5000".
'connector.write.flush.max-rows' = '5000',
-- optional, flush interval mills, over this time, asynchronous threads will flush data.
-- The default value is "0s", which means no asynchronous flush thread will be scheduled.
'connector.write.flush.interval' = '2s',
-- optional, max retry times if writing records to database failed
'connector.write.max-retries' = '3'
)
2 、Flink集群收到JobGraph后,将JobGraph翻译成ExecutionGraph,然后开始调度执行,启动成功之后开始消费
数据
总结:
Flink的核心执行流程就是,把用户的一系列API调用,转化为StreamGraph — JobGraph — ExecutionGraph — 物理
执行拓扑(Task DAG)
Flink提交作业的核心过程图
PipelineExecutor:流水线执行器:
是Flink Client生成JobGraph之后,将作业提交给集群运行的重要环节
Session模式:AbstractSessionClusterExecutor
Per-Job模式:AbstractJobClusterExecutor
IDE调试:LocalExecutor
Session模式:
作业提交通过: yarn-session.sh脚本
在启动脚本的时候检查是否已经存在已经启动好的Flink-Session模式的集群,
然后在PipelineExecutor中,通过Dispatcher提供的Rest接口提交Flink JobGraph
Dispatcher为每一个作业提供一个JobMaser,进入到作业执行阶段
Per-Job模式:一个作业一个集群,作业之间相互隔离。
在PipelineExecutor执行作业提交的时候,可以创建集群并将JobGraph以及所有需要的文件一起提交给Yarn集群,
在Yarn集群的容器中启动Flink Master(JobManager进程),进行初始化后,从文件系统中获取JobGraph,交给
Dispatcher,之后和Session流程相同。