Flink DataStream API 编程指南

Flink中的DataStream程序是对数据流进行转换(例如,过滤、更新状态、定义窗口、聚合)的常用方式。数据流起于各种sources(例如,消息队列,socket流,文件)。通过sinks返回结果,例如将数据写入文件或标准输出(例如命令行终端)。Flink程序可以运行在各种上下文环境中,独立或嵌入其他程序中。 执行过程可能发生在本地JVM或在由许多机器组成的集群上。

关于Flink API的基本概念介绍请参阅基本概念

为了创建你的Flink DataStream程序,我们鼓励你从解构Flink程序 开始,并逐渐添加你自己的transformations。本节其余部分作为附加操作和高级功能的参考。

示例程序

以下是基于流式窗口进行word count的一个完整可运行的程序示例,它从网络socket中以5秒的窗口统计单词个数。你可以复制并粘贴代码用以本地运行。

  1. import org.apache.flink.api.common.functions.FlatMapFunction;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.api.windowing.time.Time;
  6. import org.apache.flink.util.Collector;
  7. public class WindowWordCount {
  8. public static void main(String[] args) throws Exception {
  9. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10. DataStream<Tuple2<String, Integer>> dataStream = env
  11. .socketTextStream("localhost", 9999)
  12. .flatMap(new Splitter())
  13. .keyBy(0)
  14. .timeWindow(Time.seconds(5))
  15. .sum(1);
  16. dataStream.print();
  17. env.execute("Window WordCount");
  18. }
  19. public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
  20. @Override
  21. public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
  22. for (String word: sentence.split(" ")) {
  23. out.collect(new Tuple2<String, Integer>(word, 1));
  24. }
  25. }
  26. }
  27. }
  1. import org.apache.flink.streaming.api.scala._
  2. import org.apache.flink.streaming.api.windowing.time.Time
  3. object WindowWordCount {
  4. def main(args: Array[String]) {
  5. val env = StreamExecutionEnvironment.getExecutionEnvironment
  6. val text = env.socketTextStream("localhost", 9999)
  7. val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  8. .map { (_, 1) }
  9. .keyBy(0)
  10. .timeWindow(Time.seconds(5))
  11. .sum(1)
  12. counts.print
  13. env.execute("Window Stream WordCount")
  14. }
  15. }

要运行示例程序,首先在终端启动netcat作为输入流:

  1. nc -lk 9999

然后输入一些单词,回车换行输入新一行的单词。这些输入将作为示例程序的输入。如果要使得某个单词的计数大于1,请在5秒钟内重复输入相同的字词(如果5秒钟输入相同单词对你来说太快,请把示例程序中的窗口大小从5秒调大 ☺)。

DataStream Transformations

数据的Transformations可以将一个或多个DataStream转换为一个新的DataStream。程序可以将多种Transformations组合成复杂的拓扑结构。

本节对所有可用Transformations进行详细说明。

Transformation 描述
Map
DataStream → DataStream

读入一个元素,返回转换后的一个元素。一个把输入流转换中的数值翻倍的map function:

java DataStream<Integer> dataStream = //... dataStream.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return 2 * value; } });
FlatMap
DataStream → DataStream

读入一个元素,返回转换后的0个、1个或者多个元素。一个将句子切分成单词的flatmap function:

java dataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { for(String word: value.split(" ")){ out.collect(word); } } });
Filter
DataStream → DataStream

对读入的每个元素执行boolean函数,并保留返回true的元素。一个过滤掉零值的filter:

java dataStream.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value != 0; } });
KeyBy
DataStream → KeyedStream

逻辑上将流分区为不相交的分区,每个分区包含相同key的元素。在内部通过hash分区来实现。关于如何指定分区的keys请参阅keys。该transformation返回一个KeyedDataStream。

java dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple

注意 以下类型不能作为key:

  1. POJO类型,并且依赖于Object.hashCode()的实现,但是未覆写hashCode()
  2. 任意类型的数组

Reduce
KeyedStream → DataStream

在一个KeyedStream上不断进行reduce操作。将当前元素与上一个reduce后的值进行合并,再返回新合并的值。

一个构造局部求和流的reduce function:

java keyedStream.reduce(new ReduceFunction<Integer>() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; } });

Fold
KeyedStream → DataStream

在一个KeyedStream上基于初始值不断进行变换操作,将当前值与上一个变换后的值进行变换,再返回新变换的值。

>在序列(1,2,3,4,5)上应用如下的fold function,返回的序列依次是“start-1”,“start-1-2”,“start-1-2-3”, …:

java DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() { @Override public String fold(String current, Integer value) { return current + "-" + value; } });

Aggregations
KeyedStream → DataStream

在一个KeyedStream上不断聚合。min和minBy的区别是min返回最小值,而minBy返回在该字段上值为最小值的所有元素(对于max和maxBy相同)。

