总述
Complex Event Processing
在数据流中检测,是否满足一个或多个规则匹配
个体模式
个体模式可以匹配接收一个事件,在增加【量词】的情况下也可以匹配接收多个事件。
- 单例模式:接收一个事件就是 map.get(“first”).get(0)
循环模式:接收多个事件就是 map.get(“first”).get(N)
连接词
begin
-
条件
限定数据流的class类型:
pattern.subtype(SubEvent.class);- 简单条件:Pattern.where(New SimpleCondition)
- 组合条件:Pattern.where.().or()
- 终止条件:只能oneOrMore后使用
Pattern.begin().oneOrMore().until()- 遇到某个事件,就停止匹配。
- 可以清理之前匹配保存的状态 释放状态、释放内存
迭代条件: 在循环模式下使用,有context属性
- 简单条件只能基于当前事件做判断,迭代条件就是需要将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件
- 举个栗子:事件的 user 必须以 A 开 头;并且循环匹配的所有事件 amount 之和必须小于 100
// 迭代条件demo //在循环模式下使用Pattern.begin().where().oneOrMore().where(new IterativeCondition<Event>() {@Overridepublic boolean filter(Event value, Context<Event> ctx) throws Exception {if (!value.user.startsWith("A")) {return false;}int sum = value.amount;// 获取当前模式之前已经匹配的事件,求所有事件 amount 之和for (Event event : ctx.getEventsForPattern("middle")) {sum += event.amount;}// 在总数量小于 100 时,当前事件满足匹配规则,可以匹配成功return sum < 100;}});
限定个数
start.times(4) 4次 : times是宽松近邻,中间可以有其他事件
- start.times(2,4) 2-4次均可
- start.times(4).optional 0次或4次
- start.times(2,4).optional 234次或者0次
- start.oneOrMore 1次或多次
-
组合模式
限定顺序、时间
严格近邻next: a next b, [a, c, b1, b2] 匹配不到
- 多个严格近邻都是一样的规则 可以写成 times(3).consecutive()
- 宽松近邻followedBy:a followedBy b, [a, c, b1, b2] 匹配为 {a, b1}
- 多个宽松近邻都是一样的规则 可以写成 times(3)
- 非确定性宽松近邻followedByAny : [a, c, b1, b2] 匹配为 {a, b1},{a, b2}
- 更宽松 可以使用已经匹配过的事件
- 多个非确定性近邻都是一样的规则,也想使用已匹配的事件 可以写成 times(3).allowCombinations()
- notNext
- notFollowedBy
- 多长时间内匹配有效:next.within(Time.second(10))
使用匹配规则
- CEP.pattern() 返回patternStream
// 匹配模式的应用:匹配模式是用在同一个userID上 所以keyByPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), parttern);
筛选出符合匹配规则的数据
两类方法:
- PatternSelectFunction、PatternFlatSelectFunction
PatternProcessFunction(能访问context)
****************方法一:PatternSelectFunctionSingleOutputStreamOperator<LoginFailWarning> resultStream = patternStream.select(new LoginFailMatchDetectWarning());//Key就是每个模式的名称,而value就是所有接收到的事件的List类型,// 单例模式就是get(0)// 循环模式是get(0) get(1) get(2)public static class LoginFailMatchDetectWarning implements PatternSelectFunction<LoginEvent, LoginFailWarning>{@Overridepublic LoginFailWarning select(Map<String, List<LoginEvent>> pattern) throws Exception {LoginEvent firstFail = pattern.get("firstFail").iterator().next();LoginEvent secondFail = pattern.get("secondFail").get(0); //上一行的方式 和本行的get(0)都可以return new LoginFailWarning(firstFail.getUserId(),firstFail.getTimestamp(),secondFail.getTimestamp(),"检测到连续两次登录失败");}}*****************方法二:PatternProcessFunction// 将匹配到的复杂事件选择出来,然后包装成报警信息输出patternStream.process(new PatternProcessFunction<LoginEvent, String>() {@Overridepublic void processMatch(Map<String, List<LoginEvent>> map, Context ctx,Collector<String> out) throws Exception {LoginEvent first = map.get("fails").get(0);LoginEvent second = map.get("fails").get(1);LoginEvent third = map.get("fails").get(2);out.collect(first.userId + "连续三次登录失败!登录时间:" + first.timestamp + "," + second.timestamp + "," + third.timestamp);}}).print("warning");
超时事件的处理:有begin没有后续
当指定了within的时间范围,可能部分数据有begin 但是超过了指定时间范围 这就是超时事件。两类方法:
使用PatternProcessFunction的侧输出流 + 实现TimedOutPartialMatchHandler接口 重写processTimedOutMatch方法:由一个OutputTag定义接收到的超时事件序列
旧方法:使用.select()传入三个参数 ```java public class OrderTimeoutDetectExample { public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 获取订单事件流,并提取时间戳、生成水位线KeyedStream<OrderEvent, String> stream = env.fromElements(new OrderEvent("user_1", "order_1", "create", 1000L),new OrderEvent("user_2", "order_2", "create", 2000L),new OrderEvent("user_1", "order_1", "modify", 10 * 1000L),new OrderEvent("user_1", "order_1", "pay", 60 * 1000L),new OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),new OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {@Overridepublic long extractTimestamp(OrderEvent event, long l) {return event.timestamp;}})).keyBy(order -> order.orderId); // 按照订单ID分组
// 1. 定义PatternPattern<OrderEvent, ?> pattern = Pattern.<OrderEvent>begin("create") // 首先是下单事件.where(new SimpleCondition<OrderEvent>() {@Overridepublic boolean filter(OrderEvent value) throws Exception {return value.eventType.equals("create");}}).followedBy("pay") // 之后是支付事件;中间可以修改订单,宽松近邻.where(new SimpleCondition<OrderEvent>() {@Overridepublic boolean filter(OrderEvent value) throws Exception {return value.eventType.equals("pay");}}).within(Time.minutes(15)); // 限制在15分钟之内// 2. 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<OrderEvent> patternStream = CEP.pattern(stream, pattern);// 3. 将匹配到的,和超时部分匹配的复杂事件提取出来,然后包装成提示信息输出SingleOutputStreamOperator<String> payedOrderStream = patternStream.process(new OrderPayPatternProcessFunction());// 4. 定义一个测输出流标签,用于标识超时测输出流OutputTag<String> timeoutTag = new OutputTag<String>("timeout") {};// 5. 将正常匹配和超时部分匹配的处理结果流打印输出payedOrderStream.print("payed");payedOrderStream.getSideOutput(timeoutTag).print("timeout");env.execute();}// 实现自定义的PatternProcessFunction,需实现TimedOutPartialMatchHandler接口public static class OrderPayPatternProcessFunction extends PatternProcessFunction<OrderEvent, String> implements TimedOutPartialMatchHandler<OrderEvent> {// 处理正常匹配事件@Overridepublic void processMatch(Map<String, List<OrderEvent>> match, Context ctx, Collector<String> out) throws Exception {OrderEvent payEvent = match.get("pay").get(0);out.collect("订单 " + payEvent.orderId + " 已支付!");}// 处理超时未支付事件 new OutputTag<String>("timeout"){} 相同的输出流标签"timeout"即可@Overridepublic void processTimedOutMatch(Map<String, List<OrderEvent>> match, Context ctx) throws Exception {OrderEvent createEvent = match.get("create").get(0);ctx.output(new OutputTag<String>("timeout"){}, "订单 " + createEvent.orderId + " 超时未支付!用户为:" + createEvent.userId);}}
}
```sql
OutputTag<OrderResult> outputTag = new OutputTag<OrderResult>("order-timeout") {};
SingleOutputStreamOperator<OrderResult> resultStream = patternStream.select(outputTag, new OrderTimeoutSelect(), new OrderPaySelect());
// 自定义超时事件处理
public static class OrderTimeoutSelect implements PatternTimeoutFunction<OrderEvent, OrderResult>{
@Override
public OrderResult timeout(Map<String, List<OrderEvent>> pattern, long timeoutTimestamp) throws Exception {
Long timeoutOrderId = pattern.get("create").iterator().next().getOrderId();
return new OrderResult(timeoutOrderId, "timeout" + timeoutTimestamp);
}
}
//自定义正常匹配事件处理
public static class OrderPaySelect implements PatternSelectFunction<OrderEvent, OrderResult>{
@Override
public OrderResult select(Map<String, List<OrderEvent>> pattern) throws Exception {
Long payedOrderId = pattern.get("pay").iterator().next().getOrderId();
return new OrderResult(payedOrderId, "payed");
}
}
CEP中迟到数据的处理
watermark延迟内的数据
如果是process time那不会有watermark,不会有延迟情况,因为粗暴的以数据到达时间为先后顺序处理。
如果是event time可能 时间早的数据 晚到,
- 数据来了之后先缓存,根据eventtime进行从小到达排序,当一个水位线到达时 会把小于当前watermark的数据进行匹配。watermark的延迟时间就是数据的最大等待时间
- 举栗子:3秒延迟 则10秒数据来之后触发7秒这个水位线。把7秒之前的数据进行排序匹配。 8秒数据则还需缓存3秒延迟 等待8秒这个watermark到来。
比watermark还延迟的数据
侧输出流输出 ```sql PatternStreampatternStream = CEP.pattern(stream, pattern); // 定义一个侧输出流的标签 OutputTag lateDataOutputTag = new OutputTag (“late-data”){};
SingleOutputStreamOperator
// 从结果中提取侧输出流
DataStream
