环境配置

依赖

  1. <properties>
  2. <maven.compiler.source>8</maven.compiler.source>
  3. <maven.compiler.target>8</maven.compiler.target>
  4. <flink.version>1.12.0</flink.version>
  5. </properties>
  6. <dependencies>
  7. <dependency>
  8. <groupId>org.apache.flink</groupId>
  9. <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
  10. <version>1.10.1</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.flink</groupId>
  14. <artifactId>flink-java</artifactId>
  15. <version>${flink.version}</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.apache.flink</groupId>
  19. <artifactId>flink-streaming-java_2.12</artifactId>
  20. <version>${flink.version}</version>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.apache.flink</groupId>
  24. <artifactId>flink-clients_2.12</artifactId>
  25. <version>${flink.version}</version>
  26. </dependency>
  27. <dependency>
  28. <groupId>com.ververica</groupId>
  29. <artifactId>flink-connector-oracle-cdc</artifactId>
  30. <version>2.1.0</version>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.apache.flink</groupId>
  34. <artifactId>flink-table-planner-blink_2.12</artifactId>
  35. <version>${flink.version}</version>
  36. </dependency>
  37. <dependency>
  38. <groupId>com.alibaba</groupId>
  39. <artifactId>fastjson</artifactId>
  40. <version>1.2.75</version>
  41. </dependency>
  42. <dependency>
  43. <groupId>org.apache.hbase</groupId>
  44. <artifactId>hbase-client</artifactId>
  45. <version>2.1.5</version>
  46. </dependency>
  47. <dependency>
  48. <groupId>org.apache.kafka</groupId>
  49. <artifactId>kafka-clients</artifactId>
  50. <version>2.7.0</version>
  51. </dependency>
  52. </dependencies>
  53. <build>
  54. <plugins>
  55. <plugin>
  56. <groupId>org.apache.maven.plugins</groupId>
  57. <artifactId>maven-assembly-plugin</artifactId>
  58. <version>3.0.0</version>
  59. <configuration>
  60. <descriptorRefs>
  61. <descriptorRef>jar-with-dependencies</descriptorRef>
  62. </descriptorRefs>
  63. </configuration>
  64. <executions>
  65. <execution>
  66. <id>make-assembly</id>
  67. <phase>package</phase>
  68. <goals>
  69. <goal>single</goal>
  70. </goals>
  71. </execution>
  72. </executions>
  73. </plugin>
  74. </plugins>
  75. </build>

Flink基础

Source

SensorReading类

public class SensorReading {
    // 属性:id,时间戳,温度值
    private String id;
    private Long timestamp;
    private Double temperature;
}

从集合中读取数据

 public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.Source:从集合读取数据
        DataStream<SensorReading> sensorDataStream = env.fromCollection(
                Arrays.asList(
                        new SensorReading("sensor_1", 1547718199L, 35.8),
                        new SensorReading("sensor_6", 1547718201L, 15.4),
                        new SensorReading("sensor_7", 1547718202L, 6.7),
                        new SensorReading("sensor_10", 1547718205L, 38.1)
                )
        );
        // 2.打印
        sensorDataStream.print();
        // 3.执行
        env.execute();
    }

从文件中读取

DataStream<String> dataStream = env.readTextFile("YOUR_FILE_PATH ");

从Kafka中读取

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
  <version>1.10.1</version>
</dependency>
public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // 从文件读取数据
        DataStream<String> dataStream = env.addSource( new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));

        // 打印输出
        dataStream.print();

        env.execute();
    }

自定义Source

DataStream<SensorReading> dataStream = env.addSource( new MySensor());
class MySensor implements SourceFunction<SensorReading> {
    private boolean running = true;

    public void run(SourceContext<SensorReading> ctx) throws Exception {
        Random random = new Random();
        HashMap<String, Double> sensorTempMap = new HashMap<String, Double>();
        for (int i = 0; i < 10; i++) {
            sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
        }
        while (running) {
            for (String sensorId : sensorTempMap.keySet()) {
                Double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                sensorTempMap.put(sensorId, newTemp);
                ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(),
                        newTemp));
            }
            Thread.sleep(1000L);
        }
    }

    public void cancel() {
        this.running = false;
    }
}

