总述

Complex Event Processing
在数据流中检测,是否满足一个或多个规则匹配

个体模式

个体模式可以匹配接收一个事件,在增加【量词】的情况下也可以匹配接收多个事件。

  • 单例模式:接收一个事件就是 map.get(“first”).get(0)
  • 循环模式:接收多个事件就是 map.get(“first”).get(N)

    连接词

  • begin

  • next

    条件

  • 限定数据流的class类型:pattern.subtype(SubEvent.class);

  • 简单条件:Pattern.where(New SimpleCondition)
  • 组合条件:Pattern.where.().or()
  • 终止条件:只能oneOrMore后使用 Pattern.begin().oneOrMore().until()
    • 遇到某个事件,就停止匹配。
    • 可以清理之前匹配保存的状态 释放状态、释放内存
  • 迭代条件: 在循环模式下使用,有context属性

    • 简单条件只能基于当前事件做判断,迭代条件就是需要将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件
    • 举个栗子:事件的 user 必须以 A 开 头;并且循环匹配的所有事件 amount 之和必须小于 100
      1. // 迭代条件demo //在循环模式下使用
      2. Pattern
      3. .begin()
      4. .where()
      5. .oneOrMore()
      6. .where(new IterativeCondition<Event>() {
      7. @Override
      8. public boolean filter(Event value, Context<Event> ctx) throws Exception {
      9. if (!value.user.startsWith("A")) {
      10. return false;
      11. }
      12. int sum = value.amount;
      13. // 获取当前模式之前已经匹配的事件,求所有事件 amount 之和
      14. for (Event event : ctx.getEventsForPattern("middle")) {
      15. sum += event.amount;
      16. }
      17. // 在总数量小于 100 时,当前事件满足匹配规则,可以匹配成功
      18. return sum < 100;
      19. }
      20. });

      限定个数

  • start.times(4) 4次 : times是宽松近邻,中间可以有其他事件

  • start.times(2,4) 2-4次均可
  • start.times(4).optional 0次或4次
  • start.times(2,4).optional 234次或者0次
  • start.oneOrMore 1次或多次
  • optional()

    组合模式

    把个体模式连接起来就是组合模式

    限定顺序、时间

  • 严格近邻next: a next b, [a, c, b1, b2] 匹配不到

    • 多个严格近邻都是一样的规则 可以写成 times(3).consecutive()
  • 宽松近邻followedBy:a followedBy b, [a, c, b1, b2] 匹配为 {a, b1}
    • 多个宽松近邻都是一样的规则 可以写成 times(3)
  • 非确定性宽松近邻followedByAny : [a, c, b1, b2] 匹配为 {a, b1},{a, b2}
    • 更宽松 可以使用已经匹配过的事件
    • 多个非确定性近邻都是一样的规则,也想使用已匹配的事件 可以写成 times(3).allowCombinations()
  • notNext
  • notFollowedBy
  • 多长时间内匹配有效:next.within(Time.second(10))