java keyedStream.sum(0); keyedStream.sum("key"); keyedStream.min(0); keyedStream.min("key"); keyedStream.max(0); keyedStream.max("key"); keyedStream.minBy(0); keyedStream.minBy("key"); keyedStream.maxBy(0); keyedStream.maxBy("key"); {% endhighlight %} </td> </tr> <tr> <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td> <td> <p>Windows可定义在已分区的KeyedStreams上。Windows会在每个key对应的数据上根据一些特征(例如,在最近5秒内到达的数据)进行分组。 有关Windows的完整说明请参阅<a href="windows.html">windows</a>。 {% highlight java %} dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

WindowAll
DataStream → AllWindowedStream

Windows可定义在普通DataStream上。Windows根据一些特征(例如,在最近5秒内到达的数据)对所有流事件进行分组。有关Windows的完整说明请参阅windows

警告:在多数情况下,这是非并行的的transformation。所有记录将被聚集到运行windowAll操作的一个任务中。

java dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream

把窗口作为整体,并在此整体上应用通用函数。以下是手动对窗口全体元素求和的函数。

注意:如果你正在使用windowAll transformation,则需要替换为AllWindowFunction。

java windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() { public void apply (Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } }); // applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() { public void apply (Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } });
Window Reduce
WindowedStream → DataStream

在窗口上应用一个通用的reduce函数并返回reduced后的值。

java windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() { public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1); } });
Window Fold
WindowedStream → DataStream

在窗口上应用一个通用的fold函数并返回fold后的值。在序列(1,2,3,4,5)上应用示例函数,将会得到字符串“start-1-2-3-4-5”中:

java windowedStream.fold("start", new FoldFunction<Integer, String>() { public String fold(String current, Integer value) { return current + "-" + value; } });
Aggregations on windows
WindowedStream → DataStream

聚合窗口的内容。min和minBy的区别是min返回最小值,而minBy返回在该字段上值为最小值的所有元素(对于max和maxBy相同)。

java windowedStream.sum(0); windowedStream.sum("key"); windowedStream.min(0); windowedStream.min("key"); windowedStream.max(0); windowedStream.max("key"); windowedStream.minBy(0); windowedStream.minBy("key"); windowedStream.maxBy(0); windowedStream.maxBy("key");
Union
DataStream → DataStream

联合(Union)两个或多个数据流,创建一个包含来自所有流的所有元素的新的数据流。注意:如果DataStream和自身联合,那么在结果流中每个元素你会拿到两份。

java dataStream.union(otherStream1, otherStream2, ...);
Window Join
DataStream,DataStream → DataStream

在给定的key和公共窗口上连接(Join)两个DataStream。

java dataStream.join(otherStream) .where(<key selector>).equalTo(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply (new JoinFunction () {...});
Window CoGroup
DataStream,DataStream → DataStream

在给定的key和公共窗口上CoGroup两个DataStream。

java dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply (new CoGroupFunction () {...});
Connect
DataStream,DataStream → ConnectedStreams

“串联”(Connect)两个DataStream并保留各自类型。串联允许两个流之间共享状态。

java DataStream<Integer> someStream = //... DataStream<String> otherStream = //... ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
CoMap, CoFlatMap
ConnectedStreams → DataStream

在一个ConnectedStreams上做类似map和flatMap的操作。

java connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() { @Override public Boolean map1(Integer value) { return true; } @Override public Boolean map2(String value) { return false; } }); connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() { @Override public void flatMap1(Integer value, Collector<String> out) { out.collect(value.toString()); } @Override public void flatMap2(String value, Collector<String> out) { for (String word: value.split(" ")) { out.collect(word); } } });
Split
DataStream → SplitStream

根据一些标准将流分成两个或更多个流。 java SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() { @Override public Iterable<String> select(Integer value) { List<String> output = new ArrayList<String>(); if (value % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; } });

Select
SplitStream → DataStream

在一个SplitStream上选择一个或多个流。 java SplitStream<Integer> split; DataStream<Integer> even = split.select("even"); DataStream<Integer> odd = split.select("odd"); DataStream<Integer> all = split.select("even","odd");

Iterate
DataStream → IterativeStream → DataStream

通过将一个operator的输出重定向到某个先前的operator,在流中创建“反馈”循环。这对于需要不断更新模型的算法特别有用。 以下代码以流开始,并持续应用迭代体。大于0的元素将回送到反馈通道,其余元素发往下游。相关完整描述请参阅iterations。 ```java IterativeStream iteration = initialStream.iterate(); DataStream iterationBody = iteration.map (/do something*/); DataStream feedback = iterationBody.filter(new FilterFunction(){ @Override public boolean filter(Integer value) throws Exception { return value > 0; } }); iteration.closeWith(feedback); DataStream output = iterationBody.filter(new FilterFunction(){ @Override public boolean filter(Integer value) throws Exception { return value <= 0; } }); </p> </td> </tr> <tr> <td><strong>Extract Timestamps</strong><br>DataStream &rarr; DataStream</td> <td> <p> 从记录中提取时间戳,以便在窗口中使用事件时间语义。请参阅<a href="{{ site.baseurl }}/dev/event_time.html">Event Time</a>。java stream.assignTimestamps (new TimeStampExtractor() {…}); ```