Transform

map

 DataStream<Integer> mapStram = dataStream.map(new MapFunction<SensorReading, Integer>() {
            @Override
            public Integer map(SensorReading value) throws Exception {
                return value.getId().length();
            }
        });
 public static void main(String[] args) throws Exception {
        //1. 获取Oracle数据
        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
                .hostname("172.18.1.157")
                .port(1521)
                .database("helowin")
                .schemaList("test")
                .tableList("test.bqh10")
                .username("test")
                .password("test")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stringDataStreamSource = env.addSource(sourceFunction).setParallelism(1);

        //2,格式化输出流
        DataStream<String> dataStream = (DataStream<String>) stringDataStreamSource.map(line -> {
            JSONObject data = JSON.parseObject(line);
            String A = (String) JSON.parseObject(String.valueOf(data.get("after"))).get("A");
            String B = (String) JSON.parseObject(String.valueOf(data.get("after"))).get("B");
            return new BQH10(A,B).toString();
        });
        dataStream.print().setParallelism(1);
        //写入kafka
        dataStream.addSink( new FlinkKafkaProducer011<String>("hadoop102:9092", "sinktest", new SimpleStringSchema()));
        env.execute();
    }

flatMap

按照逗号分隔把csv文件的每一个cell提取出来

DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, 
String>() {
 public void flatMap(String value, Collector<String> out) throws Exception {
 String[] fields = value.split(","); 
 for( String field: fields )
 out.collect(field);
 }
});

Filter

DataStream<Interger> filterStream = dataStream.filter(new FilterFunction<String>() 
{
 public boolean filter(String value) throws Exception {
 return value == 1;
 }
});

KeyBy、Reduce

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.Source:从集合读取数据
        DataStream<SensorReading> dataStream = env.fromCollection(
                Arrays.asList(
                        new SensorReading("sensor_1", 1547718199L, 35.8),
                        new SensorReading("sensor_2", 1547718201L, 15.4),
                        new SensorReading("sensor_3", 1547718202L, 6.7),
                        new SensorReading("sensor_2", 1547718205L, 38.1),
                        new SensorReading("sensor_1", 1547718206L, 38.1)
                )
        ).setParallelism(1);
        dataStream.keyBy("id").reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                return new SensorReading(value1.getId(),
                        value2.getTimestamp(),
                        Math.min(value1.getTemperature(), value1.getTemperature()));
            }
        }).print().setParallelism(1);
        // 3.执行
        env.execute();
    }

Split、Select 分流查询

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.Source:从集合读取数据
        DataStream<SensorReading> dataStream = env.fromCollection(
                Arrays.asList(
                        new SensorReading("sensor_1", 1547718199L, 35.8),
                        new SensorReading("sensor_2", 1547718201L, 15.4),
                        new SensorReading("sensor_3", 1547718202L, 6.7),
                        new SensorReading("sensor_2", 1547718205L, 38.1),
                        new SensorReading("sensor_1", 1547718206L, 38.1)
                )
        ).setParallelism(1);
        SplitStream<SensorReading> splitStream=dataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading sensorReading) {
                return sensorReading.getTemperature()>20? Collections.singletonList("high"):Collections.singletonList("low");
            }
        });
        DataStream<SensorReading> highTempStream = splitStream.select("high");
        DataStream<SensorReading> lowTempStream = splitStream.select("low");
        DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
        highTempStream.print();
        // 3.执行
        env.execute();
    }

Connect、CoMap合并流格式化输出

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.Source:从集合读取数据
        DataStream<SensorReading> dataStream = env.fromCollection(
                Arrays.asList(
                        new SensorReading("sensor_1", 1547718199L, 35.8),
                        new SensorReading("sensor_2", 1547718201L, 15.4),
                        new SensorReading("sensor_3", 1547718202L, 6.7),
                        new SensorReading("sensor_2", 1547718205L, 38.1),
                        new SensorReading("sensor_1", 1547718206L, 38.1)
                )
        ).setParallelism(1);
        SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading sensorReading) {
                return sensorReading.getTemperature() > 20 ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        DataStream<SensorReading> highTempStream = splitStream.select("high");
        DataStream<SensorReading> lowTempStream = splitStream.select("low");
        DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
        highTempStream.print();

        // 合流 connect
        DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
                   @Override
                   public Tuple2<String, Double> map(SensorReading value) throws Exception {
                       return new Tuple2<>(value.getId(), value.getTemperature());
                   }
        });
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);
        //CoMapFunction<IN1, IN2, OUT>
        DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "warning");
            }
            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "healthy");
            }
        });
        resultStream.print();
        // 3.执行
        env.execute();
    }

