复杂事件处理(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的事件关系序列库,并利用过滤、关联、聚合等技术,最终由简单事件产生高级事件,并通过模式规则的方式对重要信息进行跟踪和分析,从实时数据中发掘有价值的信息。复杂事件处理主要应用于防范网络欺诈、设备故障检测、风险规避和智能营销等领域。Flink基于DataStrem API提供了FlinkCEP组件栈,专门用于对复杂事件的处理,帮助用户从流式数据中发掘有价值的信息。
一、CEP相关概念
1.1、配置依赖
在使用FlinkCEP组件之前,需要将FlinkCEP的依赖库引入项目工程中。
1.2、事件定义
- 简单事件:简单事件存在于现实场景中,主要的特点为处理单一事件,事件的定义可以直接观察出来,处理过程中无须关注多个事件之间的关系,能够通过简单的数据处理手段将结果计算出来。
- 复杂事件:相对于简单事件,复杂事件处理的不仅是单一的事件,也处理由多个事件组成的复合事件。复杂事件处理监测分析事件流(Event Streaming),当特定事件发生时来触发某些动作。
复杂事件中事件与事件之间包含多种类型关系,常见的有时序关系、聚合关系、层次关系、依赖关系及因果关系等。
二、Pattern API
FlinkCEP中提供了Pattern API用于对输入流数据的复杂事件规则定义,并从事件流中抽取事件结果。包含四个步骤:
- 输入事件流的创建
- Pattern的定义
- Pattern应用在事件流上检测
- 选取结果
2.1、模式定义
定义Pattern可以是单次执行模式,也可以是循环执行模式。单次执行模式一次只接受一个事件,循环执行模式可以接收一个或者多个事件。通常情况下,可以通过指定循环次数将单次执行模式变为循环执行模式。每种模式能够将多个条件组合应用到同一事件之上,条件组合可以通过where方法进行叠加。
每个Pattern都是通过begin方法定义的:
| val start = Pattern.beginEvent |
|---|
下一步通过Pattern.where()方法在Pattern上指定Condition(条件),只有当条件满足之后,当前的Pattern才会接受事件。
| start.where(_.getCallType.equles(“success”)) |
|---|
1.设置循环次数
对于已经创建好的Pattern,可以指定循环次数,形成循环执行的Pattern.。
times:可以通过times指定固定的循环执行次数。 | /*
举例:事件流:a1,b1,a2,b2,a3,a4,a5,a6,a7… ….(a1代表第一次出现a,其他以此类
推,a5第5次出现事件a)
假设start.where(匹配a事件).times(4) ,匹配结果如下:
{a1,a2,a3,a4},{a2,a3,a4,a5},{a3,a4,a5,a6}
假设start.where(匹配a事件).times(2,4) ,匹配结果如下:
{a1,a2},{a1,a2,a3},{a2,a3},{a1,a2,a3,a4},{a2,a3,a4},{a3,a4}… …
/
//当where条件满足指定次数后,循环触发,连续出现4次匹配条件就触发,中间可以有其他事件
start.times(4);
//可以执行触发次数范围,让循环执行次数在该范围之内,中间可以有其他事件
start.times(2, 4); | | —- |optional:也可以通过optional关键字指定要么不触发要么触发指定的次数。需要与多个模式组合时,才有意义。 | start.times(4).optional;//需要多个模式组合时,才有意义
start.times(2, 4).optional; //需要多个模式组合时,才有意义 | | —- |greedy:可以通过greedy将Pattern标记为贪婪模式,在Pattern匹配成功的前提下,会尽可能多地触发。需要与多个模式组合时才有意义。 | //触发2、3、4次,尽可能重复执行
start.times(2, 4).greedy; //需要与 oneOrMore 一起使用,才有效果。单独使用没意义
//触发0、2、3、4次,尽可能重复执行
start.times(2, 4).optional.greedy;//需要多个条件组合并且与 oneOrMore 一起使用,才有效果。单独使用没意义 | | —- |oneOrMore:可以通过oneOrMore方法指定触发一次或多次。 | // 触发一次或者多次
start.oneOrMore();
//触发一次或者多次,尽可能重复执行
start.oneOrMore().greedy();
// 触发0次或者多次
start.oneOrMore().optional();
// 触发0次或者多次,尽可能重复执行
start.oneOrMore().optional().greedy(); | | —- |timesOrMore:通过timesOrMore方法可以指定触发固定次数以上,例如执行两次以上。 | // 触发两次或者多次
start.timesOrMore(2);
// 触发两次或者多次,尽可能重复执行
start.timesOrMore(2).greedy();
// 不触发或者触发两次以上,尽可能重复执行
start.timesOrMore(2).optional().greedy(); | | —- |
2、定义条件
每个模式都需要指定触发条件,作为事件进入到该模式是否接受的判断依据,当事件中的数值满足了条件时,便进行下一步操作。在FlinkCFP中通过pattern.where()、pattern.or()及pattern.until()方法来为Pattern指定条件,且Pattern条件有Simple Conditions及Combining Conditions等类型。
简单条件:Simple Condition继承于Iterative Condition类,其主要根据事件中的字段信息进行判断,决定是否接受该事件。 | // 把通话成功的事件挑选出来
start.where(_.getCallType == “success”) | | —- |组合条件:组合条件是将简单条件进行合并,通常情况下也可以使用where方法进行条件的组合,默认每个条件通过AND逻辑相连。如果需要使用OR逻辑,直接使用or方法连接条件即可。 | // 把通话成功,或者通话时长大于10秒的事件挑选出来
val start = Pattern.beginStationLog
.where(.callType==”success”)
.or(.duration>10) | | —- |终止条件:如果程序中使用了oneOrMore或者oneOrMore().optional()方法,则必须指定终止条件,否则模式中的规则会一直循环下去,如下终止条件通过until()方法指定。 | pattern.oneOrMore.until(_.callOut.startsWith(“186”)) | | —- |
3、模式序列
将相互独立的模式进行组合然后形成模式序列。模式序列基本的编写方式和独立模式一致,各个模式之间通过邻近条件进行连接即可,其中有严格邻近、宽松邻近、非确定宽松邻近三种邻近连接条件。
假设有数据流F:a,c,b1,b2…… b1代表第一次出现b,b2代表第二次出现b
- 严格邻近:严格邻近条件中,需要所有的事件都按照顺序满足模式条件,不允许忽略任意不满足的模式。
举例:从事件流F中匹配ab事件组,则匹配结果为空。
| val strict: Pattern[Event] = start.next(“middle”).where(…) |
|---|
- 宽松邻近:在宽松邻近条件下,会忽略没有成功匹配模式条件,并不会像严格邻近要求得那么高,可以简单理解为OR的逻辑关系,忽略不匹配的事件直到下一个匹配出现为止。
举例:从事件流F中匹配ab事件组,则匹配结果为:{a,b1}
| val relaxed: Pattern[Event, _] = start.followedBy(“middle”).where(…) |
|---|
- 非确定宽松邻近:和宽松邻近条件相比,非确定宽松邻近条件指在模式匹配过程中可以忽略已经匹配的条件。
举例:从事件流F中匹配ab事件组,则匹配结果为:{a,b1},{a,b2}
| val nonDetermin: Pattern[Event, _] = start.followedByAny(“middle”).where(…) |
|---|
- 除以上模式序列外,还可以定义“不希望出现某种近邻关系”:
.notNext() —— 不想让某个事件严格紧邻前一个事件发生
.notFollowedBy() —— 不想让某个事件在两个事件之间发生,后面还需要有模式才可使用,即:一个模式不能以notFollowedBy()模式结束。
注意:
1、所有模式序列必须以 .begin() 开始
2、模式序列不能以 .notFollowedBy() 结束
3、“not” 类型的模式不能被 optional 所修饰
4、此外,还可以为模式指定时间约束,用来要求在多长时间内匹配有效
| //指定模式在10秒内有效 pattern.within(Time.seconds(10)); |
|---|
2.2、模式检测
调用 CEP.pattern(),给定输入流和模式,就能得到一个 PatternStream
| //cep 做模式检测 val patternStream = CEP.patternEventLog,pattern) |
|---|
2.3、选择结果
得到PatternStream类型的数据集后,接下来数据获取都基于PatternStream进行。该数据集中包含了所有的匹配事件。目前在FlinkCEP中提供select和flatSelect两种方法从PatternStream提取事件结果事件。
1.通过Select Funciton抽取正常事件
可以通过在PatternStream的Select方法中传入自定义Select Funciton完成对匹配事件的转换与输出。其中Select Funciton的输入参数为Map[String, Iterable[IN]],Map中的key为模式序列中的Pattern名称,Value为对应Pattern所接受的事件集合,格式为输入事件的数据类型。
| def selectFunction(pattern : Map[String, Iterable[IN]]): T= { //获取pattern中的startEvent val startEvent = pattern.get(“start_pattern”).get.next //获取Pattern中middleEvent val middleEvent = pattern.get(“middle”).get.next //返回结果 T(startEvent, middleEvent) } |
|---|
2.通过Flat Select Funciton抽取正常事件
Flat Select Funciton和Select Function相似,不过Flat Select Funciton在每次调用可以返回任意数量的结果。因为Flat Select Funciton使用Collector作为返回结果的容器,可以将需要输出的事件都放置在Collector中返回。
| def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[T]) = { //获取pattern中startEvent val startEvent = pattern.get(“start_pattern”).get.next //获取Pattern中middleEvent val middleEvent = pattern.get(“middle”).get.next //并根据startEvent的Value数量进行返回 for (i <- 0 to startEvent.getValue) { collector.collect(T(startEvent, middleEvent)) }} |
|---|
3.通过Select Funciton抽取超时事件
如果模式中有within(time)【注意:这里不是指的是watermark迟到的数据超时,因为这里没有窗口】,那么就很有可能有超时的数据存在,通过PatternStream. Select方法分别获取超时事件和正常事件。首先需要创建OutputTag来标记超时事件,然后在PatternStream.select方法中使用OutputTag,就可以将超时事件从PatternStream中抽取出来。
| // 通过CEP.pattern方法创建PatternStream val patternStream: PatternStream[Event] = CEP.pattern(input, pattern) //创建OutputTag,并命名为timeout-output val timeoutTag = OutputTagString //调用PatternStream select()并指定timeoutTag val result: SingleOutputStreamOperator[NormalEvent] = patternStream.select(timeoutTag){ //超时事件获取 (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()//返回异常事件 } { //正常事件获取 pattern: Map[String, Iterable[Event]] => NormalEvent()//返回正常事件 } //调用getSideOutput方法,并指定timeoutTag将超时事件输出 val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(timeoutTag) |
|---|
三、案例
3.1、测试
/*** Greedy : Flink CEP贪婪模式,需要在多个事件中使用* 如果start 模式中不加上greedy ,结果如下:* start : aa1,aa2, ,middle : aa3,* start : aa2,aa3, ,middle : aa4,* start : aa1,aa2,aa3, ,middle : aa4,* start : aa2,aa3,aa4, ,middle : bbb,* start : aa3,aa4, ,middle : bbb,* start 模式中加上greedy,结果如下:* start : aa2,aa3,aa4, ,middle : bbb,* start : aa3,aa4, ,middle : bbb,*/object GreedyTest {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.streaming.api.scala._val ds1: DataStream[String] = env.fromCollection(Array[String]("aa1","aa2","aa3","aa4","aa5","bbb"))//设置模式,start:判断以“a”开头,出现2-3次,middle : 判断长度是否为3,出现2-3次val pattern: Pattern[String, String] = Pattern.begin[String]("start").where(s => {s.startsWith("a")}).times(2, 3)/*** 注意:加上greedy:就是尽可能多的往后匹配start事件【这里尽可能多不是说组合多】,与middle满足 pattern**/.greedy.next("middle").where(s => {s.length == 3})//模式匹配流val ps: PatternStream[String] = CEP.pattern(ds1,pattern)//选择结果val result: DataStream[String] = ps.select(new PatternSelectFunction[String, String] {override def select(map: util.Map[String, util.List[String]]): String = {import scala.collection.JavaConverters._var returnStr = ""val startList: util.List[String] = map.get("start")if (startList != null) {returnStr += "start : "startList.asScala.toList.foreach(s => {returnStr += s + ","})}val middleList: util.List[String] = map.get("middle")if (middleList != null) {returnStr += "\t,middle : "middleList.asScala.toList.foreach(s => {returnStr += s + ","})}returnStr}})result.print()env.execute()}}
3.2、需求:从一堆的登录日志中,匹配一个恶意登录的模式(如果一个用户连续失败三次,则是恶意登录),从而找到哪些用户名是恶意登录。
/*** 登录告警系统* 从一堆的登录日志中,匹配一个恶意登录的模式(如果一个用户连续失败三次,则是恶意登录),从而找到哪些用户名是恶意 登录*/case class EventLog(id:Long,userName:String,eventType:String,eventTime:Long)object TestCepDemo {def main(args: Array[String]): Unit = {val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentstreamEnv.setParallelism(1)import org.apache.flink.streaming.api.scala._streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream: DataStream[EventLog] = streamEnv.fromCollection(List(new EventLog(1, "张三", "fail", 1574840003),new EventLog(1, "张三", "fail", 1574840004),new EventLog(1, "张三", "fail", 1574840005),new EventLog(2, "李四", "fail", 1574840006),new EventLog(2, "李四", "sucess", 1574840007),new EventLog(1, "张三", "fail", 1574840008))).assignAscendingTimestamps(_.eventTime * 1000)stream.print("input data")//定义模式val pattern: Pattern[EventLog, EventLog] = Pattern.begin[EventLog]("begin").where(_.eventType.equals("fail")).next("next1").where(_.eventType.equals("fail")).next("next2").where(_.eventType.equals("fail")).within(Time.seconds(10))//cep 做模式检测val patternStream: PatternStream[EventLog] = CEP.pattern[EventLog](stream.keyBy(_.id),pattern)//第三步: 输出alertval result: DataStream[String] = patternStream.select(new PatternSelectFunction[EventLog, String] {override def select(map: util.Map[String, util.List[EventLog]]) = {val iter: util.Iterator[String] = map.keySet().iterator()val e1: EventLog = map.get(iter.next()).iterator().next()val e2: EventLog = map.get(iter.next()).iterator().next()val e3: EventLog = map.get(iter.next()).iterator().next()"id:" + e1.id + " 用户名:" + e1.userName + "登录的时间:" + e1.eventTime + ":" + e2.eventTime + ":" + e3.eventTime}})result.print(" main ")streamEnv.execute()}}
3.3、案例分析:读取订单数据,如果用户下单后,15分钟内付款完成,则返回付款成功代发货信息,如果用户15分钟后再付款,则返回订单超时信息。
/*** 读取order.log 订单数据,* 需求:如果用户自从下订单到付款如果在15分钟内则返回待发货信息,如果在15分钟后支付订单则返回订单超时*/case class OrderInfo(uid:String,payType:String,orderid:String,time:Long)object CepExample2 {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//导入隐式转换import org.apache.flink.streaming.api.scala._//设置时间语义env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//1.读取文件 ,创建事件流val ds: KeyedStream[OrderInfo, String] = env.readTextFile("./data/order.log").map(line => {val arr = line.split(",")OrderInfo(arr(0), arr(1), arr(2), arr(3).toLong)}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderInfo](Time.seconds(2)) {override def extractTimestamp(element: OrderInfo): Long = element.time * 1000L}).keyBy(_.uid)//2.设置模式匹配val pattern: Pattern[OrderInfo, OrderInfo] = Pattern.begin[OrderInfo]("create").where(_.payType.equals("create")).followedBy("pay").where(_.payType.equals("pay")).within(Time.minutes(15))//3.事件流检测val patternStream: PatternStream[OrderInfo] = CEP.pattern(ds,pattern)val outPutTag = new OutputTag[String]("timeout")//4.获取结果val result: DataStream[String] = patternStream.select(outPutTag)(//超时事件,map[模式名称,匹配内容] ,time:超时的时间截止点(map: Map[String, Iterable[OrderInfo]], time: Long) => {val pay = map.keys.lastval orderInfo = map.get(pay).get.lasts"【支付超时】 - 支付类型:${pay},信息:${orderInfo},超时时间点:${time}"})((map: Map[String, Iterable[OrderInfo]]) => {val createInfo = map.get("create").get.lastval payInfo = map.get("pay").get.lasts"【支付成功,待发货】 - 创建订单 : ${createInfo} ,支付:${payInfo}"})//获取侧流 - 超时时间result.getSideOutput(outPutTag).print("超时订单")result.print("正常订单")//触发执行env.execute()}}