Transformation Description
Map
DataStream → DataStream

读入一个元素,返回转换后的一个元素。一个把输入流转换中的数值翻倍的map function:

scala dataStream.map { x => x * 2 }
FlatMap
DataStream → DataStream

读入一个元素,返回转换后的0个、1个或者多个元素。一个将句子切分成单词的flatmap function:

scala dataStream.flatMap { str => str.split(" ") }
Filter
DataStream → DataStream

对读入的每个元素执行boolean函数,并保留返回true的元素。一个过滤掉零值的filter:

scala dataStream.filter { _ != 0 }
KeyBy
DataStream → KeyedStream

逻辑上将流分区为不相交的分区,每个分区包含相同key的元素。在内部通过hash分区来实现。关于如何指定分区的keys请参阅keys。该transformation返回一个KeyedDataStream。

scala dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple
Reduce
KeyedStream → DataStream

在一个KeyedStream上不断进行reduce操作。将当前元素与上一个reduce后的值进行合并,再返回新合并的值。

一个构造局部求和流的reduce function:

scala keyedStream.reduce { _ + _ }

Fold
KeyedStream → DataStream

在一个KeyedStream上基于初始值不断进行变换操作,将当前值与上一个变换后的值进行变换,再返回新变换的值。

在序列(1,2,3,4,5)上应用如下的fold function,返回的序列依次是“start-1”,“start-1-2”,“start-1-2-3”, …:

scala val result: DataStream[String] = keyedStream.fold("start")((str, i) => { str + "-" + i })

Aggregations
KeyedStream → DataStream

在一个KeyedStream上不断聚合。min和minBy的区别是min返回最小值,而minBy返回在该字段上值为最小值的所有元素(对于max和maxBy相同)。

scala keyedStream.sum(0) keyedStream.sum("key") keyedStream.min(0) keyedStream.min("key") keyedStream.max(0) keyedStream.max("key") keyedStream.minBy(0) keyedStream.minBy("key") keyedStream.maxBy(0) keyedStream.maxBy("key")
Window
KeyedStream → WindowedStream

Windows可定义在已分区的KeyedStreams上。Windows会在每个key对应的数据上根据一些特征(例如,在最近5秒内到达的数据)进行分组。有关Windows的完整说明请参阅windowsscala dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data

WindowAll
DataStream → AllWindowedStream

Windows可定义在普通DataStream上。Windows根据一些特征(例如,在最近5秒内到达的数据)对所有流事件进行分组。有关Windows的完整说明请参阅windows

警告:在多数情况下,这是非并行的transformation。所有记录将被聚集到运行windowAll操作的一个任务中。

scala dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream

把窗口作为整体,并在此整体上应用通用函数。以下是手动对窗口全体元素求和的函数。

注意:如果你正在使用windowAll transformation,则需要替换为AllWindowFunction

scala windowedStream.apply { WindowFunction } // applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply { AllWindowFunction }
Window Reduce
WindowedStream → DataStream

在窗口上应用一个通用的reduce函数并返回reduced后的值。

scala windowedStream.reduce { _ + _ }
Window Fold
WindowedStream → DataStream

在窗口上应用一个通用的fold函数并返回fold后的值。在序列(1,2,3,4,5)上应用示例函数,将会得到字符串“start-1-2-3-4-5”中:

scala val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i })
Aggregations on windows
WindowedStream → DataStream

聚合窗口的内容。min和minBy的区别是min返回最小值,而minBy返回在该字段上值为最小值的所有元素(对于max和maxBy相同)。

scala windowedStream.sum(0) windowedStream.sum("key") windowedStream.min(0) windowedStream.min("key") windowedStream.max(0) windowedStream.max("key") windowedStream.minBy(0) windowedStream.minBy("key") windowedStream.maxBy(0) windowedStream.maxBy("key")
Union
DataStream → DataStream

联合(Union)两个或多个数据流,创建一个包含来自所有流的所有元素的新的数据流。注意:如果DataStream和自身联合,那么在结果流中每个元素你会拿到两份。

scala dataStream.union(otherStream1, otherStream2, ...)
Window Join
DataStream,DataStream → DataStream

在给定的key和公共窗口上连接(Join)两个DataStream。