Union

DataStream<SensorReading> unionStream = highTempStream.union(lowTempStream);

1. Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap
中再去调整成为一样的。
2. Connect 只能操作两个流,Union 可以操作多个。

UDF

Flink 暴露了所有 udf 函数的接口(实现方式为接口或者抽象类)。例如 MapFunction, FilterFunction, ProcessFunction 等等。

DataStream<String> flinkTweets = tweets.filter(new FlinkFilter());
  public static class FlinkFilter implements FilterFunction<String> {
     @Override
     public boolean filter(String value) throws Exception {
     return value.contains("flink");
   } 
 }

富函数

“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都
有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一
些生命周期方法,所以可以实现更复杂的功能。
⚫ RichMapFunction
⚫ RichFlatMapFunction
⚫ RichFilterFunction
Rich Function 有一个生命周期的概念。典型的生命周期方法有:
⚫ open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter
被调用之前 open()会被调用。
⚫ close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
⚫ getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函
数执行的并行度,任务的名字,以及 state 状态

public static class MyMapFunction extends RichMapFunction<SensorReading, 
Tuple2<Integer, String>> {
   @Override
   public Tuple2<Integer, String> map(SensorReading value) throws Exception {
   return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), 
  value.getId());
   }
   @Override
   public void open(Configuration parameters) throws Exception {
   System.out.println("my map open");
   // 以下可以做一些初始化工作,例如建立一个和 HDFS 的连接
   }
   @Override
   public void close() throws Exception {
   System.out.println("my map close");
   // 以下做一些清理工作,例如断开和 HDFS 的连接
   } 
}

Sink

kafka

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
 <version>1.10.1</version>
</dependency>
dataStream.addSink(new FlinkKafkaProducer011[String]("localhost:9092", 
"test", new SimpleStringSchema()))

自定义sink

mysql

<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.44</version>
</dependency>
public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
    Connection conn = null;
    PreparedStatement insertStmt = null;
    PreparedStatement updateStmt = null;
    // open 主要是创建连接
    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test",
                "root", "123456");
        // 创建预编译器,有占位符,可传入参数
        insertStmt = conn.prepareStatement("INSERT INTO sensor_temp (id, temp) VALUES 
        (?, ?)");
        updateStmt = conn.prepareStatement("UPDATE sensor_temp SET temp = ? WHERE id 
                = ?");
    }
    // 调用连接,执行 sql
    @Override
    public void invoke(SensorReading value, Context context) throws Exception {
        // 执行更新语句,注意不要留 super
        updateStmt.setDouble(1, value.getTemperature());
        updateStmt.setString(2, value.getId());
        updateStmt.execute();
        // 如果刚才 update 语句没有更新,那么插入
        if (updateStmt.getUpdateCount() == 0) {
            insertStmt.setString(1, value.getId());
            insertStmt.setDouble(2, value.getTemperature());
            insertStmt.execute();
        }
    }
    @Override
    public void close() throws Exception {
        insertStmt.close();
        updateStmt.close();
        conn.close();
    } 
}

Hbase

class HBaseSink extends RichSinkFunction<BQH10> {
    private org.apache.hadoop.conf.Configuration configuration; //上下文环境
    private org.apache.hadoop.hbase.client.Connection connection = null;  //连接到hbase
    private BufferedMutator userMutator;
    private int count=0;