使用匹配规则

  • CEP.pattern() 返回patternStream
    1. // 匹配模式的应用:匹配模式是用在同一个userID上 所以keyBy
    2. PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), parttern);

    筛选出符合匹配规则的数据

    两类方法:
  1. PatternSelectFunction、PatternFlatSelectFunction
  2. PatternProcessFunction(能访问context)

    1. ****************方法一:PatternSelectFunction
    2. SingleOutputStreamOperator<LoginFailWarning> resultStream = patternStream.select(new LoginFailMatchDetectWarning());
    3. //Key就是每个模式的名称,而value就是所有接收到的事件的List类型,
    4. // 单例模式就是get(0)
    5. // 循环模式是get(0) get(1) get(2)
    6. public static class LoginFailMatchDetectWarning implements PatternSelectFunction<LoginEvent, LoginFailWarning>{
    7. @Override
    8. public LoginFailWarning select(Map<String, List<LoginEvent>> pattern) throws Exception {
    9. LoginEvent firstFail = pattern.get("firstFail").iterator().next();
    10. LoginEvent secondFail = pattern.get("secondFail").get(0); //上一行的方式 和本行的get(0)都可以
    11. return new LoginFailWarning(firstFail.getUserId(),firstFail.getTimestamp(),secondFail.getTimestamp(),"检测到连续两次登录失败");
    12. }
    13. }
    14. *****************方法二:PatternProcessFunction
    15. // 将匹配到的复杂事件选择出来,然后包装成报警信息输出
    16. patternStream
    17. .process(new PatternProcessFunction<LoginEvent, String>() {
    18. @Override
    19. public void processMatch(Map<String, List<LoginEvent>> map, Context ctx,Collector<String> out) throws Exception {
    20. LoginEvent first = map.get("fails").get(0);
    21. LoginEvent second = map.get("fails").get(1);
    22. LoginEvent third = map.get("fails").get(2);
    23. out.collect(first.userId + "连续三次登录失败!登录时间:" + first.timestamp + "," + second.timestamp + "," + third.timestamp);
    24. }
    25. })
    26. .print("warning");

    超时事件的处理:有begin没有后续

    当指定了within的时间范围,可能部分数据有begin 但是超过了指定时间范围 这就是超时事件。两类方法:

  3. 使用PatternProcessFunction的侧输出流 + 实现TimedOutPartialMatchHandler接口 重写processTimedOutMatch方法:由一个OutputTag定义接收到的超时事件序列

  4. 旧方法:使用.select()传入三个参数 ```java public class OrderTimeoutDetectExample { public static void main(String[] args) throws Exception {

    1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    2. env.setParallelism(1);
    3. // 获取订单事件流,并提取时间戳、生成水位线
    4. KeyedStream<OrderEvent, String> stream = env
    5. .fromElements(
    6. new OrderEvent("user_1", "order_1", "create", 1000L),
    7. new OrderEvent("user_2", "order_2", "create", 2000L),
    8. new OrderEvent("user_1", "order_1", "modify", 10 * 1000L),
    9. new OrderEvent("user_1", "order_1", "pay", 60 * 1000L),
    10. new OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),
    11. new OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)
    12. )
    13. .assignTimestampsAndWatermarks(
    14. WatermarkStrategy.<OrderEvent>forMonotonousTimestamps()
    15. .withTimestampAssigner(
    16. new SerializableTimestampAssigner<OrderEvent>() {
    17. @Override
    18. public long extractTimestamp(OrderEvent event, long l) {
    19. return event.timestamp;
    20. }
    21. }
    22. )
    23. )
    24. .keyBy(order -> order.orderId); // 按照订单ID分组
  1. // 1. 定义Pattern
  2. Pattern<OrderEvent, ?> pattern = Pattern
  3. .<OrderEvent>begin("create") // 首先是下单事件
  4. .where(new SimpleCondition<OrderEvent>() {
  5. @Override
  6. public boolean filter(OrderEvent value) throws Exception {
  7. return value.eventType.equals("create");
  8. }
  9. })
  10. .followedBy("pay") // 之后是支付事件;中间可以修改订单,宽松近邻
  11. .where(new SimpleCondition<OrderEvent>() {
  12. @Override
  13. public boolean filter(OrderEvent value) throws Exception {
  14. return value.eventType.equals("pay");
  15. }
  16. })
  17. .within(Time.minutes(15)); // 限制在15分钟之内
  18. // 2. 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
  19. PatternStream<OrderEvent> patternStream = CEP.pattern(stream, pattern);
  20. // 3. 将匹配到的,和超时部分匹配的复杂事件提取出来,然后包装成提示信息输出
  21. SingleOutputStreamOperator<String> payedOrderStream = patternStream.process(new OrderPayPatternProcessFunction());
  22. // 4. 定义一个测输出流标签,用于标识超时测输出流
  23. OutputTag<String> timeoutTag = new OutputTag<String>("timeout") {};
  24. // 5. 将正常匹配和超时部分匹配的处理结果流打印输出
  25. payedOrderStream.print("payed");
  26. payedOrderStream.getSideOutput(timeoutTag).print("timeout");
  27. env.execute();
  28. }
  29. // 实现自定义的PatternProcessFunction,需实现TimedOutPartialMatchHandler接口
  30. public static class OrderPayPatternProcessFunction extends PatternProcessFunction<OrderEvent, String> implements TimedOutPartialMatchHandler<OrderEvent> {
  31. // 处理正常匹配事件
  32. @Override
  33. public void processMatch(Map<String, List<OrderEvent>> match, Context ctx, Collector<String> out) throws Exception {
  34. OrderEvent payEvent = match.get("pay").get(0);
  35. out.collect("订单 " + payEvent.orderId + " 已支付!");
  36. }
  37. // 处理超时未支付事件 new OutputTag<String>("timeout"){} 相同的输出流标签"timeout"即可
  38. @Override
  39. public void processTimedOutMatch(Map<String, List<OrderEvent>> match, Context ctx) throws Exception {
  40. OrderEvent createEvent = match.get("create").get(0);
  41. ctx.output(new OutputTag<String>("timeout"){}, "订单 " + createEvent.orderId + " 超时未支付!用户为:" + createEvent.userId);
  42. }
  43. }

}

```sql
OutputTag<OrderResult> outputTag = new OutputTag<OrderResult>("order-timeout") {};

SingleOutputStreamOperator<OrderResult> resultStream = patternStream.select(outputTag, new OrderTimeoutSelect(), new OrderPaySelect());

// 自定义超时事件处理
public static class OrderTimeoutSelect implements PatternTimeoutFunction<OrderEvent, OrderResult>{
    @Override
    public OrderResult timeout(Map<String, List<OrderEvent>> pattern, long timeoutTimestamp) throws Exception {
        Long timeoutOrderId = pattern.get("create").iterator().next().getOrderId();
        return new OrderResult(timeoutOrderId, "timeout" + timeoutTimestamp);
    }
}

//自定义正常匹配事件处理
public static class OrderPaySelect implements PatternSelectFunction<OrderEvent, OrderResult>{
    @Override
    public OrderResult select(Map<String, List<OrderEvent>> pattern) throws Exception {
        Long payedOrderId = pattern.get("pay").iterator().next().getOrderId();
        return new OrderResult(payedOrderId, "payed");
    }
}

CEP中迟到数据的处理

watermark延迟内的数据

如果是process time那不会有watermark,不会有延迟情况,因为粗暴的以数据到达时间为先后顺序处理。
如果是event time可能 时间早的数据 晚到,

  • 数据来了之后先缓存,根据eventtime进行从小到达排序,当一个水位线到达时 会把小于当前watermark的数据进行匹配。watermark的延迟时间就是数据的最大等待时间
  • 举栗子:3秒延迟 则10秒数据来之后触发7秒这个水位线。把7秒之前的数据进行排序匹配。 8秒数据则还需缓存3秒延迟 等待8秒这个watermark到来。

    比watermark还延迟的数据

    侧输出流输出 ```sql PatternStream patternStream = CEP.pattern(stream, pattern); // 定义一个侧输出流的标签 OutputTag lateDataOutputTag = new OutputTag(“late-data”){};

SingleOutputStreamOperator result = patternStream // 将迟到数据输出到侧输出流 .sideOutputLateData(lateDataOutputTag) // 处理正常匹配数据 .select(new PatternSelectFunction() {…});

// 从结果中提取侧输出流 DataStream lateData = result.getSideOutput(lateDataOutputTag); ```