Flink CEP编程

什么是FlinkCEP

FlinkCEP(Complex event processing for Flink) 是在Flink实现的复杂事件处理库. 它可以让你在无界流中检测出特定的数据,有机会掌握数据中重要的那部分。
是一种基于动态环境中事件流的分析技术,事件在这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终分析得到更复杂的复合事件。
1. 目标:从有序的简单事件流中发现一些高阶特征
2. 输入:一个或多个由简单事件构成的事件流
3. 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
4. 输出:满足规则的复杂事件

Flink CEP应用场景

Ø 风险控制
对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。
Ø 策略营销
用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
Ø 运维监控
灵活配置多指标、多依赖来实现更复杂的监控模式。

CEP开发基本步骤

导入CEP相关依赖

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-cep_${scala.binary.version}</artifactId>
  4. <version>${flink.version}</version>
  5. </dependency>

模式API

模式API可以让你定义想从输入流中抽取的复杂模式序列
几个概念:
Ø 模式:
比如找拥有相同属性事件序列的模式(前面案例中的拥有相同的id), 我们一般把简单模式称之为模式
注意:
1. 每个模式必须有一个独一无二的名字,你可以在后面使用它来识别匹配到的事件。(比如前面的start模式)
2. 模式的名字不能包含字符”:”
Ø 模式序列
每个复杂的模式序列包括多个简单的模式,也叫模式序列.你可以把模式序列看作是这样的模式构成的图, 这些模式基于用户指定的条件从一个转换到另外一个
Ø 匹配
输入事件的一个序列,这些事件通过一系列有效的模式转换,能够访问到复杂模式图中的所有模式。

规则pattern关键字解析

begin :CEP的第一个,需要起名字

next:CEP第n个模式,需要起名字,严格连续,例如begin是a那么next定义了b,那么两者中间一定不能有其他数据,匹配的数据一定是ab
notNext:CEP第n个模式,需要起名字,非确定松散连续,具体看视频,如果是最后一条数据则不会匹配到东西,因为没有next

followBy:CEP第n个模式,需要起名字,松散连续,如果begin是a那么followBy定义了b,那么两者中间有c,也可匹配出ab,出来的结果是ab,而不是acb
notFollowBy:CEP第n个模式,需要起名字image.png

followedByAny:CEP第n个模式,需要起名字,非确定松散连续,具体看下面

begin和next 等组合在一起就是模式序列

where 、or
times(2)、times(2, 4) 、 oneOrMore() 、 timesOrMore(2)
consecutive连续,allowCombinations允许多组合
optional()可选(0次 或 times次),greedy()贪婪(tims次 或 任意次)
image.png
until(一般用在oneOrMore、timesOrMore后面

within

单个模式

单个模式可以是单例模式或者循环模式.

单例模式

单例模式只接受一个事件. 默认情况模式都是单例模式.

循环模式

循环模式可以接受多个事件.
单例模式配合上量词就是循环模式.(非常类似我们熟悉的正则表达式)
Ø 固定次数

// 1.1 使用量词出现两次Pattern patternWithQuantifier = pattern.times**(2)_**;

原理:
1从第一条数据开始往下匹配两次sensor_1
成功后是sensor_1,1,10 和 sensor_1,1,20
2然后第二条数据也是sensor_1 所以在匹配一个sensor_1 即可满足要求
匹配成功后是sensor_1,1,20 和sensor_1,4,400
3然后第三天数据不是sensor_1不匹配,
4然后第四条虽然是sensor_1,但是往后匹配找不到第二条sensor_1了,所以没法匹配
*以上这种使用times是匹配不连续的,想要用times不用next也能实现连续匹配:
.consecutive() 等价于next

*总结:
image.png
.
image.png
下面的原理大致相同,就是正则匹配嘛
Ø 范围内的次数

// 1.1 使用量词 [2,4] 2次,3次或4次
_Pattern patternWithQuantifier = pattern.times**
(2, 4)_**;

Ø 一次或多次

Pattern patternWithQuantifier = pattern.oneOrMore();

Ø 多次及多次以

// 2次或2次一样
_Pattern patternWithQuantifier = pattern.timesOrMore**
(2)_**;

条件

对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,例如前面用到的where就是一种条件
Ø 迭代条件(没讲,说一般用不上,可以读到之前匹配的数据)
这是最普遍的条件类型。使用它可以指定一个基于前面已经被接受的事件的属性或者它们的一个子集的统计数据来决定是否接受时间序列的条件。

Pattern pattern = Pattern
.begin(“start”)
__
.where_(_new IterativeCondition() {
__
@Override
public boolean filter(WaterSensor value, Context ctx_) _throws Exception {
_
_return “sensor_1”
.equals(value.getId());
}
__ })
;

Ø 简单条件
这种类型的条件扩展了前面提到的IterativeCondition类,它决定是否接受一个事件只取决于事件自身的属性。

Pattern pattern = Pattern
.begin(“start”)
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
__
System.out.println(value);
return “sensor_1”.equals(value.getId());
}
__ })
;