scala dataStream.join(otherStream) .where(<key selector>).equalTo(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply { ... }
Window CoGroup
DataStream,DataStream → DataStream

在给定的key和公共窗口上CoGroup两个DataStream。

scala dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply {}
Connect
DataStream,DataStream → ConnectedStreams

“串联”(Connect)两个DataStream并保留各自类型。串联允许两个流之间共享状态。

scala someStream : DataStream[Int] = ... otherStream : DataStream[String] = ... val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMap
ConnectedStreams → DataStream

在一个ConnectedStreams上做类似map和flatMap的操作。

scala connectedStreams.map( (_ : Int) => true, (_ : String) => false ) connectedStreams.flatMap( (_ : Int) => true, (_ : String) => false )
Split
DataStream → SplitStream

根据一些标准将流分成两个或更多个流。 scala val split = someDataStream.split( (num: Int) => (num % 2) match { case 0 => List("even") case 1 => List("odd") } )

Select
SplitStream → DataStream

在一个SplitStream上选择一个或多个流。 scala val even = split select "even" val odd = split select "odd" val all = split.select("even","odd")

Iterate
DataStream → IterativeStream → DataStream

通过将一个operator的输出重定向到某个先前的operator,在流中创建“反馈”循环。这对于需要不断更新模型的算法特别有用。以下代码以流开始,并持续应用迭代体。大于0的元素将回送到反馈通道,其余元素发往下游。相关完整描述请参阅iterations。 ```scala initialStream.iterate { iteration => { val iterationBody = iteration.map {/do something*/} (iterationBody.filter( > 0), iterationBody.filter( <= 0)) } } </p> </td> </tr> <tr> <td><strong>Extract Timestamps</strong><br>DataStream &rarr; DataStream</td> <td> <p> 从记录中提取时间戳,以便在窗口中使用事件时间语义。请参阅<a href="{{ site.baseurl }}/apis/streaming/event_time.html">Event Time</a>。scala stream.assignTimestamps { timestampExtractor } ```

使用如下的匿名模式匹配从元组、case类和集合中抽取信息:

  1. val data: DataStream[(Int, String, Double)] = // [...]
  2. data.map {
  3. case (id, name, temperature) => // [...]
  4. }

未被API原生支持。要使用该特性,请用Scala API extension

以下transformations可用于元组类型的DataStream:

Transformation Description
Project
DataStream → DataStream

从元组中选择一部分字段子集 java DataStream<Tuple3<Integer, Double, String>> in = // [...] DataStream<Tuple2<String, Integer>> out = in.project(2,0);

物理分区

在一个transformation之后,Flink也提供了底层API以允许用户在必要时精确控制流分区,参见如下的函数。

Transformation Description
Custom partitioning
DataStream → DataStream

使用一个用户自定义Partitioner确定每个元素对应的目标task。 java dataStream.partitionCustom(partitioner, "someKey"); dataStream.partitionCustom(partitioner, 0);

Random partitioning
DataStream → DataStream

按照均匀分布以随机的方式对元素进行分区。 java dataStream.shuffle();

Rebalancing (Round-robin partitioning)
DataStream → DataStream

以round-robin方式对元素进行分区,使得每个分区负载均衡。在数据倾斜的情况下进行性能优化有用。 java dataStream.rebalance();

Rescaling
DataStream → DataStream

以round-robin方式对元素分区到下游operations。如果你想从source的每个并行实例分散到若干个mappers以负载均衡,但是你不期望rebalacne()那样进行全局负载均衡,这将会有用。这将仅需要本地数据传输,而不是通过网络传输数据,具体取决于其他配置值,例如TaskManager的插槽数。

上游operation所发送的元素被分区到下游operation的哪些子集,取决于上游和下游操作的并发度。例如,如果上游operation并发度为2,而下游operation并发度为6,则其中1个上游operation会将元素分发到3个下游operation,另1个上游operation会将元素分发到另外3个下游operation。相反地,如果上游operation并发度为6,而下游operation并发度为2,则其中3个上游operation会将元素分发到1个下游operation,另1个上游operation会将元素分发到另外1个下游operation。

在上下游operation的并行度不是彼此的倍数的情况下,下游operation对应的上游的operation输入数量不同。

下图可视化了上面例子中说明的对应关系:

Checkpoint barriers in data streams

java dataStream.rescale();

Broadcasting
DataStream → DataStream

广播元素到每个分区。 java dataStream.broadcast();

Transformation Description
Custom partitioning
DataStream → DataStream

使用一个用户自定义Partitioner确定每个元素对应的目标task。 scala dataStream.partitionCustom(partitioner, "someKey") dataStream.partitionCustom(partitioner, 0)

Random partitioning
DataStream → DataStream

按照均匀分布以随机的方式对元素进行分区。 scala dataStream.shuffle()

Rebalancing (Round-robin partitioning)
DataStream → DataStream

以round-robin方式对元素进行分区,使得每个分区负载均衡。在数据倾斜的情况下进行性能优化有用。 scala dataStream.rebalance()

Rescaling
DataStream → DataStream

以round-robin方式对元素分区到下游operations。如果你想从source的每个并行实例分散到若干个mappers以负载均衡,但是你不期望rebalacne()那样进行全局负载均衡,这将会有用。这将仅需要本地数据传输,而不是通过网络传输数据,具体取决于其他配置值,例如TaskManager的插槽数。

上游operation所发送的元素被分区到下游operation的哪些子集,取决于上游和下游操作的并发度。例如,如果上游operation并发度为2,而下游operation并发度为6,则其中1个上游operation会将元素分发到3个下游operation,另1个上游operation会将元素分发到另外3个下游operation。相反地,如果上游operation并发度为6,而下游operation并发度为2,则其中3个上游operation会将元素分发到1个下游operation,另1个上游operation会将元素分发到另外1个下游operation。

在上下游operation的并行度不是彼此的倍数的情况下,下游operation对应的上游的operation输入数量不同。

下图可视化了上面例子中说明的对应关系:

Checkpoint barriers in data streams

java dataStream.rescale()

Broadcasting
DataStream → DataStream

广播元素到每个分区。 scala dataStream.broadcast()

任务链接(chaining) 和资源组

链接(chaining)两个依次的transformations意味着将它们运行在同一个线程中以获得更好的性能。Flink默认情况下尽可能进行该链接操作(比如两个依次的map transformations),同时Flink根据需要提供API对链接进行细粒度控制:

如果不想在整个Job上进行默认的链接优化,可以设置StreamExecutionEnvironment.disableOperatorChaining()。下面的函数可用于更细粒度的控制链接。注意这些函数只能用在一个DataStream transformation之后,因为它们是指向先前的transformation。例如,你可以someStream.map(...).startNewChain(), 但是你不能someStream.startNewChain().

Flink中的一个资源组是一个slot,详情请参阅slots. 必要时你可以手动隔离不同slots中的operators。


Transformation Description
Start new chain

从这个operator开始新建一个新的chain。这两个mappers将被链接起来,而filter不会和第一个mapper链接。 java someStream.filter(...).map(...).startNewChain().map(...);

Disable chaining

不与这个map operator链接 java someStream.map(...).disableChaining();

Set slot sharing group

设置operation的slot共享组。Flink会把相同slot共享组的operation放在同一个slot中,而把没有slot共享组的operation放到其他slot中。这可以用来隔离slot。如果所有输入operation都在相同的slot共享组中,则slot共享组将继承自输入operation。默认slot共享组是“default”,可以通过调用slotSharingGroup(“default”)将operation显式的放入该组。 java someStream.filter(...).slotSharingGroup("name");


Transformation Description
Start new chain

从这个operator开始新建一个新的chain。这两个mappers将被链接起来,而filter不会和第一个mapper链接。 scala someStream.filter(...).map(...).startNewChain().map(...)

Disable chaining

不与这个map operator链接 scala someStream.map(...).disableChaining()

Set slot sharing group

设置operation的slot共享组。Flink会把相同slot共享组的operation放在同一个slot中,而把没有slot共享组的operation放到其他slot中。这可以用来隔离slot。如果所有输入operation都在相同的slot共享组中,则slot共享组将继承自输入operation。默认slot共享组是“default”,可以通过调用slotSharingGroup(“default”)将operation显式的放入该组。 {% highlight java %} someStream.filter(…).slotSharingGroup(“name”) {% endhighlight %}

数据Sources

Sources是你的程序读取输入的地方。你可以通过StreamExecutionEnvironment.addSource(sourceFunction)将Source添加到你的程序中。Flink提供了若干已经实现好了的source functions,当然你也可以通过实现SourceFunction来自定义非并行的source或者实现ParallelSourceFunction接口或者扩展RichParallelSourceFunction来自定义并行的source,

StreamExecutionEnvironment中可以使用以下几个已实现的stream sources:

基于文件:

  • readTextFile(path) - 读取文本文件,即符合TextInputFormat规范的文件,并将其作为字符串返回。

  • readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)。

  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是上面两个方法内部调用的方法。它根据给定的fileInputFormat和读取路径读取文件。根据提供的watchType,这个source可以定期(每隔interval毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你可以通过pathFilter进一步排除掉需要处理的文件。

    实现:

    在具体实现上,Flink把文件读取过程分为两个子任务,即目录监控数据读取。每个子任务都由单独的实体实现。目录监控由单个非并行(并行度为1)的任务执行,而数据读取由并行运行的多个任务执行。后者的并行性等于作业的并行性。单个目录监控任务的作用是扫描目录(根据watchType定期扫描或仅扫描一次),查找要处理的文件并把文件分割成切分片(splits),然后将这些切分片分配给下游reader。reader负责读取数据。每个切分片只能由一个reader读取,但一个reader可以逐个读取多个切分片。

    重要注意:

    1. 如果watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,则当文件被修改时,其内容将被重新处理。这会打破“exactly-once”语义,因为在文件末尾附加数据将导致其所有内容被重新处理。

    2. 如果watchType设置为FileProcessingMode.PROCESS_ONCE,则source仅扫描路径一次然后退出,而不等待reader完成文件内容的读取。当然reader会继续阅读,直到读取所有的文件内容。关闭source后就不会再有检查点。这可能导致节点故障后的恢复速度较慢,因为该作业将从最后一个检查点恢复读取。

基于 Socket:

  • socketTextStream - 从socket读取。元素可以用分隔符切分。

基于集合:

  • fromCollection(Collection) - 从Java的Java.util.Collection创建数据流。集合中的所有元素类型必须相同。

  • fromCollection(Iterator, Class) - 从一个迭代器中创建数据流。Class指定了该迭代器返回元素的类型。

  • fromElements(T ...) - 从给定的对象序列中创建数据流。所有对象类型必须相同。

  • fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中创建并行数据流。Class指定了该迭代器返回元素的类型。

  • generateSequence(from, to) - 创建一个生成指定区间范围内的数字序列的并行数据流。

自定义:

  • addSource - 添加一个新的source function。例如,你可以addSource(new FlinkKafkaConsumer08<>(...))以从Apache Kafka读取数据。详情参阅 connectors


Sources是你的程序读取输入的地方。你可以通过StreamExecutionEnvironment.addSource(sourceFunction)将Source添加到你的程序中。Flink提供了若干已经实现好了的source functions,当然你也可以通过实现SourceFunction来自定义非并行的source或者实现ParallelSourceFunction接口或者扩展RichParallelSourceFunction来自定义并行的source, StreamExecutionEnvironment中可以使用以下几个已实现的stream sources: 基于文件: - readTextFile(path) - 读取文本文件,即符合TextInputFormat规范的文件,并将其作为字符串返回。 - readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)。 - readFile(fileInputFormat, path, watchType, interval, pathFilter) - 这是上面两个方法内部调用的方法。它根据给定的fileInputFormat和读取路径读取文件。根据提供的watchType,这个source可以定期(每隔interval毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你可以通过pathFilter进一步排除掉需要处理的文件。 实现: 在具体实现上,Flink把文件读取过程分为两个子任务,即目录监控数据读取。每个子任务都由单独的实体实现。目录监控由单个非并行(并行度为1)的任务执行,而数据读取由并行运行的多个任务执行。后者的并行性等于作业的并行性。单个目录监控任务的作用是扫描目录(根据watchType定期扫描或仅扫描一次),查找要处理的文件并把文件分割成切分片(splits),然后将这些切分片分配给下游reader。reader负责读取数据。每个切分片只能由一个reader读取,但一个reader可以逐个读取多个切分片。 重要注意: 1. 如果watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,则当文件被修改时,其内容将被重新处理。这会打破“exactly-once”语义,因为在文件末尾附加数据将导致其所有内容被重新处理。 2. 如果watchType设置为FileProcessingMode.PROCESS_ONCE,则source仅扫描路径一次然后退出,而不等待reader完成文件内容的读取。当然reader会继续阅读,直到读取所有的文件内容。关闭source后就不会再有检查点。这可能导致节点故障后的恢复速度较慢,因为该作业将从最后一个检查点恢复读取。 基于 Socket: - socketTextStream - 从socket读取。元素可以用分隔符切分。 Collection-based: - fromCollection(Seq) - 从Java的Java.util.Collection创建数据流。集合中的所有元素类型必须相同。 - fromCollection(Iterator) - 从一个迭代器中创建数据流。Class指定了该迭代器返回元素的类型。 - fromElements(elements: _*) - 从给定的对象序列中创建数据流。所有对象类型必须相同。 - fromParallelCollection(SplittableIterator) - 从一个迭代器中创建并行数据流。Class指定了该迭代器返回元素的类型。 - generateSequence(from, to) - 创建一个生成指定区间范围内的数字序列的并行数据流。 自定义: - addSource - 添加一个新的source function。例如,你可以addSource(new FlinkKafkaConsumer08<>(...))以从Apache Kafka读取数据。详情参阅connectors

数据Sinks

数据sinks消费DataStream并将其发往文件、socket、外部系统或进行打印。Flink自带多种内置的输出格式,这些都被封装在对DataStream的操作背后:

  • writeAsText() / TextOutputFormat - 将元素以字符串形式写入。字符串 通过调用每个元素的toString()方法获得。

  • writeAsCsv(...) / CsvOutputFormat - 将元组写入逗号分隔的csv文件。行和字段 分隔符均可配置。每个字段的值来自对象的toString()方法。

  • print() / printToErr() - 打印每个元素的toString()值到标准输出/错误输出流。可以配置前缀信息添加到输出,以区分不同print的结果。如果并行度大于1,则task id也会添加到输出前缀上。

  • writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法/基类。支持自定义的对象到字节的转换。

  • writeToSocket - 根据SerializationSchema把元素写到socket

  • addSink - 调用自定义sink function。Flink自带了很多连接其他系统的连接器(connectors)(如 Apache Kafka),这些连接器都实现了sink function。


Data sinks消费DataStream并将其发往文件、socket、外部系统或进行打印。Flink自带多种内置的输出格式,这些都被封装在对DataStream的操作背后: - writeAsText() / TextOutputFormat - 将元素以字符串形式写入。字符串 通过调用每个元素的toString()方法获得。 - writeAsCsv(...) / CsvOutputFormat - 将元组写入逗号分隔的csv文件。行和字段 分隔符均可配置。每个字段的值来自对象的toString()方法。 - print() / printToErr() - 打印每个元素的toString()值到标准输出/错误输出流。可以配置前缀信息添加到输出,以区分不同print的结果。如果并行度大于1,则task id也会添加到输出前缀上。 - writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法/基类。支持自定义的对象到字节的转换。 - writeToSocket - 根据SerializationSchema把元素写到socket - addSink - 调用自定义sink function。Flink自带了很多连接其他系统的连接器(connectors)(如 Apache Kafka),这些连接器都实现了sink function。

请注意,DataStream上的write*()方法主要用于调试目的。它们没有参与Flink的检查点机制,这意味着这些function通常都有 at-least-once语义。数据刷新到目标系统取决于OutputFormat的实现。这意味着并非所有发送到OutputFormat的元素都会立即在目标系统中可见。此外,在失败的情况下,这些记录可能会丢失。

为了可靠,在把流写到文件系统时,使用flink-connector-filesystem来实现exactly-once。此外,通过.addSink(...)方法自定义的实现可以参与Flink的检查点机制以实现exactly-once语义。

迭代(Iterations)

迭代流程序实现一个step function并将其嵌入到IterativeStream中。由于这样的DataStream程序可能永远不会结束,所以没有最大迭代次数。事实上你需要指定哪一部分的流被反馈到迭代过程,哪个部分通过splitfilter transformation向下游转发。在这里,我们展示一个使用过滤器的例子。首先,我们定义一个IterativeStream

  1. IterativeStream<Integer> iteration = input.iterate();

然后,我们使用一系列transformations来指定在循环内执行的逻辑(这里示意一个简单的map transformation)

  1. DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

要关闭迭代并定义迭代尾部,需要调用IterativeStreamcloseWith(feedbackStream)方法。传给closeWith function的DataStream将被反馈给迭代的头部。一种常见的模式是使用filter来分离流中需要反馈的部分和需要继续发往下游的部分。这些filter可以定义“终止”逻辑,以控制元素是流向下游而不是反馈迭代。

  1. iteration.closeWith(iterationBody.filter(/* one part of the stream */));
  2. DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

默认情况下,反馈流的分区将自动设置为与迭代的头部的输入分区相同。用户可以在closeWith方法中设置一个可选的boolean标志来覆盖默认行为。

例如,如下程序从一系列整数连续减1,直到它们达到零:

  1. DataStream<Long> someIntegers = env.generateSequence(0, 1000);
  2. IterativeStream<Long> iteration = someIntegers.iterate();
  3. DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  4. @Override
  5. public Long map(Long value) throws Exception {
  6. return value - 1 ;
  7. }
  8. });
  9. DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  10. @Override
  11. public boolean filter(Long value) throws Exception {
  12. return (value > 0);
  13. }
  14. });
  15. iteration.closeWith(stillGreaterThanZero);
  16. DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  17. @Override
  18. public boolean filter(Long value) throws Exception {
  19. return (value <= 0);
  20. }
  21. });

