概念
参考:https://www.cnblogs.com/shengyang17/p/10858981.html#_label0
8_Flink CEP简介.pdf

我们可以这样理解,输入的简单事件,flink计算输出的是提取出来的复杂事件。那么就得知,复杂事件是根据一些规则从简单事件流中计算而得来的,这个规则就是我们定义的pattern sequences 。
使用步骤

案例:订单超时监控
在pom文件夹中引入依赖(Scala版本)
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep-scala_2.11</artifactId><version>1.11.2</version></dependency>
数据源结构
34729,create,,1558430842
34730,create,,1558430843
34729,pay,sd76f87d6,1558430844
34730,pay,3hu3k2432,1558430845
34731,create,,1558430846
34731,pay,35jue34we,1558430849
34732,create,,1558430852
34733,create,,1558430855
34734,create,,1558430859
34732,pay,32h3h4b4t,1558430861
定义输入输出样例类
//定义输入样例类
case class OrderEvent(orderId:Long,eventType:String,txId:String,timestamp:Long)
//定义输出样例类
case class OrderResult(orderId:Long,resultMsg:String)
输出样例类:订单ID、结果信息
读取数据、转换为样例类、并提取时间戳和watermark
val resource = getClass.getResource("/OrderLog.csv")
val orderEventStream = env.readTextFile(resource.getPath)
.map(data =>{
val arr = data.split(",")
OrderEvent(arr(0).toLong,arr(1),arr(2),arr(3).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
.keyBy(_.orderId)
之所以使用assignAscendingTimestamps方法,是因为输入数据的时间戳是升序的,watermark时间=输入数据的timestamp * 1000L
使用cep
//1.定义一个pattern
val orderPayPattern = Pattern
.begin[OrderEvent]("creat").where(_.eventType == "create")
.followedBy("pay").where(_.eventType == "pay")
.within(Time.minutes(15))
//2.将pattern应用到数据流上,进行模式匹配,得到一个pattern stream
val patternStream = CEP.pattern(orderEventStream,orderPayPattern)
//3.定义侧输出流标签,用于处理未匹配成功的事件(超时事件)
val orderTImeOutPutTag = new OutputTag[OrderResult]("orderTimeOut")
//4.调用select方法,从pattern stream上应用select function,检出匹配事件序列,提取并处理匹配的成功支付事件以及超时事件
val resultStream = patternStream.select(orderTImeOutPutTag,new OrderTimeOutSelect(),
new OrderPaySelect()
)
| 模式操作 | 描述 |
|---|---|
| begin(#pattern_sequence) | 定一个开始模式: val start = Pattern.begin( Pattern.begin[Event](“start”).where(…).followedBy(“middle”).where(…) ) |
| where(condition) | 为当前模式定义一个条件。为了匹配这个模式,一个事件必须满足某些条件。 多个连续的where()语句取与组成判断条件: pattern.where(event => … / 一些判断条件 /) |
| followedBy(#pattern_sequence) | 增加一个新的模式。可以有其他事件出现在匹配的事件和之前匹配到的事件中间(松散连续): val followedBy = start.followedBy( Pattern.begin[Event](“start”).where(…).followedBy(“middle”).where(…) ) |
| followedByAny(#name) | 增加一个新的模式。可以有其他事件出现在匹配的事件和之前匹配到的事件中间, 每个可选的匹配事件都会作为可选的匹配结果输出(不确定的松散连续): val followedByAny = start.followedByAny(“middle”) |
| within(time) | 定义匹配模式的事件序列出现的最大时间间隔。如果未完成的事件序列超过了这个事件,就会被丢弃: pattern.within(Time.seconds(10)) |
- CEP.pattern()传递的参数:datastream、定义的pattern

- select()方法传递的参数(推荐使用函数类传参):侧输出流(订单超时)、自定义超时处理函数(超时处理)、模式选择函数(正常处理)

自定义处理函数 ```scala / 超时处理 / class OrderTimeOutSelect() extends PatternTimeoutFunction[OrderEvent,OrderResult]{ override def timeout(pattern: util.Map[String, util.List[OrderEvent]], timeoutTimeStamp: Long): OrderResult = { val timeoutOrderId = pattern.get(“creat”).iterator().next().orderId OrderResult(timeoutOrderId,”timeout” + “: +” +timeoutTimeStamp)
} }
/ 正常处理 / class OrderPaySelect() extends PatternSelectFunction[OrderEvent,OrderResult]{ override def select(pattern: util.Map[String, util.List[OrderEvent]]): OrderResult = { val payedOrderId = pattern.get(“pay”).iterator().next().orderId OrderResult(payedOrderId,”payed success”) } ``` PatternTimeoutFunction()方法传递的输入样例类OrderEvent、输出样例类OrderResult,重写timeout(),超时事件的提取:
- 当一个模式通过within关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃;为了能处理这些超时的部分匹配,select和flatselect调用允许指定超时处理程序
- 超时处理程序会接收到目前为止由模式匹配到的所有事件,由一个OutoutTag定义接到的超时事件序列
注意:Map中有create但没有pay(疑惑:为什么只有create数据没有pay数据?),timeoutTimeStamp为超时的时间戳,此处定义的是15分钟之后:.within(Time.minutes(15))
输出样例类包装成(timeoutOrderId,timeoutTimeStamp)
PatternSelectFunction()的select()中:Map中既有create又有pay
- select()需要输入一个select function作为参数,每一个成功匹配的事件序列都会调用它
- select()以一个Map[String,Iterable[IN]]来接收匹配的事件序列,它接受一个 Map 对,其中key就是每个模式的名称,而value就是接受到的事件的Iterable