    @Override//初始化工作,建立与数据库的连接
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        configuration = HBaseConfiguration.create();
//        configuration.set("hbase.master", "172.18.1.114:60020");
        configuration.set("hbase.zookeeper.quorum", "172.18.1.114,172.18.1.116,172.18.1.117");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            connection = ConnectionFactory.createConnection(configuration);
            System.out.println(connection);
        } catch (IOException e) {
            e.printStackTrace();
        }
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("bqh10"));
        params.writeBufferSize(1024 * 1024);
        userMutator = connection.getBufferedMutator(params);
    }

    @Override
    public void close() throws Exception {
        super.close();
        //todo 关闭连接
        if (userMutator!=null) {
            userMutator.flush();
            userMutator.close();
        }
        if (connection!=null) {
            connection.close();
        }
    }

    @Override
    public void invoke(BQH10 value, Context context) throws Exception {
        String RowKey= value.getA();
        String Key="B";
        String Value = value.getB();
//        System.out.println("Column Family=f1,  RowKey=" + RowKey + ", Key=" + Key + " ,Value=" + Value);
        Put put = new Put(RowKey.getBytes());
        put.addColumn("f1".getBytes(), Key.getBytes(), Value.getBytes());
        userMutator.mutate(put);
        if (count >= 500){
            userMutator.flush();
            System.out.println("写入成功");
            count = 0;
        }
        count = count + 1;
    }
}

window

Window 可以分成两类:
➢ CountWindow:按照指定的数据条数生成一个 Window,与时间无关。
➢ TimeWindow:按照时间生成 Window。
对于 TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling
Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

TimeWindow

滚动窗口

DataStream<Tuple2<String, Double>> minTempPerWindowStream = dataStream
   .map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
 @Override
 public Tuple2<String, Double> map(SensorReading value) throws 
Exception {
 return new Tuple2<>(value.getId(), value.getTemperature());
 }
 })
 .keyBy(data -> data.f0) 
 .timeWindow( Time.seconds(15) )
 .minBy(1);

滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参
数,一个是 window_size,一个是 sliding_size。
下面代码中的 sliding_size 设置为了 5s,也就是说,每 5s 就计算输出结果一次,
每一次计算的 window 范围是 15s 内的所有元素。

DataStream<SensorReading> minTempPerWindowStream = dataStream
 .keyBy(SensorReading::getId) 
 .timeWindow( Time.seconds(15), Time.seconds(5) )
 .minBy("temperature");