Ø 组合条件
把多个条件结合起来使用. 这适用于任何条件,你可以通过依次调用where()来组合条件。 最终的结果是每个单一条件的结果的逻辑AND。
如果想使用OR来组合条件,你可以像下面这样使用or()方法。

Pattern pattern = Pattern
.begin(“start”)
__
.where_(_new IterativeCondition() {
__
@Override
public boolean filter(WaterSensor value, Context ctx_) _throws Exception {
_
_return “sensor_1”
.equals(value.getId());
}
})
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return
value.getVc() > 30;
}
})
__
.or_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return
value.getTs() > 3000;
}
__ })
;

Ø 停止条件
如果使用循环模式(oneOrMore, timesOrMore), 可以指定一个停止条件, 否则有可能会内存吃不消.
意思是满足了给定的条件的事件出现后,就不会再有事件被接受进入模式了。

Pattern pattern = Pattern
.begin(“start”)
__
.where_(_new IterativeCondition() {
__
@Override
public boolean filter(WaterSensor value, Context ctx_) _throws Exception {
_
_return “sensor_1”
.equals(value.getId());
}
})
.timesOrMore(2)
.until_(_new
SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return
value.getVc() >= 40;
}
__ })
;

组合模式(模式序列)

把多个单个模式组合在一起就是组合模式. 组合模式由一个初始化模式(.begin(…))开头

next严格连续(严格紧邻)

期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件

Pattern pattern = Pattern
.begin(“start”)
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return “sensor_1”
.equals(value.getId());
}
})
.next(“end”)
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return “sensor_2”
.equals(value.getId());
}
__ })
;

注意:
notNext 如果不想后面直接连着一个特定事件

followedBy松散连续

忽略匹配的事件之间的不匹配的事件。

Pattern pattern = Pattern
.begin(“start”)
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return “sensor_1”
.equals(value.getId());
}
})
.followedBy(“end”)
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return “sensor_2”
.equals(value.getId());
}
__ })
;

注意:
notFollowBy 如果不想一个特定事件发生在两个事件之间的任何地方。(notFollowBy不能位于事件的最后)

followedByAny非确定的松散连续

更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配
当且仅当数据为a,c,b,b时,对于followedBy模式而言命中的为{a,b},对于followedByAny而言会有两次命中{a,b},{a,b}

Pattern pattern = Pattern
.begin(“start”)
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return “sensor_1”
.equals(value.getId());
}
})
.followedByAny(“end”)
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return “sensor_2”
.equals(value.getId());
}
__ })
;

模式知识补充

循环模式的连续性

前面的连续性也可以运用在单个循环模式中. 连续性会被运用在被接受进入模式的事件之间。
Ø 松散连续
默认是松散连续

Pattern pattern = Pattern
.begin(“start”)
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return “sensor_1”
.equals(value.getId());
}
})
.times(2);

Ø 严格连续consecutive

Pattern pattern = Pattern
.begin(“start”)
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return “sensor_1”
.equals(value.getId());
}
})
.times(2)
__
.consecutive();

Ø 非确定的松散连续allowCombinations()
类似于followbyany

Pattern pattern = Pattern
.begin(“start”)
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return “sensor_1”
.equals(value.getId());
}
})
.times(2)
__
.allowCombinations();

循环模式的贪婪性

在组合模式情况下, 对次数的处理尽可能获取最多个的那个次数, 就是贪婪!当一个事件同时满足两个模式的时候起作用.

