流处理API分类:<br />
5.1 Environment
5.1.1 getExecutionEnvironment
创建一个执行环境,表示当前执行程序的上下文。是最常用的一种创建执行环境的方式。如果程序是独立调用的(本地开发环境),则此方法返回本地执行环境;如果程序是在集群中被调用的,则此方法返回此集群的执行环境。也就是说,getExecutionEnvironment会根据当前运行的方式决定返回什么样的环境。
// 创建批执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 创建流处理执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
5.1.2 createLocalEnvironment
返回本地执行环境,可在调用时指定默认的并行度。
LocalStreamEnvironment localEnv =
StreamExecutionEnvironment.createLocalEnvironment(1);
5.1.3 createRemoteEnvironment
返回集群执行环境,并将指定jar包提交到远程服务器。需要在调用时指定master的IP和端口号,并指定要在集群中运行的jar包。
StreamExecutionEnvironment remoteEnv =
StreamExecutionEnvironment.createRemoteEnvironment("master-hostname", 6123, "PATH/TO/JAR/ON/MASTER");
5.2 Source
5.2.1 从集合读取数据
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从集合读取数据
DataStream<SensorReading> dataStream = env.fromCollection(
Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_6", 1547718201L, 15.4),
new SensorReading("sensor_7", 1547718202L, 6.7),
new SensorReading("sensor_10", 1547718205L, 38.1)
)
);
// 从一系列int值读取数据
DataStreamSource<Integer> intDataStream = env.fromElements(1, 2, 4, 7, 8, 10, 34);
// 打印输出
dataStream.print("temperature");
dataStream.print("int");
// 执行
env.execute();
}
5.2.2 从文件读取数据
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从文件读取数据
DataStreamSource<String> dataStream = env.readTextFile("/Users/didi/projects/bigdata/flinktutorial/src/main/resources/sensor.txt");
// 打印输出
dataStream.print();
// 执行
env.execute();
}
5.2.3 从kafka读取数据
kafka是一个外部组件。当试图让Flink与外部组件连接时,需要引入对应的connector包。对于kafka而言,需要引入如下的依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
连接代码:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// kafka配置项
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
kafkaProperties.setProperty("group.id", "consumer-group");
kafkaProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProperties.setProperty("auto.offset.reset", "latest");
// 从kafka读取数据
DataStream<String> dataStream = env.addSource(
new FlinkKafkaConsumer011<>(
"sensor-topic",
new SimpleStringSchema(),
kafkaProperties
)
);
// 打印输出
dataStream.print();
// 执行
env.execute();
}
5.2.4 自定义Source
除了以上的Source数据来源,我们还可以自定义Source。需要做的,只是传入一个实现了SourceFunction接口的类对象。具体调用如下:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从自定义数据源读取数据
DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());
// 打印输出
dataStream.print();
// 执行
env.execute();
}
// 实现自定义的SourceFunction
public static class MySensorSource implements SourceFunction<SensorReading> {
// 定义一个标志位,用于控制数据的产生
private volatile boolean running = true;
@Override
public void run(SourceContext<SensorReading> ctx) throws Exception {
// 定义一个随机数发生器
Random random = new Random();
// 设置10个传感器的初始温度
HashMap<String, Double> sensorTempMap = new HashMap<>();
for (int i = 0; i < 10; i++) {
sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
}
while (this.running) {
for (String sensorId : sensorTempMap.keySet()) {
// 在当前温度基础上随机波动
Double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
sensorTempMap.put(sensorId, newTemp);
// 把数据发送给Flink
ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newTemp));
}
// 控制数据产生频率
Thread.sleep(1000);
}
}
@Override
public void cancel() {
this.running = false;
}
}
5.3 Transform
5.3.1 map
5.3.2 flatMap
5.3.3 Filter
5.3.4 keyBy
在Flink中,若想执行聚合操作,必须先进行分组,而分组是依靠keyBy来实现的。
DataStream -> KeyedStream: 逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,内部以hash的形式实现。
5.3.5 滚动聚合算子(Rolling Aggregation)
滚动聚合就是“实时聚合”,每来一个数据,就进行一次聚合,并产生聚合结果。
以下算子可以针对KeyedStream的每一个支流做聚合:
- sum()
- min():只更新聚合结果中的指定字段,其他字段保持初始聚合结果不变。
- max():只更新聚合结果中的指定字段,其他字段保持初始聚合结果不变。
- minBy():更新聚合结果的左右字段为最新值。
- maxBy():更新聚合结果的左右字段为最新值。
5.3.6 Reduce
KeyedStream -> DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
Reduce是一个一般化的滚动聚合算子,可实现各种自定义的聚合需求。DataStream<SensorReading> resultStream = inputStream.map(i -> { System.out.println(i); String[] values = i.split(","); return new SensorReading(values[0], Long.valueOf(values[1]), Double.valueOf(values[2])); }) .keyBy(SensorReading::getId) .reduce(new ReduceFunction<SensorReading>() { /** * @param value1 上次reduce的结果 * @param value2 当前最新的值 * @return 当前reduce后的结果 */ @Override public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception { // 取当前最大的温度值,以及最新的时间戳 return new SensorReading( value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()) ); } });5.3.7 Split和Select
Split

DataStream -> SplitStream:根据某些特征把一个DataStream拆分成两个或多个DataStream。
Split本质上是给源Stream中的数据打标,形成一个特殊的流(SplitStream)。然后配合Select算子从SplitStream中筛选出需要的数据。Select

SplitStream -> DataStream:从一个SplitStream中筛选出一个或多个DataStream。示例
需求:传感器数据按照温度高低,以30度为界,拆分成两个流。 ```java SplitStreamsplitStream = inputStream.map(i -> { String[] values = i.split(“,”); return new SensorReading(values[0], Long.valueOf(values[1]), Double.valueOf(values[2])); }).split(new OutputSelector () { /** - @param value 当前数据
- @return 标签集合
*/
@Override
public Iterable
select(SensorReading value) { return value.getTemperature() > 30
} });? Lists.newArrayList("high") : Lists.newArrayList("low");
DataStream
注意:Split和Select已经过时,推荐使用Side outputs:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/side_output/
5.3.8 Connect和CoMap
Connect

DataStream,DataStream -> ConnectedStreams:连接两个保持它们类型的数据流,两个数据流被Connect之后,只是被放在了同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
CoMap/CoFlatMap

ConnectedStream -> DataStream:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。
5.3.9 Union

DataStream -> DataStream:对两个或者两个以上的相同数据类型的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。
5.3.10 重分区
- keyBy():按指定key分区。
- broadcast():给每个下游分区都发送一份数据。
- shuffle():将数据随机发送给下游分区。
- forward():直接转发给本分区下游。
- rebalace():轮询下游的子任务节点,依次发送数据。
- rescale():先将下游子任务节点平分给当前子任务节点(分组),然后各个子任务节点在自己的拥有的分组内轮询下游节点。
- global():将当前子任务所有节点的数据都发给下游子任务第一个节点。
- partitionCustom:用户自定义重分区。

5.4 支持的数据类型
Flink流应用处理的是以数据对象表示的事件流。所以在Flink内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink需要明确地知道应用程序所处理的数据类型。Flink使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。(Flink自行实现了序列化和反序列机制)
Flink还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如lambda函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
Flink支持Java和Scala中所有常见数据类型。
5.4.1 基础数据类型
Flink支持所有的Java和Scala基础数据类型,
- Byte、Short、Integer、Long、Float、Double、Character、Boolean,
- String,Date,BigDecimal,BigInteger
- ……
- 基础类型组成的数组
5.4.2 Java和Scala元组(Tuples)
5.4.3 Scala样例类(case classes)
5.4.4 Java简单对象(POJOs)
要求:
- 必须有无参构造函数。
- 字段必须为public,或者必须有getter/setter方法。
5.4.5 其他(Arrays、Lists、Maps、Enums等)
Flink对Java和Scala中的一些特殊目的的类型也是支持的,比如Java的ArrayList、HashMap、Enum等。
5.5 实现UDF函数——更细粒度的控制流
5.5.1 函数类(Function Classes)
Flink暴露了所有udf函数的接口(或抽象类)。例如MapFunction、FilterFunction、ProcessFunction等等。
5.5.2 Lambda表达式
5.5.3 富函数(Rich Functions)
“富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期(钩子)方法,所以可以实现更复杂的功能。
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- ……
Rich Function有一个生命周期的概念。典型的生命周期方法有:
- open():初始化方法。当一个算子(比如map或者filter)被创建后,在被调用之前,open()会被调用。注意,一个算子在被创建后,会持续存在,直至应用结束。生命周期方法是在创建和销毁算子对象时被调用,而非每次执行map等业务逻辑前被调用。
- close():当算子执行完毕后最后会被调用,做一些清理工作。
获取运行时上下文的方法:
- getRuntimeContext():提供了函数的RuntimeContext的一些信息,例如函数执行的并行度、任务的名字,以及state状态。