CountWindow

  1. 滚动窗口

    DataStream<SensorReading> minTempPerWindowStream = dataStream
    .keyBy(SensorReading::getId)
    .countWindow( 5 )
    .minBy("temperature");
    
  2. 滑动窗口

    DataStream<SensorReading> minTempPerWindowStream = dataStream
    .keyBy(SensorReading::getId)
    .countWindow( 10,2 )
    .minBy("temperature");
    

    分组窗口

    分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数。Table API 中的 Group Windows 都是使用.window(w:GroupWindow)子句定义的,并且必须由 as 子句指定一个别名。为了按窗口对表进行分组,窗口的别名必须在 group by 子句中,像常规的分组字段一样引用。

    Table table = input
    .window([w: GroupWindow] as "w") // 定义窗口,别名 w
    .groupBy("w, a") // 以属性 a 和窗口 w 作为分组的 key
    .select("a, b.sum") // 聚合字段 b 的值,求和
    或者,还可以把窗口的相关信息,作为字段添加到结果表中:
    Table table = input
    .window([w: GroupWindow] as "w") 
    .groupBy("w, a") 
    .select("a, w.start, w.end, w.rowtime, b.count")
    

    滚动窗口

    滚动窗口(Tumbling windows)要用 Tumble 类来定义,另外还有三个方法:
    ⚫ over:定义窗口长度
    ⚫ on:用来分组(按时间间隔)或者排序(按行数)的时间字段
    ⚫ as:别名,必须出现在后面的 groupBy 中

    // Tumbling Event-time Window
    .window(Tumble.over("10.minutes").on("rowtime").as("w"))
    // Tumbling Processing-time Window
    .window(Tumble.over("10.minutes").on("proctime").as("w"))
    // Tumbling Row-count Window
    .window(Tumble.over("10.rows").on("proctime").as("w"))
    

    滑动窗口

    滑动窗口(Sliding windows)要用 Slide 类来定义,另外还有四个方法:
    ⚫ over:定义窗口长度
    ⚫ every:定义滑动步长
    ⚫ on:用来分组(按时间间隔)或者排序(按行数)的时间字段
    ⚫ as:别名,必须出现在后面的 groupBy 中

    // Sliding Event-time Window
    .window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))
    // Sliding Processing-time window 
    .window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))
    // Sliding Row-count window
    .window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))
    

    会话窗口

    会话窗口(Session windows)要用 Session 类来定义,另外还有三个方法:
    ⚫ withGap:会话时间间隔
    ⚫ on:用来分组(按时间间隔)或者排序(按行数)的时间字段
    ⚫ as:别名,必须出现在后面的 groupBy 中

    // Session Event-time Window
    .window(Session.withGap.("10.minutes").on("rowtime").as("w"))
    // Session Processing-time Window
    .window(Session.withGap.("10.minutes").on(“proctime").as("w"))
    

时间语义与 Wartermark

Wartermark相当于将数据延迟处理的时间
Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的 日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。
Ingestion Time:是数据进入 Flink 的时间。
Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器
相关,默认的时间属性就是 Processing Time。

EventTime

在 Flink 的流式处理中,绝大部分的业务都会使用 eventTime,一般只在 eventTime 无法使用时,才会被迫使用 ProcessingTime 或者 IngestionTime。 如果要使用 EventTime,那么需要引入 EventTime 的时间属性,引入方式如下所 示:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给 env 创建的每一个 stream 追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

Watermark 的引入

能大致估算出数据流中的事件的最大延迟时间,可以用以下代码

//水位线插入时间1s
dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.milliseconds(1000L)) {
     @Override //时间里面带有时间戳
     public long extractTimestamp(SensorReading element) {
       return element.getTimestamp()*1000;
     }
});

Event Time 的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事 件时间是什么(数据源里的数据没有时间戳的话,就只能使用 Processing Time 了)。 我们看到上面的例子中创建了一个看起来有点复杂的类,这个类实现的其实就 是分配时间戳的接口。Flink 暴露了 TimestampAssigner 接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳。

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
// 设置事件时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<SensorReading> dataStream = env.addSource(new SensorSource())
 .assignTimestampsAndWatermarks(new MyAssigner());

自定义从事件中提取时间戳

Assigner with periodic watermarks

周期性的生成 watermark:系统会周期性的将 watermark 插入到流中(水位线也
是一种特殊的事件!)。默认周期是 200 毫秒。可以使用

ExecutionConfig.setAutoWatermarkInterval()方法进行设置。

// 每隔 5 秒产生一个 watermark 
env.getConfig.setAutoWatermarkInterval(5000);

产生 watermark 的逻辑:每隔 5 秒钟,Flink 会调用
AssignerWithPeriodicWatermarks 的 getCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的 watermark 会被插入到流中。这个检查保证了
水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的 watermark。 例子,自定义一个周期性的时间戳抽取:

// 自定义周期性时间戳分配器
public static class MyPeriodicAssigner implements
        AssignerWithPeriodicWatermarks<SensorReading> {

    private Long bound = 60 * 1000L; // 延迟一分钟
    private Long maxTs = Long.MIN_VALUE; // 当前最大时间戳

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(maxTs - bound);
    }

    @Override
    public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
        maxTs = Math.max(maxTs, element.getTimestamp());
        return element.getTimestamp();
    }
}

一种简单的特殊情况是,如果我们事先得知数据流的时间戳是单调递增的,也就是说没有乱序,那我们可以使用 AscendingTimestampExtractor,这个类会直接使用数据的时间戳生成 watermark。

DataStream<SensorReading> dataStream = …
dataStream.assignTimestampsAndWatermarks(
  new AscendingTimestampExtractor<SensorReading>() {
   @Override
   public long extractAscendingTimestamp(SensorReading element) {
     return element.getTimestamp() * 1000;
   }
});

Assigner with punctuated watermarks