迭代streaming程序实现一个step function并将其嵌入到IterativeStream中。由于这样的DataStream程序可能永远不会结束,所以没有最大迭代次数。事实上你需要指定哪一部分的流被反馈到迭代过程,哪个部分通过splitfilter transformation向下游转发。在这里,我们展示一个迭代的例子,其中主体(计算部分被反复执行)是简单的map transformation,迭代反馈的元素和继续发往下游的元素通过filters进行区分。

  1. val iteratedStream = someDataStream.iterate(
  2. iteration => {
  3. val iterationBody = iteration.map(/* this is executed many times */)
  4. (tail.filter(/* one part of the stream */), tail.filter(/* some other part of the stream */))
  5. })

默认情况下,反馈流的分区将自动设置为与迭代的头部的输入分区相同。用户可以在closeWith方法中设置一个可选的boolean标志来覆盖默认行为。

例如,如下程序从一系列整数连续减1,直到它们达到零:

  1. val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)
  2. val iteratedStream = someIntegers.iterate(
  3. iteration => {
  4. val minusOne = iteration.map( v => v - 1)
  5. val stillGreaterThanZero = minusOne.filter (_ > 0)
  6. val lessThanZero = minusOne.filter(_ <= 0)
  7. (stillGreaterThanZero, lessThanZero)
  8. }
  9. )

