1. 流处理API分类:<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/388635/1623644514137-d7908b88-5936-47f4-8e15-f53f12647a96.png#clientId=u4b44b354-fb50-4&from=paste&height=95&id=uc228a1dc&margin=%5Bobject%20Object%5D&name=image.png&originHeight=240&originWidth=1510&originalType=binary&ratio=2&size=139632&status=done&style=none&taskId=u155c3ba5-8d62-4fd8-844d-2d25b4154b7&width=600)

5.1 Environment

5.1.1 getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。是最常用的一种创建执行环境的方式。如果程序是独立调用的(本地开发环境),则此方法返回本地执行环境;如果程序是在集群中被调用的,则此方法返回此集群的执行环境。也就是说,getExecutionEnvironment会根据当前运行的方式决定返回什么样的环境。

// 创建批执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 创建流处理执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

5.1.2 createLocalEnvironment

返回本地执行环境,可在调用时指定默认的并行度。

LocalStreamEnvironment localEnv = 
    StreamExecutionEnvironment.createLocalEnvironment(1);

5.1.3 createRemoteEnvironment

返回集群执行环境,并将指定jar包提交到远程服务器。需要在调用时指定master的IP和端口号,并指定要在集群中运行的jar包。

StreamExecutionEnvironment remoteEnv = 
    StreamExecutionEnvironment.createRemoteEnvironment("master-hostname", 6123, "PATH/TO/JAR/ON/MASTER");

5.2 Source

5.2.1 从集合读取数据

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 从集合读取数据
    DataStream<SensorReading> dataStream = env.fromCollection(
        Arrays.asList(
            new SensorReading("sensor_1", 1547718199L, 35.8),
            new SensorReading("sensor_6", 1547718201L, 15.4),
            new SensorReading("sensor_7", 1547718202L, 6.7),
            new SensorReading("sensor_10", 1547718205L, 38.1)
        )
    );

    // 从一系列int值读取数据
    DataStreamSource<Integer> intDataStream = env.fromElements(1, 2, 4, 7, 8, 10, 34);

    // 打印输出
    dataStream.print("temperature");
    dataStream.print("int");

    // 执行
    env.execute();
}

5.2.2 从文件读取数据

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 从文件读取数据
    DataStreamSource<String> dataStream = env.readTextFile("/Users/didi/projects/bigdata/flinktutorial/src/main/resources/sensor.txt");

    // 打印输出
    dataStream.print();

    // 执行
    env.execute();
}

5.2.3 从kafka读取数据

kafka是一个外部组件。当试图让Flink与外部组件连接时,需要引入对应的connector包。对于kafka而言,需要引入如下的依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

连接代码:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // kafka配置项
    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
    kafkaProperties.setProperty("group.id", "consumer-group");
    kafkaProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaProperties.setProperty("auto.offset.reset", "latest");

    // 从kafka读取数据
    DataStream<String> dataStream = env.addSource(
        new FlinkKafkaConsumer011<>(
            "sensor-topic", 
            new SimpleStringSchema(), 
            kafkaProperties
        )
    );

    // 打印输出
    dataStream.print();

    // 执行
    env.execute();
}

5.2.4 自定义Source

除了以上的Source数据来源,我们还可以自定义Source。需要做的,只是传入一个实现了SourceFunction接口的类对象。具体调用如下:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 从自定义数据源读取数据
    DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());

    // 打印输出
    dataStream.print();

    // 执行
    env.execute();
}

// 实现自定义的SourceFunction
public static class MySensorSource implements SourceFunction<SensorReading> {

    // 定义一个标志位,用于控制数据的产生
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<SensorReading> ctx) throws Exception {
        // 定义一个随机数发生器
        Random random = new Random();

        // 设置10个传感器的初始温度
        HashMap<String, Double> sensorTempMap = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
        }

        while (this.running) {
            for (String sensorId : sensorTempMap.keySet()) {
                // 在当前温度基础上随机波动
                Double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                sensorTempMap.put(sensorId, newTemp);
                // 把数据发送给Flink
                ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newTemp));
            }
            // 控制数据产生频率
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        this.running = false;
    }
}

5.3 Transform

转换算子。

5.3.1 map

5.3.2 flatMap

5.3.3 Filter

5.3.4 keyBy

在Flink中,若想执行聚合操作,必须先进行分组,而分组是依靠keyBy来实现的。
DataStream -> KeyedStream: 逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,内部以hash的形式实现。

5.3.5 滚动聚合算子(Rolling Aggregation)