间断式地生成 watermark。和周期性生成的方式不同,这种方式不是固定时间的,
而是可以根据需要对每条数据进行筛选和处理。直接上代码来举个例子,我们只给
sensor_1 的传感器的数据流插入 watermark:

public static class MyPunctuatedAssigner implements
        AssignerWithPunctuatedWatermarks<SensorReading> {
    private Long bound = 60 * 1000L; // 延迟一分钟

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(SensorReading lastElement, long
            extractedTimestamp) {
        if (lastElement.getId().equals("sensor_1"))
            return new Watermark(extractedTimestamp - bound);
        else
            return null;
    }

    @Override
    public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
        return element.getTimestamp();
    }
}

Table API与SQL

引入依赖

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-planner_2.12</artifactId>
 <version>1.10.1</version>
</dependency> <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
 <version>1.10.1</version>
</dependency>

csv环境

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-csv</artifactId>
 <version>1.10.1</version>
</dependency>

source

连接外部文件

 StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.connect( new FileSystem().path("sensor.txt")) // 定义表数据来源,外部连接
                .withFormat(new OldCsv()) // 定义从外部系统读取数据之后的格式化方法
                .withSchema( new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temperature", DataTypes.DOUBLE())
                ) // 定义表结构
                .createTemporaryTable("inputTable"); // 创建临时表

连接Kafka

tableEnv.connect(new Kafka()
                        .version("0.11") // 定义 kafka 的版本
                        .topic("sensor") // 定义主题
                        .property("zookeeper.connect", "localhost:2181")
                        .property("bootstrap.servers", "localhost:9092") )
        .withFormat(new Csv())
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("timestamp", DataTypes.BIGINT())
                .field("temperature", DataTypes.DOUBLE())
        )
        .createTemporaryTable("kafkaInputTable");

fromDataStream

DataStream<String> inputStream = env.readTextFile("sensor.txt");
        DataStream<SensorReading> dataStream = inputStream
                .map(line -> {
                    String[] fields = line.split(",");
                    return new SensorReading(fields[0], new Long(fields[1]), new
                            Double(fields[2]));
                });
        Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime as ts, temperature");

