An earlier article covered the basic usage of FlinkCEP. This article introduces the SQL approach that Flink provides for pattern matching, namely Detecting Patterns in Tables.
Complete example
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    import java.util.Properties;

    public class FlinkCEPSqlExample {

        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
            final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // Schema of the CSV records read from Kafka: symbol,tax,price,rowtime
            final TableSchema tableSchema = new TableSchema(
                    new String[]{"symbol", "tax", "price", "rowtime"},
                    new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG, Types.SQL_TIMESTAMP});
            final TypeInformation<Row> typeInfo = tableSchema.toRowType();
            final CsvRowDeserializationSchema.Builder deserSchemaBuilder =
                    new CsvRowDeserializationSchema.Builder(typeInfo).setFieldDelimiter(',');

            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "host-10-1-236-139:6667");
            FlinkKafkaConsumer010<Row> myConsumer =
                    new FlinkKafkaConsumer010<>("foo", deserSchemaBuilder.build(), properties);
            myConsumer.setStartFromLatest();

            DataStream<Row> stream = env.addSource(myConsumer)
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
            // "rowtime.rowtime" marks the last field as the event-time attribute
            tableEnv.registerDataStream("Ticker", stream, "symbol,tax,price,rowtime.rowtime");

            Table result = tableEnv.sqlQuery("SELECT * " +
                    "FROM Ticker " +
                    " MATCH_RECOGNIZE( " +
                    " PARTITION BY symbol " +
                    " ORDER BY rowtime " +
                    " MEASURES " +
                    " A.price AS firstPrice, " + // with A+, A.price is the last row mapped to A
                    " B.price AS lastPrice " +
                    " ONE ROW PER MATCH " +
                    " AFTER MATCH SKIP PAST LAST ROW " +
                    " PATTERN (A+ B) " +
                    " DEFINE " +
                    " A AS A.price < 10, " +
                    " B AS B.price > 100 " +
                    " )");

            // Output rows: the partition key followed by the two measures
            final TableSchema tableSchemaResult = new TableSchema(
                    new String[]{"symbol", "firstPrice", "lastPrice"},
                    new TypeInformation[]{Types.STRING, Types.LONG, Types.LONG});
            final TypeInformation<Row> typeInfoResult = tableSchemaResult.toRowType();
            DataStream<Row> ds = tableEnv.toAppendStream(result, typeInfoResult);
            ds.print();
            env.execute("Flink CEP via SQL example");
        }

        // Periodic watermark assigner that tolerates events arriving up to 5 seconds late.
        private static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Row> {
            private final long maxOutOfOrderness = 5000;
            private long currentMaxTimestamp;

            @Override
            public long extractTimestamp(Row row, long previousElementTimestamp) {
                System.out.println("Row is " + row);
                // StringUtilsPlus is a date helper that is not shown in this article
                long timestamp = StringUtilsPlus.dateToStamp(String.valueOf(row.getField(3)));
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                System.out.println("watermark:" + StringUtilsPlus.stampToDate(String.valueOf(currentMaxTimestamp - maxOutOfOrderness)));
                return timestamp;
            }

            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }
        }
    }
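The example calls a StringUtilsPlus helper whose source is not included here. The following is only a hypothetical stand-in so the example can be compiled: it assumes the rowtime field prints in the form yyyy-MM-dd HH:mm:ss with an optional fraction (the format java.sql.Timestamp produces), and it interprets the value in the JVM's default time zone. The real helper may differ.

```java
import java.sql.Timestamp;

// Hypothetical replacement for the helper used by BoundedOutOfOrdernessGenerator.
// Assumption: dates look like "2019-09-01 10:00:05" or "2019-09-01 10:00:05.0".
final class StringUtilsPlus {

    private StringUtilsPlus() {
    }

    /** Parses a date string into epoch milliseconds (JVM default time zone). */
    static long dateToStamp(String date) {
        return Timestamp.valueOf(date.trim()).getTime();
    }

    /** Formats epoch milliseconds, passed as a decimal string, back into a date string. */
    static String stampToDate(String stamp) {
        return new Timestamp(Long.parseLong(stamp.trim())).toString();
    }

    public static void main(String[] args) {
        long ts = dateToStamp("2019-09-01 10:00:05");
        // Same arithmetic as the watermark: newest timestamp minus 5 seconds
        System.out.println(stampToDate(String.valueOf(ts - 5000)));
    }
}
```

Since the field is declared as Types.SQL_TIMESTAMP, reading the timestamp directly from the row would likely work as well; the string round trip is kept only to match the calls in the example.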
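Explanation of the example
PARTITION BY symbol logically partitions the rows by the symbol field.
ORDER BY rowtime sorts the rows by event time, so that out-of-order arrival does not disturb the matching.
MEASURES defines the fields to output.
ONE ROW PER MATCH is the output mode. The other mode is ALL ROWS PER MATCH, but Flink 1.9 currently supports only ONE ROW PER MATCH.
AFTER MATCH SKIP PAST LAST ROW is the after-match skip strategy, which decides where matching resumes once a match has been found. This strategy guarantees that each event takes part in at most one match. The available strategies are: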
- SKIP PAST LAST ROW - resumes the pattern matching at the next row after the last row of the current match.
- SKIP TO NEXT ROW - continues searching for a new match starting at the next row after the starting row of the match.
- SKIP TO LAST variable - resumes the pattern matching at the last row that is mapped to the specified pattern variable.
- SKIP TO FIRST variable - resumes the pattern matching at the first row that is mapped to the specified pattern variable.
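To make the four strategies concrete, here is a minimal sketch in plain Java (it does not use any Flink API) that computes the row index at which matching would resume. The enum, the method name, and the row numbers are all made up for illustration; Flink's internal implementation is not shown in this article.

```java
// Illustrative only: maps each skip strategy to the row index where matching resumes.
final class SkipStrategySketch {

    enum Strategy { PAST_LAST_ROW, TO_NEXT_ROW, TO_LAST_VARIABLE, TO_FIRST_VARIABLE }

    /**
     * @param matchStart  index of the first row of the current match
     * @param matchEnd    index of the last row of the current match
     * @param firstVarRow index of the first row mapped to the chosen pattern variable
     * @param lastVarRow  index of the last row mapped to the chosen pattern variable
     */
    static int resumeIndex(Strategy strategy, int matchStart, int matchEnd,
                           int firstVarRow, int lastVarRow) {
        switch (strategy) {
            case PAST_LAST_ROW:
                return matchEnd + 1;   // row after the last row of the match
            case TO_NEXT_ROW:
                return matchStart + 1; // row after the starting row of the match
            case TO_LAST_VARIABLE:
                return lastVarRow;     // last row mapped to the variable
            case TO_FIRST_VARIABLE:
                return firstVarRow;    // first row mapped to the variable
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        // Hypothetical match covering rows 3..6, with the chosen variable mapped to rows 4 and 5
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " -> resume at row " + resumeIndex(s, 3, 6, 4, 5));
        }
    }
}
```

Only PAST_LAST_ROW resumes outside the rows of the current match, which is why it is the strategy under which an event can belong to at most one match.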
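PATTERN (A+ B) defines how the two pattern variables relate to each other. (A B) would require two consecutive events in which the first satisfies the condition of A and the second satisfies the condition of B. (A+ B) means that one or more events satisfying A may occur before the event that satisfies B. For the detailed rules, see Defining a Pattern.
DEFINE specifies the condition for each pattern variable.
Although the query is written in SQL, the logic that finally runs internally is the same as the one described in "Flink CEP进阶" (Advanced Flink CEP); Flink simply parses the SQL first.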
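The behaviour of PATTERN (A+ B) with the conditions from DEFINE can be approximated in plain Java, as a rough sketch that ignores partitioning, event time, and watermarks. The class name and the sample prices are invented for illustration. For each match it reports the price of the last row mapped to A, since a plain A.price reference in MEASURES selects the last row of that variable, together with the price of the row mapped to B.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java approximation of PATTERN (A+ B) with A: price < 10 and B: price > 100,
// using AFTER MATCH SKIP PAST LAST ROW. Not Flink code.
final class PatternSketch {

    /** Returns one {A.price, B.price} pair per match over consecutive rows. */
    static List<long[]> matchAPlusB(long[] prices) {
        List<long[]> matches = new ArrayList<>();
        int start = 0;
        while (start < prices.length) {
            int i = start;
            // A+: greedily consume consecutive rows with price < 10
            while (i < prices.length && prices[i] < 10) {
                i++;
            }
            boolean hasA = i > start;
            // B: the very next row must have price > 100
            boolean hasB = i < prices.length && prices[i] > 100;
            if (hasA && hasB) {
                matches.add(new long[]{prices[i - 1], prices[i]});
                start = i + 1; // SKIP PAST LAST ROW: resume after the B row
            } else {
                start++;       // no match starting here, try the next row
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        long[] prices = {5, 8, 120, 50, 3, 200, 7, 20, 150};
        for (long[] m : matchAPlusB(prices)) {
            System.out.println("A.price=" + m[0] + ", B.price=" + m[1]);
        }
        // prints:
        // A.price=8, B.price=120
        // A.price=3, B.price=200
    }
}
```

The row with price 7 produces no match because the row right after it (20) satisfies neither A nor B, which illustrates that the rows of a match must be consecutive.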
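Summary
This article provides a simple FlinkCEP SQL example that lets readers quickly try out FlinkCEP SQL. In business terms, the example emits a result whenever a run of consecutive small transactions is suddenly followed by a large one.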