滚动聚合就是“实时聚合”,每来一个数据,就进行一次聚合,并产生聚合结果。
以下算子可以针对KeyedStream的每一个支流做聚合:

  • sum()
  • min():只更新聚合结果中的指定字段,其他字段保持初始聚合结果不变。
  • max():只更新聚合结果中的指定字段,其他字段保持初始聚合结果不变。
  • minBy():更新聚合结果的左右字段为最新值。
  • maxBy():更新聚合结果的左右字段为最新值。

    5.3.6 Reduce

    KeyedStream -> DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
    Reduce是一个一般化的滚动聚合算子,可实现各种自定义的聚合需求。
    DataStream<SensorReading> resultStream = inputStream.map(i -> {
      System.out.println(i);
      String[] values = i.split(",");
      return new SensorReading(values[0], Long.valueOf(values[1]), Double.valueOf(values[2]));
    })
      .keyBy(SensorReading::getId)
      .reduce(new ReduceFunction<SensorReading>() {
          /**
          * @param value1 上次reduce的结果
          * @param value2 当前最新的值
          * @return 当前reduce后的结果
          */
          @Override
          public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
              // 取当前最大的温度值,以及最新的时间戳
              return new SensorReading(
                  value1.getId(),
                  value2.getTimestamp(),
                  Math.max(value1.getTemperature(), value2.getTemperature())
              );
          }
      });
    

    5.3.7 Split和Select

    Split

    image.png
    DataStream -> SplitStream:根据某些特征把一个DataStream拆分成两个或多个DataStream。
    Split本质上是给源Stream中的数据打标,形成一个特殊的流(SplitStream)。然后配合Select算子从SplitStream中筛选出需要的数据。

    Select

    image.png
    SplitStream -> DataStream:从一个SplitStream中筛选出一个或多个DataStream。

    示例

    需求:传感器数据按照温度高低,以30度为界,拆分成两个流。 ```java SplitStream splitStream = inputStream.map(i -> { String[] values = i.split(“,”); return new SensorReading(values[0], Long.valueOf(values[1]), Double.valueOf(values[2])); }).split(new OutputSelector() { /**
    • @param value 当前数据
    • @return 标签集合 */ @Override public Iterable select(SensorReading value) { return value.getTemperature() > 30
        ? Lists.newArrayList("high")
        : Lists.newArrayList("low");
      
      } });

DataStream highTempStream = splitStream.select(“high”); DataStream lowTempStream = splitStream.select(“low”); DataStream allTempStream = splitStream.select(“high”, “low”); ```

注意:Split和Select已经过时,推荐使用Side outputs:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/side_output/

5.3.8 Connect和CoMap

Connect

image.png
DataStream,DataStream -> ConnectedStreams:连接两个保持它们类型的数据流,两个数据流被Connect之后,只是被放在了同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

CoMap/CoFlatMap

image.png
ConnectedStream -> DataStream:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。

5.3.9 Union

image.png
DataStream -> DataStream:对两个或者两个以上的相同数据类型的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。

5.3.10 重分区

  • keyBy():按指定key分区。
  • broadcast():给每个下游分区都发送一份数据。
  • shuffle():将数据随机发送给下游分区。
  • forward():直接转发给本分区下游。
  • rebalace():轮询下游的子任务节点,依次发送数据。
  • rescale():先将下游子任务节点平分给当前子任务节点(分组),然后各个子任务节点在自己的拥有的分组内轮询下游节点。
  • global():将当前子任务所有节点的数据都发给下游子任务第一个节点。
  • partitionCustom:用户自定义重分区。

flinkdatastream.png


5.4 支持的数据类型

Flink流应用处理的是以数据对象表示的事件流。所以在Flink内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink需要明确地知道应用程序所处理的数据类型。Flink使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。(Flink自行实现了序列化和反序列机制
Flink还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如lambda函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
Flink支持Java和Scala中所有常见数据类型。

5.4.1 基础数据类型

Flink支持所有的Java和Scala基础数据类型,

  • Byte、Short、Integer、Long、Float、Double、Character、Boolean,
  • String,Date,BigDecimal,BigInteger
  • ……
  • 基础类型组成的数组

    5.4.2 Java和Scala元组(Tuples)

5.4.3 Scala样例类(case classes)

5.4.4 Java简单对象(POJOs)

要求:

  • 必须有无参构造函数。
  • 字段必须为public,或者必须有getter/setter方法。

    5.4.5 其他(Arrays、Lists、Maps、Enums等)

    Flink对Java和Scala中的一些特殊目的的类型也是支持的,比如Java的ArrayList、HashMap、Enum等。

    参考:Flink数据类型和序列化


5.5 实现UDF函数——更细粒度的控制流

5.5.1 函数类(Function Classes)

Flink暴露了所有udf函数的接口(或抽象类)。例如MapFunction、FilterFunction、ProcessFunction等等。

5.5.2 Lambda表达式

5.5.3 富函数(Rich Functions)

“富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期(钩子)方法,所以可以实现更复杂的功能。

  • RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction
  • ……

Rich Function有一个生命周期的概念。典型的生命周期方法有:

  • open():初始化方法。当一个算子(比如map或者filter)被创建后,在被调用之前,open()会被调用。注意,一个算子在被创建后,会持续存在,直至应用结束。生命周期方法是在创建和销毁算子对象时被调用,而非每次执行map等业务逻辑前被调用
  • close():当算子执行完毕后最后会被调用,做一些清理工作。

获取运行时上下文的方法:

  • getRuntimeContext():提供了函数的RuntimeContext的一些信息,例如函数执行的并行度、任务的名字,以及state状态。

5.6 Sink

Sink和Source非常相似。

5.6.1 Kafka

5.6.2 Redis

5.6.3 Elasticsearch

5.6.4 JDBC