A previous article covered the basics of FlinkCEP. This article introduces Flink's SQL-based approach to pattern matching, i.e. Detecting Patterns in Tables.

Complete Example

Code link

  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
  import org.apache.flink.streaming.api.TimeCharacteristic;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
  import org.apache.flink.streaming.api.watermark.Watermark;
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
  import org.apache.flink.table.api.Table;
  import org.apache.flink.table.api.TableSchema;
  import org.apache.flink.table.api.java.StreamTableEnvironment;
  import org.apache.flink.types.Row;

  import java.util.Properties;

  public class FlinkCEPSqlExample {

      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
          env.setParallelism(1);
          final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

          // Schema of the CSV records read from Kafka: symbol, tax, price, rowtime
          final TableSchema tableSchema = new TableSchema(
                  new String[]{"symbol", "tax", "price", "rowtime"},
                  new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG, Types.SQL_TIMESTAMP});
          final TypeInformation<Row> typeInfo = tableSchema.toRowType();
          final CsvRowDeserializationSchema.Builder deserSchemaBuilder =
                  new CsvRowDeserializationSchema.Builder(typeInfo).setFieldDelimiter(',');

          Properties properties = new Properties();
          properties.setProperty("bootstrap.servers", "host-10-1-236-139:6667");
          FlinkKafkaConsumer010<Row> myConsumer = new FlinkKafkaConsumer010<>(
                  "foo",
                  deserSchemaBuilder.build(),
                  properties);
          myConsumer.setStartFromLatest();

          // Assign event timestamps and watermarks before registering the stream as a table
          DataStream<Row> stream = env.addSource(myConsumer)
                  .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
          tableEnv.registerDataStream("Ticker", stream, "symbol,tax,price,rowtime.rowtime");

          Table result = tableEnv.sqlQuery("SELECT * " +
                  "FROM Ticker " +
                  "    MATCH_RECOGNIZE( " +
                  "        PARTITION BY symbol " +
                  "        ORDER BY rowtime " +
                  "        MEASURES " +
                  "            A.price AS firstPrice, " +
                  "            B.price AS lastPrice " +
                  "        ONE ROW PER MATCH " +
                  "        AFTER MATCH SKIP PAST LAST ROW " +
                  "        PATTERN (A+ B) " +
                  "        DEFINE " +
                  "            A AS A.price < 10, " +
                  "            B AS B.price > 100 " +
                  "    )");

          final TableSchema tableSchemaResult = new TableSchema(
                  new String[]{"symbol", "firstPrice", "lastPrice"},
                  new TypeInformation[]{Types.STRING, Types.LONG, Types.LONG});
          final TypeInformation<Row> typeInfoResult = tableSchemaResult.toRowType();
          DataStream<Row> ds = tableEnv.toAppendStream(result, typeInfoResult);
          ds.print();

          env.execute("Flink CEP via SQL example");
      }

      // Periodic watermark generator that tolerates events arriving up to 5 seconds late
      private static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Row> {
          private final long maxOutOfOrderness = 5000; // 5 seconds
          private long currentMaxTimestamp;

          @Override
          public long extractTimestamp(Row row, long previousElementTimestamp) {
              System.out.println("Row is " + row);
              long timestamp = StringUtilsPlus.dateToStamp(String.valueOf(row.getField(3)));
              currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
              System.out.println("watermark:" + StringUtilsPlus.stampToDate(String.valueOf(currentMaxTimestamp - maxOutOfOrderness)));
              return timestamp;
          }

          @Override
          public Watermark getCurrentWatermark() {
              // The watermark trails the highest timestamp seen so far by maxOutOfOrderness
              return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
          }
      }
  }
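The watermark logic in BoundedOutOfOrdernessGenerator can be checked in isolation. The following is a minimal, Flink-free sketch of the same idea; all timestamps are made-up values for illustration, not from the article:

```java
/**
 * Sketch of the bounded-out-of-orderness watermark logic used above:
 * the watermark always trails the maximum event timestamp seen so far
 * by a fixed lateness bound (5000 ms in the example).
 */
public class WatermarkSketch {
    static final long MAX_OUT_OF_ORDERNESS = 5000;
    static long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;

    // Mirrors extractTimestamp(): track the running max, return the event's own timestamp
    static long extractTimestamp(long eventTimestamp) {
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
        return eventTimestamp;
    }

    // Mirrors getCurrentWatermark()
    static long currentWatermark() {
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS;
    }

    public static void main(String[] args) {
        long[] events = {10_000, 12_000, 11_000, 20_000}; // out-of-order event times (ms)
        for (long t : events) {
            extractTimestamp(t);
            System.out.println("event=" + t + " watermark=" + currentWatermark());
        }
        // After seeing 20_000 the watermark is 15_000: any event with a
        // timestamp <= 15_000 that arrives afterwards is considered late.
        // Note the watermark never regresses when the late event 11_000 arrives.
    }
}
```

This also makes the role of setStreamTimeCharacteristic(TimeCharacteristic.EventTime) visible: ORDER BY rowtime in the query can only produce deterministic results because the watermark bounds how late events may arrive.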

Example Walkthrough

  1. PARTITION BY symbol logically partitions the stream by the symbol field.
  2. ORDER BY rowtime orders events by event time, guarding against out-of-order arrival.
  3. MEASURES defines the columns to output for each match.
  4. ONE ROW PER MATCH is the output mode; the alternative is ALL ROWS PER MATCH, but Flink 1.9 currently supports only ONE ROW PER MATCH.
  5. AFTER MATCH SKIP PAST LAST ROW is the strategy for discarding events once a match completes; this particular strategy guarantees that each event participates in at most one match. The available strategies are:
  • SKIP PAST LAST ROW - resumes the pattern matching at the next row after the last row of the current match.
  • SKIP TO NEXT ROW - continues searching for a new match starting at the next row after the starting row of the match.
  • SKIP TO LAST variable - resumes the pattern matching at the last row that is mapped to the specified pattern variable.
  • SKIP TO FIRST variable - resumes the pattern matching at the first row that is mapped to the specified pattern variable.
  6. PATTERN (A+ B) defines the relationship between the two pattern variables: (A B) would require the first event to satisfy A's condition and the second to satisfy B's, whereas (A+ B) means one or more events satisfying A's condition must occur before the event satisfying B's condition. See Defining a Pattern for the full rules.
  7. DEFINE specifies the concrete condition for each pattern variable.
  8. Although it runs as SQL, the internal execution logic is the same as described in 《Flink CEP进阶》 (Advanced Flink CEP); Flink simply parses the SQL down to that machinery.
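To make the PATTERN (A+ B) and AFTER MATCH SKIP PAST LAST ROW semantics concrete, here is a standalone sketch in plain Java (no Flink; the prices are invented for illustration, and the firstPrice/lastPrice naming simply mirrors the column names in the query above). It greedily matches "one or more prices below 10 followed by a price above 100" over an array and resumes after the last row of each match:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Flink-free sketch of PATTERN (A+ B) with AFTER MATCH SKIP PAST LAST ROW,
 * where A is price < 10 and B is price > 100.
 */
public class PatternSketch {

    /** Returns one {firstPrice, lastPrice} pair per match. */
    static List<long[]> findMatches(long[] prices) {
        List<long[]> matches = new ArrayList<>();
        int i = 0;
        while (i < prices.length) {
            if (prices[i] < 10) {                        // candidate start: first A
                int j = i;
                while (j < prices.length && prices[j] < 10) {
                    j++;                                 // greedily extend A+
                }
                if (j < prices.length && prices[j] > 100) { // B must follow the A run
                    matches.add(new long[]{prices[i], prices[j]});
                    i = j + 1;                           // SKIP PAST LAST ROW
                    continue;
                }
            }
            i++;                                         // no match starting here
        }
        return matches;
    }

    public static void main(String[] args) {
        long[] prices = {5, 6, 120, 3, 4, 150, 8, 50};
        for (long[] m : findMatches(prices)) {
            System.out.println("firstPrice=" + m[0] + " lastPrice=" + m[1]);
        }
        // Two matches: (5, 120) and (3, 150). The trailing 8, 50 matches nothing
        // because no price above 100 follows. With SKIP TO NEXT ROW the overlapping
        // matches (6, 120) and (4, 150) would also be reported.
    }
}
```

Because SKIP PAST LAST ROW advances past the whole match, the rows 6 and 4 are consumed by the first and second match respectively and can never start a new one, which is exactly the "each event matches at most once" guarantee described in point 5.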

Summary

This article walked through a simple FlinkCEP SQL example so that readers can quickly see FlinkCEP SQL in action. The business meaning of the example: after a run of consecutive small trades, a single large trade triggers an output.