执行参数

StreamExecutionEnvironment包含ExecutionConfig,它允许为作业运行时进行配置。

更多配置参数请参阅execution configuration。下面的参数是DataStream API特有的:

  • enableTimestamps() / disableTimestamps(): 如果启用,则从source发出的每一条记录都会附加一个时间戳。 areTimestampsEnabled() 返回当前是否启用该值。

  • setAutoWatermarkInterval(long milliseconds): 设置自动发射watermark的间隔。你可以通过long getAutoWatermarkInterval()获取当前的发射间隔。

容错

State & Checkpointing 描述了如何开启和配置Flink的checkpointing机制。

延迟控制

默认情况下,元素不会逐个传输(这将导致不必要的网络流量),而是被缓冲的。缓冲(实际是在机器之间传输)的大小可以在Flink配置文件中设置。虽然这种方法对于优化吞吐量有好处,但是当输入流不够快时,它可能会导致延迟问题。要控制吞吐量和延迟,你可以在execution environment(或单个operator)上使用env.setBufferTimeout(timeoutMillis)来设置缓冲区填满的最大等待时间。如果超过该最大等待时间,即使缓冲区未满,也会被自动发送出去。该最大等待时间默认值为100 ms。

Usage:

  1. LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  2. env.setBufferTimeout(timeoutMillis);
  3. env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
  1. LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment
  2. env.setBufferTimeout(timeoutMillis)
  3. env.genereateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)