Pattern pattern = Pattern
.begin(“start”)
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return “sensor_1”
.equals(value.getId());
}
__ })
.times(2, 3).greedy() .next(“end”)
__
.where(_new SimpleCondition() {
@Override
public boolean filter
(WaterSensor value) throws Exception {
return value.getVc() == 30;
}
_ })**;

数据:
sensor_1,1,10
sensor_1,2,20
sensor_1,3,30
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
结果:
{start=[WaterSensor(id=sensor_1, ts=1000, vc=10), WaterSensor(id=sensor_1, ts=2000, vc=20), WaterSensor(id=sensor_1, ts=3000, vc=30)], end=[WaterSensor(id=sensor_2, ts=4000, vc=30)]}

{start=[WaterSensor(id=sensor_1, ts=2000, vc=20), WaterSensor(id=sensor_1, ts=3000, vc=30)], end=[WaterSensor(id=sensor_2, ts=4000, vc=30)]}
分析:
sensor_1,3,30 在匹配的的时候, 既能匹配第一个模式也可以匹配的第二个模式, 由于第一个模式使用量词则使用greedy的时候会优先匹配第一个模式, 因为要尽可能多的次数
注意:
1. 一般贪婪比非贪婪结果要少!
2. 模式组不能设置为greedy

循环模式的可选性

可以使用pattern.optional()方法让所有的模式变成可选的,不管是否是循环模式

Pattern pattern = Pattern
.begin(“start”)
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return “sensor_1”
.equals(value.getId());
}
__ })
.times(2).optional() // 0次或2次 .next**(“end”)
.where(new SimpleCondition() {
@Override
public boolean filter
(WaterSensor value) throws Exception {
return “sensor_2”.equals(value.getId());
}
_ })**;

说明:
start模式可能会没有!

模式组

在前面的代码中次数只能用在某个模式上, 比如: .begin(…).where(…).next(…).where(…).times(2) 这里的次数只会用在next这个模式上, 而不会用在begin模式上.
如果需要用在多个模式上,可以使用模式组!
注意,显示的时候缺失按照模式名字来显示
比如:start:xxx
* next:xxx

Pattern pattern = Pattern
.begin(Pattern
.begin(“start”)
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return “sensor_1”
.equals(value.getId());
}
})
.next(“next”)
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return “sensor_2”
.equals(value.getId());
}
}))
.times(2);

结果: sensor_1,sensor_2, sensor_1, sensor_2

超时数据

当一个模式上通过within加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。

Pattern pattern = Pattern
.begin(“start”)
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return “sensor_1”
.equals(value.getId());
}
})
.next(“end”)
__
.where_(_new SimpleCondition() {
__
@Override
public boolean filter(WaterSensor value_) _throws Exception {
_
_return “sensor_2”
.equals(value.getId());
}
})
.within(Time.seconds(2));

image.png

匹配后跳过策略

对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上,你需要指定跳过策略AfterMatchSkipStrategy。 有五种跳过策略,如下:
· NO_SKIP: 每个成功的匹配都会被输出(默认)。
· SKIP_TO_NEXT: 丢弃以相同事件开始的所有部分匹配。
· SKIP_PAST_LAST_EVENT: 丢弃起始在这个匹配的开始和结束之间的所有部分匹配。
· SKIP_TO_FIRST: 丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。
· SKIP_TO_LAST: 丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。
AfterMatchSkipStrategy skipStrategy = …
Pattern.begin(“patternName”, skipStrategy);
image.png

快捷总结

cep可以解决乱序问题

数据匹配从第一行开始。

循环
where
times,每个time只单独匹配当前where

连续性
next(“end”) — 严格连续

  1. followedby -- 命中一次 松散连续<br /> followedbyany --命中多次 松散连续
  2. notnext 严格不连续<br /> 如果这是最后一条数据,则无法取出,系统无法判断是否有next<br /> notfollow by a 松散不连续<br /> 如果这是 最后一条a数据,则无法判断,并报错
  3. nextbegin之间没有 a:
  4. begin<br /> notfollowe a<br /> next<br />循环模式连续性<br /> time<br /> consecutive -- 相当于严格连续,next<br /> 这个连续是看时间的
  5. time<br /> allowCombinations -- 松散严格连续 followbyany

可选项目

  1. where<br /> // times//consecutive<br /> option 代表上面where的内容可有可无,有匹配一次,无匹配一次