57cb3642272cf0d48ff52533dcc376c5.png
复杂事件处理 (Complex Event Progressing,CEP) 是一种基于事件流的处理技术,它将系统数据看作不同类型的事件,通过分析事件间的关系,建立不同的事件关系序列库,利用过滤、关联、聚合、模式匹配等技术,最终由简单事件产生高级事件或商业流程。早在 20 世纪 80 年代,SQL 的出现通过面向问题的方式取代了面向过程的查询数据方式,率先在数据库中广泛应用起来。90 年代,Sybase 率先提出触发器(Trigger)的理念,把数据的变更与事件联系起来,但触发器能够处理的数据量和复杂度有限,而且也没有时间序列的概念。2000 年左右有厂商开始在这个方向上做一些基于事件和数据流的处理,并且借鉴 SQL,希望通过面向问题的方式进行处理,现在已经形成了一个特殊的领域,也就是复杂事件处理(CEP)。

目前 CEP 系统有很多,功能也各不相同,常见的 CEP 系统有 Esper、Shiddi、Flink、Oracle Event Processing 等等,更详细的信息可参见 Complex event processing - Wikipedia。本文将从什么是 CEP、CEP 与流式计算、CEP 分布式实现等几个方面简单介绍 CEP。

一、什么是复杂事件处理

为了直观理解 CEP,我们先来看 Shiddi 所提供的一个官方例子。

  1. public class SimpleFilterSample {
  2. public static void main(String[] args) throws InterruptedException {
  3. // Creating Siddhi Manager
  4. SiddhiManager siddhiManager = new SiddhiManager();
  5. String siddhiApp = "" +
  6. "define stream cseEventStream (symbol string, price float, volume long); " +
  7. "" +
  8. "@info(name = 'query1') " +
  9. "from cseEventStream[volume < 150] " +
  10. "select symbol,price " +
  11. "insert into outputStream ;";
  12. // Generating runtime
  13. SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
  14. // Adding callback to retrieve output events from query
  15. siddhiAppRuntime.addCallback("query1", new QueryCallback() {
  16. @Override
  17. public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
  18. // EventPrinter.print(timeStamp, inEvents, removeEvents);
  19. System.out.print(inEvents[0].getData(0) + " ");
  20. }
  21. });
  22. // Retrieving InputHandler to push events into Siddhi
  23. InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
  24. // Starting event processing
  25. siddhiAppRuntime.start();
  26. // Sending events to Siddhi
  27. inputHandler.send(new Object[]{"Welcome", 700f, 100L});
  28. inputHandler.send(new Object[]{"WSO2", 60.5f, 200L});
  29. inputHandler.send(new Object[]{"to", 50f, 30L});
  30. inputHandler.send(new Object[]{"IBM", 76.6f, 400L});
  31. inputHandler.send(new Object[]{"siddhi!", 45.6f, 50L});
  32. Thread.sleep(500);
  33. // Shutting down the runtime
  34. siddhiAppRuntime.shutdown();
  35. // Shutting down Siddhi
  36. siddhiManager.shutdown();
  37. }
  38. }

基于上面的例子,很容易对 CEP 有个直观的理解,首先定义一个事件流 cseEventStream,通过程序不断向这个流中插入事件,同时通过对流中事件的实时处理,过滤调 volume < 150 的事件,并且插入到 outputStream 中,通过关联的回调函数对 outputStream 流中的事件在做进一步处理,整个处理过程完全使用类 SQL 面向问题方式,不需要针对性的开发过程式处理程序。

尽管 CEP 有多种不同实现,但总体上讲 CEP 的特点是通过类 SQL、DSL 等方式完成对业务逻辑处理的描述,而非开发过程式处理程序。下面再来看个例子:如果一个房间温度在 10 分钟之内增长超过 5 度,发送告警。

  1. from every( e1=TempStream ) -> e2=TempStream[ e1.roomNo == roomNo and (e1.temp + 5) <= temp ] within 10 min
  2. select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp insert into AlertStream;