创建临时视图

  1. 基于数据流

    tableEnv.createTemporaryView("sensorView", dataStream);
    tableEnv.createTemporaryView("sensorView", dataStream, "id, temperature, 
    timestamp as ts");
    
  2. 基于Table

    tableEnv.createTemporaryView("sensorView", sensorTable);
    

    sink

    输出到文件

    // 注册输出表
    tableEnv.connect(
    new FileSystem().path("…\\resources\\out.txt")
    ) // 定义到文件系统的连接
    .withFormat(new Csv()) // 定义格式化方法,Csv 格式
    .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("temp", DataTypes.DOUBLE())
    ) // 定义表结构
    .createTemporaryTable("outputTable"); // 创建临时表
    resultSqlTable.insertInto("outputTable");
    

    在流处理过程中,表的处理并不像传统定义的那样简单。 对于流式查询(Streaming Queries),需要声明如何在(动态)表和外部连接器之间执行 转换。与外部系统交换的消息类型,由更新模式(update mode)指定。 Flink Table API 中的更新模式有以下三种:
    1)追加模式(Append Mode)
    在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。
    2)撤回模式(Retract Mode)
    在撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。
    ⚫ 插入(Insert)会被编码为添加消息;
    ⚫ 删除(Delete)则编码为撤回消息;
    ⚫ 更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行)
    的添加消息。 在此模式下,不能定义 key,这一点跟 upsert 模式完全不同。
    3)Upsert(更新插入)模式
    在 Upsert 模式下,动态表和外部连接器交换 Upsert 和 Delete 消息。 这个模式需要一个唯一的 key,通过这个 key 可以传递更新消息。为了正确应用消息, 外部连接器需要知道这个唯一 key 的属性。
    ⚫ 插入(Insert)和更新(Update)都被编码为 Upsert 消息;
    ⚫ 删除(Delete)编码为 Delete 信息。
    这种模式和 Retract 模式的主要区别在于,Update 操作是用单个消息编码的,所以效率会更高。

    输出到kafka

    // 输出到 kafka
    tableEnv.connect(
                 new Kafka()
                         .version("0.11")
                         .topic("sinkTest")
                         .property("zookeeper.connect", "localhost:2181")
                         .property("bootstrap.servers", "localhost:9092"))
         .withFormat(new Csv())
         .withSchema(new Schema()
                 .field("id", DataTypes.STRING())
                 .field("temp", DataTypes.DOUBLE())
         )
         .createTemporaryTable("kafkaOutputTable");
    //resultTable为准备插入的数据
    resultTable.insertInto("kafkaOutputTable");
    

    输出到Mysql

    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.12</artifactId>
    <version>1.10.1</version>
    </dependency>
    
    // 输出到 Mysql
    String sinkDDL= "create table jdbcOutputTable (" +
    " id varchar(20) not null, " +
    " cnt bigint not null " +
    ") with (" +
    " 'connector.type' = 'jdbc', " +
    " 'connector.url' = 'jdbc:mysql://localhost:3306/test', " +
    " 'connector.table' = 'sensor_count', " +
    " 'connector.driver' = 'com.mysql.jdbc.Driver', " +
    " 'connector.username' = 'root', " +
    " 'connector.password' = '123456' )";
    tableEnv.sqlUpdate(sinkDDL); // 执行 DDL 创建表
    aggResultSqlTable.insertInto("jdbcOutputTable");
    

    将表转换成DataStream

    表可以转换为 DataStream 或 DataSet。这样,自定义流处理或批处理程序就可以继续在 Table API 或 SQL 查询的结果上运行了。 将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型。通常,最方便的转换类型就是 Row。当然,因为结果的所有字段类型都是明确的,我们也经常会用元组类型来表示。表作为流式查询的结果,是动态更新的。所以,将这种动态查询转换成的数据流,同样需要对表的更新操作进行编码,进而有不同的转换模式。
    Table API 中表到 DataStream 有两种模式:
    ⚫ 追加模式(Append Mode)
    用于表只会被插入(Insert)操作更改的场景。
    ⚫ 撤回模式(Retract Mode)
    用于任何场景。有些类似于更新模式中 Retract 模式,它只有 Insert 和 Delete 两类操作。
    得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底 是新增的数据(Insert),还是被删除的数据(老数据, Delete)。

    DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, 
    Row.class);
    DataStream<Tuple2<Boolean, Row>> aggResultStream = 
    tableEnv.toRetractStream(aggResultTable, Row.class);
    resultStream.print("result");
    aggResultStream.print("aggResult");
    

    所以,没有经过 groupby 之类聚合操作,可以直接用 toAppendStream 来转换;而如果经过了聚合,有更新操作,一般就必须用 toRetractDstream。

    时间特性

    基于时间的操作(比如 Table API 和 SQL 中窗口操作),需要定义相关的时间语义和时间数据来源的信息。所以,Table 可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳。时间属性,可以是每个表 schema 的一部分。一旦定义了时间属性,它就可以作为一个字段引用,并且可以在基于时间的操作中使用。时间属性的行为类似于常规时间戳,可以访问,并且进行计算。

    处理时间

    处理时间语义下,允许表处理程序根据机器的本地时间生成结果。它是时间的最简单概
    念。它既不需要提取时间戳,也不需要生成 watermark。定义处理时间属性有三种方法:在 DataStream 转化时直接指定;在定义 Table Schema时指定;在创建表的 DDL 中指定。
    1) DataStream 转化成 Table 时指定
    由 DataStream 转换成表时,可以在后面指定字段名来定义 Schema。在定义 Schema 期间,可以使用.proctime,定义处理时间字段。注意,这个 proctime 属性只能通过附加逻辑字段,来扩展物理 schema。因此,只能在schema 定义的末尾定义它。

    // 定义好 DataStream
    DataStream<String> inputStream = env.readTextFile("\\sensor.txt")
    DataStream<SensorReading> dataStream = inputStream
    .map( line -> {
    String[] fields = line.split(",");
    return new SensorReading(fields[0], new Long(fields[1]), new 
    Double(fields[2]));
    } );
    // 将 DataStream 转换为 Table,并指定时间字段
    Table sensorTable = tableEnv.fromDataStream(dataStream, "id, temperature, 
    timestamp, pt.proctime");
    

    2) 定义 Table Schema 时指定
    这种方法其实也很简单,只要在定义 Schema 的时候,加上一个新的字段,并指定成
    proctime 就可以了。代码如下:

    tableEnv.connect(
    new FileSystem().path("..\\sensor.txt"))
    .withFormat(new Csv())
    .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("timestamp", DataTypes.BIGINT())
    .field("temperature", DataTypes.DOUBLE())
    .field("pt", DataTypes.TIMESTAMP(3))
    .proctime() // 指定 pt 字段为处理时间
    ) // 定义表结构
    .createTemporaryTable("inputTable"); // 创建临时表
    

    3) 创建表的 DDL 中指定
    在创建表的 DDL 中,增加一个字段并指定成 proctime,也可以指定当前的时间字段。代码如下:

    String sinkDDL = "create table dataTable (" +
    " id varchar(20) not null, " +
    " ts bigint, " +
    " temperature double, " +
    " pt AS PROCTIME() " +
    ") with (" +
    " 'connector.type' = 'filesystem', " +
    " 'connector.path' = '/sensor.txt', " +
    " 'format.type' = 'csv')";
    tableEnv.sqlUpdate(sinkDDL);
    

    事件时间

    事件时间语义,允许表处理程序根据每个记录中包含的时间生成结果。这样即使在有乱
    序事件或者延迟事件时,也可以获得正确的结果。为了处理无序事件,并区分流中的准时和迟到事件;Flink 需要从事件数据中,提取时间戳,并用来推进事件时间的进展(watermark)。
    1) DataStream 转化成 Table 时指定
    在DataStream转换成Table,schema的定义期间,使用.rowtime可以定义事件时间属性。 注意,必须在转换的数据流中分配时间戳和 watermark。 在将数据流转换为表时,有两种定义时间属性的方法。根据指定的.rowtime 字段名是否存在于数据流的架构中,timestamp 字段可以:
    ⚫ 作为新字段追加到 schema
    ⚫ 替换现有字段 ,在这两种情况下,定义的事件时间戳字段,都将保存 DataStream 中事件时间戳的值。

    DataStream<String> inputStream = env.readTextFile("\\sensor.txt")
    DataStream<SensorReading> dataStream = inputStream
    .map( line -> {
    String[] fields = line.split(",");
    return new SensorReading(fields[0], new Long(fields[1]), new 
    Double(fields[2]));
    } )
    .assignTimestampsAndWatermarks(new 
    BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(1)) {
    @Override
    public long extractTimestamp(SensorReading element) {
    return element.getTimestamp() * 1000L;
    }
    });
    Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime 
    as ts, temperature");
    

    2) 定义 Table Schema 时指定
    这种方法只要在定义 Schema 的时候,将事件时间字段,并指定成 rowtime 就可以了。 代码如下:

    tableEnv.connect(
    new FileSystem().path("sensor.txt"))
    .withFormat(new Csv())
    .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("timestamp", DataTypes.BIGINT())
    .rowtime(
    new Rowtime()
    .timestampsFromField("timestamp") // 从字段中提取时间戳
    .watermarksPeriodicBounded(1000) // watermark 延迟 1 秒
    )
    .field("temperature", DataTypes.DOUBLE())
    ) // 定义表结构
    .createTemporaryTable("inputTable"); // 创建临时表
    

    3) 创建表的 DDL 中指定
    事件时间属性,是使用 CREATE TABLE DDL 中的 WARDMARK 语句定义的。watermark 语句,定义现有事件时间字段上的 watermark 生成表达式,该表达式将事件时间字段标记为事件时间属性。代码如下:

    String sinkDDL = "create table dataTable (" +
    " id varchar(20) not null, " +
    " ts bigint, " +
    " temperature double, " +
    " rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), " +
    " watermark for rt as rt - interval '1' second" +
    ") with (" +
    " 'connector.type' = 'filesystem', " +
    " 'connector.path' = '/sensor.txt', " +
    " 'format.type' = 'csv')";
    tableEnv.sqlUpdate(sinkDDL);