为了最大化吞吐量,可以设置setBufferTimeout(-1),这样就没有了超时机制,缓冲区只有在满时才会发送出去。为了最小化延迟,可以把超时设置为接近0的值(例如5或10 ms)。应避免将该超时设置为0,因为这样可能导致性能严重下降。

调试

在分布式集群中运行Streaming程序之前,最好确保实现的算法可以正常工作。因此,实施数据分析程序通常是一个渐进的过程:检查结果,调试和改进。

Flink提供了诸多特性来大幅简化数据分析程序的开发:你可以在IDE中进行本地调试,注入测试数据,收集结果数据。本节给出一些如何简化Flink程序开发的指导。

本地执行环境

LocalStreamEnvironment会在其所在的进程中启动一个Flink引擎. 如果你在IDE中启动LocalEnvironment,你可以在你的代码中设置断点,轻松调试你的程序。

一个LocalEnvironment的创建和使用示例如下:

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  2. DataStream<String> lines = env.addSource(/* some source */);
  3. // build your program
  4. env.execute();
  1. val env = StreamExecutionEnvironment.createLocalEnvironment()
  2. val lines = env.addSource(/* some source */)
  3. // build your program
  4. env.execute()

基于集合的数据Sources

Flink提供了基于Java集合实现的特殊数据sources用于测试。一旦程序通过测试,它的sources和sinks可以方便的替换为从外部系统读写的sources和sinks。