对于不熟悉 CEP 的人来讲,这段代码初看起来有点复杂,我们可以简单的这么理解一下,首先有一个叫做 TempStream 的流存放温度事件,对于每一个进入 TempStream 流的事件都定义一个 10 分钟的事件窗口(within 10 min),我们把第一个事件定义为 e1,窗口期内 e1 之后的其他事件我们定义为 e2,[ e1.roomNo == roomNo and (e1.temp + 5) <= temp ]表示 e1,e2 是同一房间产生的事件并且温度增长超过 5 度,当满足这个条件时,将 e1,e2 作为一条记录插入到 AlertStream 流中。

有别与一般的流式处理框架,every( e1=TempStream ) -> e2=TempStream[ e1.roomNo == roomNo and (e1.temp + 5) <= temp ] 这种独特的表现形式在 CEP 中被称为模式(Pattern)。

二、CEP vs 流式计算

伴随着 Storm、Spark、Flink 等流式计算框架的出现,CEP 的热度也逐渐上升起来,尽管 CEP 的出现要早于 Storm、Spark、Flink 很久。CEP 与流式计算有很多相似之处,但两者最大的区别就是 CEP 是面向问题的,而 Storm 等流式计算框架是面向过程的,针对具体的问题需要开发大量应用程序。可以认为 Storm 等流式计算框架可以做为 CEP 的底层实现,事实上像 WSO2 这样的产品也就是这样实现的,也就是说当用户定义 CEP Query 后,WSO2 进行 Query 解析,分解成若干算子后交由 Storm 集群去处理。

三、分布式 CEP

前文中,我们展示了一个单实例的 CEP 程序并且提及流计算框架可以做为 CEP 的基础,通常像 Storm 这类流计算框架是分布式、无状态的,而 CEP 通常是有状态的,这里我们将进一步讨论 CEP 遇到分布式、无状态的流计算会遇到哪些问题以及如何处理:

  1. 无状态处理,Filter 这类算子是典型的无状态处理,只对输入流中的数据进行过滤输出到其他流中。
  2. 有状态处理,Window、Sequence、Pattern 这类算子,需要对事件集合进行处理的就涉及到状态存储问题。
  3. 分区,为了提高处理性能,通常分布式环境下采用多任务进行处理,流中的事件交由哪个任务处理也是一个问题。

前文中我们提到 WSO2 基于 Storm,接下来以“多种类型的厨房设备将自身设备状态发送给 CEP,检测到异常后发送告警通知。”的例子来介绍 WSO2 如何与 Storm 结合。
distributed-stream-processing-with-wso2-stream-processor
image.png

Step1,首先定义一个通过 Http 形式接受事件的流(DevicePowerStream),DevicePowerStream 的并行度是 3,可以理解为 DevicePowerStream 对应一个 Strom Bolt,Bolt 的并行度是 3。

  1. @source(type = http’, receiver.url=‘ ‘, topic = device-power’, @dist(parallel =‘3’) @map(type = json’))
  2. define stream DevicePowerStream (type string, deviceID string, power int);

Step2, 当 DevicePowerStream 收到数据后,业务上只关注 type=‘monitored’的设备数据,所以 DevicePowerStream 流用于过滤 type=‘monitored’的数据。

  1. @info(name = monitored-filter’) @dist(execGroup=‘group1’, parallel =‘3’)
  2. from DevicePowerStream[type == monitored’]
  3. select deviceID, power
  4. insert current events into MonitoredDevicesPowerStream;

Step3, 根据 deviceID 对 MonitoredDevicesPowerStream 中的数据做分组,同时定义一个 2min 的时间窗口,并计算窗口期内 power 的平均值,写入到 AvgPowerStream 中,对于 AvgPowerStream 流的处理则采用了前面提到的 CEP Pattern 技术,10 分钟内出现两次事件值的增幅大于 5 则认为是异常事件。

  1. @info(name = power-increase-pattern’) @dist(execGroup=‘group2’, parallel =‘3’)
  2. partition with (deviceID of MonitoredDevicesPowerStream)
  3. begin
  4. @info(name = avg-calculator’)
  5. from MonitoredDevicesPowerStream#window.time(2 min) select deviceID, avg(power) as avgPower
  6. insert current events into #AvgPowerStream;
  7. @info(name = power-increase-detector’)
  8. from every e1 = #AvgPowerStream -> e2 = #AvgPowerStream[(e1.avgPower + 5) <= avgPower] within 10 min
  9. select e1.deviceID as deviceID, e1.avgPower as initialPower, e2.avgPower as finalPower
  10. insert current events into RisingPowerStream;
  11. end;


