概念

参考:https://www.cnblogs.com/shengyang17/p/10858981.html#_label0
8_Flink CEP简介.pdf


11-CEP - 图1
我们可以这样理解,输入的简单事件,flink计算输出的是提取出来的复杂事件。那么就得知,复杂事件是根据一些规则从简单事件流中计算而得来的,这个规则就是我们定义的pattern sequences 。

使用步骤

11-CEP - 图2


接下来使用订单超时监控案例进行步骤讲解

案例:订单超时监控

在pom文件夹中引入依赖(Scala版本)

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-cep-scala_2.11</artifactId>
  4. <version>1.11.2</version>
  5. </dependency>

数据源结构

34729,create,,1558430842
34730,create,,1558430843
34729,pay,sd76f87d6,1558430844
34730,pay,3hu3k2432,1558430845
34731,create,,1558430846
34731,pay,35jue34we,1558430849
34732,create,,1558430852
34733,create,,1558430855
34734,create,,1558430859
34732,pay,32h3h4b4t,1558430861

定义输入输出样例类

//定义输入样例类
case class OrderEvent(orderId:Long,eventType:String,txId:String,timestamp:Long)
//定义输出样例类
case class OrderResult(orderId:Long,resultMsg:String)

输出样例类:订单ID、结果信息

读取数据、转换为样例类、并提取时间戳和watermark

val resource = getClass.getResource("/OrderLog.csv")
val orderEventStream = env.readTextFile(resource.getPath)
  .map(data =>{
    val arr = data.split(",")
    OrderEvent(arr(0).toLong,arr(1),arr(2),arr(3).toLong)
  })

  .assignAscendingTimestamps(_.timestamp * 1000L)
  .keyBy(_.orderId)

之所以使用assignAscendingTimestamps方法,是因为输入数据的时间戳是升序的,watermark时间=输入数据的timestamp * 1000L

使用cep

//1.定义一个pattern
val orderPayPattern = Pattern
  .begin[OrderEvent]("creat").where(_.eventType == "create")
  .followedBy("pay").where(_.eventType == "pay")
  .within(Time.minutes(15))

//2.将pattern应用到数据流上,进行模式匹配,得到一个pattern stream
val patternStream = CEP.pattern(orderEventStream,orderPayPattern)

//3.定义侧输出流标签,用于处理未匹配成功的事件(超时事件)
val orderTImeOutPutTag = new OutputTag[OrderResult]("orderTimeOut")

//4.调用select方法,从pattern stream上应用select function,检出匹配事件序列,提取并处理匹配的成功支付事件以及超时事件
val resultStream = patternStream.select(orderTImeOutPutTag,new OrderTimeOutSelect(),
  new OrderPaySelect()
)
模式操作 描述
begin(#pattern_sequence) 定一个开始模式:
val start = Pattern.begin( Pattern.begin[Event](“start”).where(…).followedBy(“middle”).where(…) )
where(condition) 为当前模式定义一个条件。为了匹配这个模式,一个事件必须满足某些条件。 多个连续的where()语句取与组成判断条件:
pattern.where(event => / 一些判断条件 /)
followedBy(#pattern_sequence) 增加一个新的模式。可以有其他事件出现在匹配的事件和之前匹配到的事件中间(松散连续):
val followedBy = start.followedBy( Pattern.begin[Event](“start”).where(…).followedBy(“middle”).where(…) )
followedByAny(#name) 增加一个新的模式。可以有其他事件出现在匹配的事件和之前匹配到的事件中间, 每个可选的匹配事件都会作为可选的匹配结果输出(不确定的松散连续):
val followedByAny = start.followedByAny(“middle”)
within(time) 定义匹配模式的事件序列出现的最大时间间隔。如果未完成的事件序列超过了这个事件,就会被丢弃:
pattern.within(Time.seconds(10))
  1. CEP.pattern()传递的参数:datastream、定义的pattern

image.png

  1. select()方法传递的参数(推荐使用函数类传参):侧输出流(订单超时)、自定义超时处理函数(超时处理)、模式选择函数(正常处理)

image.png

  1. 自定义处理函数 ```scala / 超时处理 / class OrderTimeOutSelect() extends PatternTimeoutFunction[OrderEvent,OrderResult]{ override def timeout(pattern: util.Map[String, util.List[OrderEvent]], timeoutTimeStamp: Long): OrderResult = { val timeoutOrderId = pattern.get(“creat”).iterator().next().orderId OrderResult(timeoutOrderId,”timeout” + “: +” +timeoutTimeStamp)

    } }

/ 正常处理 / class OrderPaySelect() extends PatternSelectFunction[OrderEvent,OrderResult]{ override def select(pattern: util.Map[String, util.List[OrderEvent]]): OrderResult = { val payedOrderId = pattern.get(“pay”).iterator().next().orderId OrderResult(payedOrderId,”payed success”) } ``` PatternTimeoutFunction()方法传递的输入样例类OrderEvent、输出样例类OrderResult,重写timeout(),超时事件的提取:

  • 当一个模式通过within关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃;为了能处理这些超时的部分匹配,select和flatselect调用允许指定超时处理程序
  • 超时处理程序会接收到目前为止由模式匹配到的所有事件,由一个OutoutTag定义接到的超时事件序列

注意:Map中有create但没有pay(疑惑:为什么只有create数据没有pay数据?),timeoutTimeStamp为超时的时间戳,此处定义的是15分钟之后:.within(Time.minutes(15))
输出样例类包装成(timeoutOrderId,timeoutTimeStamp)

PatternSelectFunction()select()中:Map中既有create又有pay

  • select()需要输入一个select function作为参数,每一个成功匹配的事件序列都会调用它
  • select()以一个Map[String,Iterable[IN]]来接收匹配的事件序列,它接受一个 Map 对,其中key就是每个模式的名称,而value就是接受到的事件的Iterable

image.png