基于集合的数据Sources可以像这样使用:

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  2. // Create a DataStream from a list of elements
  3. DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
  4. // Create a DataStream from any Java collection
  5. List<Tuple2<String, Integer>> data = ...
  6. DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
  7. // Create a DataStream from an Iterator
  8. Iterator<Long> longIt = ...
  9. DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
  1. val env = StreamExecutionEnvironment.createLocalEnvironment()
  2. // Create a DataStream from a list of elements
  3. val myInts = env.fromElements(1, 2, 3, 4, 5)
  4. // Create a DataStream from any Collection
  5. val data: Seq[(String, Int)] = ...
  6. val myTuples = env.fromCollection(data)
  7. // Create a DataStream from an Iterator
  8. val longIt: Iterator[Long] = ...
  9. val myLongs = env.fromCollection(longIt)

注意:目前,集合数据source要求数据类型和迭代器实现Serializable。并行度 = 1)。

迭代的数据Sink

Flink还提供了一个sink来收集DataStream的测试和调试结果。它可以这样使用:

  1. import org.apache.flink.contrib.streaming.DataStreamUtils
  2. DataStream<Tuple2<String, Integer>> myResult = ...
  3. Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
  1. import org.apache.flink.contrib.streaming.DataStreamUtils
  2. import scala.collection.JavaConverters.asScalaIteratorConverter
  3. val myResult: DataStream[(String, Int)] = ...
  4. val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.getJavaStream).asScala