Step4,而后对异常事件在进行后续过滤处理、发送告警,这里不在详细解释。

从上面的图中可以看到,流之间的数据传递使用了 Kafka 而非 Storm Bolt 之间数据的简单传递。这里应用 Kafka 做为持久化存储主要解决以下问题:

  1. 流之间消费速度不同所产生的背压问题。
  2. 通过外部存储,实现流处理任务无本地状态,可以扩展并行度。
  3. 存储流窗口期数据。

完整示例:

  1. @source(type = 'http', receiver.url=' ', topic = 'device-power', @dist(parallel ='3') @map(type = 'json'))
  2. define stream DevicePowerStream (type string, deviceID string, power int);
  3. @sink(type = 'email', to = '{{autorityContactEmail}}', username = 'john', address = 'john@gmail.com', password = 'test', subject = 'High power consumption of {{deviceID}}',
  4. @map(type = 'text', @payload('Device ID: {{deviceID}} of room : {{roomID}} power is consuming {{finalPower}}kW/h. ')))
  5. define stream AlertStream (deviceID string, roomID string, initialPower double, finalPower double, autorityContactEmail string);
  6. @Store(type="rdbms", jdbc.url="jdbc:mysql://localhost:3306/sp", username="root", password="root" , jdbc.driver.name="com.mysql.jdbc.Driver",field.length="symbol:100")
  7. define table DeviceIdInfoTable (deviceID string, roomID string, autorityContactEmail string);
  8. @info(name = 'monitored-filter') @dist(execGroup='group1', parallel ='3')
  9. from DevicePowerStream[type == 'monitored']
  10. select deviceID, power
  11. insert current events into MonitoredDevicesPowerStream;
  12. @info(name = 'power-increase-pattern') @dist(execGroup='group2', parallel ='3')
  13. partition with (deviceID of MonitoredDevicesPowerStream)
  14. begin
  15. @info(name = 'avg-calculator')
  16. from MonitoredDevicesPowerStream#window.time(2 min) select deviceID, avg(power) as avgPower
  17. insert current events into #AvgPowerStream;
  18. @info(name = 'power-increase-detector')
  19. from every e1 = #AvgPowerStream -> e2 = #AvgPowerStream[(e1.avgPower + 5) <= avgPower] within 10 min
  20. select e1.deviceID as deviceID, e1.avgPower as initialPower, e2.avgPower as finalPower
  21. insert current events into RisingPowerStream;
  22. end;
  23. @info(name = 'power-range-filter') @dist(execGroup='group3', parallel ='1')
  24. from RisingPowerStream[finalPower > 100]
  25. select deviceID, initialPower, finalPower
  26. insert current events into DevicesWithinRangeStream;
  27. @info(name = 'enrich-alert')
  28. @dist(execGroup='group3' ,parallel ='1')
  29. from DevicesWithinRangeStream as s join DeviceIdInfoTable as t
  30. on s.deviceID == t.deviceID
  31. select s.deviceID as deviceID, t.roomID as roomID, s.initialPower as initialPower, s.finalPower as finalPower, t.autorityContactEmail as autorityContactEmail insert current events into AlertStream;


四、总结

值得一提的是,随着流计算的兴起,出现了很多基于流计算开发的类 SQL 系统,简化流计算开发,比如 Kafka KSQL。相比于 CEP,这类系统普遍实现了 Window 算子,但却缺少 Sequence、Pattern 等高级算子,或者不久之后也会实现。CEP 在 IOT、量化交易、风控等多个领域有着广泛的应用,相信以后应用会越来越广,拭目以待吧。

五、参考

FAQ
Fully Distributed Deployment