FLink数据流API编程指南

Flink中的Datastream程序是实现数据流转换的常规程序(例如过滤、更新状态、定义窗口、聚合)。数据流最初是从各种源(例如消息队列、套接字流、文件)创建的。结果通过接收器返回,例如可以将数据写入文件或标准输出(例如命令行终端)。Flink程序在各种上下文中运行,独立运行,或嵌入到其他程序中。执行可以在本地JVM中进行,也可以在许多计算机集群上执行。

请参阅基本concepts]以了解FLinkAPI的基本概念。

为了创建自己的FLink数据流程序,我们鼓励您从Flinks程序的解剖结构开始,并逐渐添加您自己的流转换。其余章节用作附加操作和高级功能的参考。

示例程序

下面的程序是一个完整的,工作的例子,流窗口字计数应用程序,计算来自一个网络套接字在5秒窗口中的单词。您可以复制和粘贴代码以在本地运行。

  1. import org.apache.flink.api.common.functions.FlatMapFunction;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.api.windowing.time.Time;
  6. import org.apache.flink.util.Collector;
  7. public class WindowWordCount {
  8. public static void main(String[] args) throws Exception {
  9. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10. DataStream<Tuple2<String, Integer>> dataStream = env
  11. .socketTextStream("localhost", 9999)
  12. .flatMap(new Splitter())
  13. .keyBy(0)
  14. .timeWindow(Time.seconds(5))
  15. .sum(1);
  16. dataStream.print();
  17. env.execute("Window WordCount");
  18. }
  19. public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
  20. @Override
  21. public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
  22. for (String word: sentence.split(" ")) {
  23. out.collect(new Tuple2<String, Integer>(word, 1));
  24. }
  25. }
  26. }
  27. }
  1. import org.apache.flink.streaming.api.scala._
  2. import org.apache.flink.streaming.api.windowing.time.Time
  3. object WindowWordCount {
  4. def main(args: Array[String]) {
  5. val env = StreamExecutionEnvironment.getExecutionEnvironment
  6. val text = env.socketTextStream("localhost", 9999)
  7. val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  8. .map { (_, 1) }
  9. .keyBy(0)
  10. .timeWindow(Time.seconds(5))
  11. .sum(1)
  12. counts.print()
  13. env.execute("Window Stream WordCount")
  14. }
  15. }

要运行示例程序,首先从终端启动带有netcat的输入流:

  1. nc -lk 9999

只需键入一些单词,点击返回一个新的单词。这些将是单词计数程序的输入。如果您想查看大于1的计数,请在5秒内一次又一次地键入相同的单词(如果无法键入该快速窗口,则从5秒增加窗口大小)。

数据来源

来源是您的程序从其中读取其输入的地方。您可以使用StreamExecutionEnvironment.addSource(sourceFunction)将源附加到程序中。flink附带许多预实现的源函数,但您可以通过实现非平行源的 SourceFunction ,或通过实现ParallelSourceFunction接口或扩展并行源的RichParallelSourceFunction 来编写自己的自定义源。

可以从StreamExecutionEnvironment访问多个预定义的流来源:

基于文件:

  • readTextFile(path)—读取文本文件,即尊重TextInputFormat 规范的文件,逐行读取,并将它们作为String返回。

  • readFile(fileInputFormat, path)-按指定的文件输入格式指定文件。

  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)-这是两个以前的文件内部调用的方法。它根据给定的“fileInputFormat”读取“路径”中的文件。根据所提供的“WatchType”,该来源可以定期监视(每个“间隔”ms)新数据的路径(“FileProcessingMode.Processor_Continuous”),或在路径和exit中的当前数据(“FileProcessingMode.Processing_Once”)。使用“PathFilter”,用户可以进一步排除正在处理的文件。

    成就:

    在发动机罩下,FLink将文件读取过程分成两个子任务,即directory monitoringdata reading。这些子任务中的每一个由单独的实体来实现。监视由单个、非并行(并行度=1)任务实现,而读取是由并行运行的多个任务执行的。后者的并行性等于作业并行度。单个监视任务的作用是扫描目录(根据 watchType定期或只一次),找到要处理的文件,将其划分为splits,并将这些拆分分配给下游读取器。读者是将阅读实际数据的读者。每个分割只由一个读取器读取,而读取器可以逐个读取多个分割。

    重要笔记:

    1. 如果 watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,则当文件被修改时,其内容将被完全重新处理。这可能会打破“精确一次”的语义,因为在文件末尾附加数据将导致所有其内容被重新处理。

    2. 如果将watchType 设置为FileProcessingMode.PROCESS_ONCE,则源扫描路径一次并退出,而无需等待读取器完成读取文件内容。当然,读取器将继续读取,直到读取所有文件内容为止。关闭源导致该点之后不再有更多检查点。这可能导致节点故障后恢复较慢,因为作业将从上一个检查点恢复读取。

基于套接字:

  • socketTextStream-从插槽中读取。元素可以通过分隔符分隔。

以收藏为基础:

  • fromCollection(Collection)—从JavaJava.util.Collection创建数据流。集合中的所有元素必须具有相同的类型。

  • fromCollection(Iterator, Class)—从迭代器创建数据流。类指定迭代器返回的元素的数据类型。

  • fromElements(T ...) —从给定的对象序列创建数据流。所有对象都必须是同一类型的。

  • fromParallelCollection(SplittableIterator, Class)-从迭代器并行创建数据流。类指定迭代器返回的元素的数据类型。

  • generateSequence(from, to) -在给定的间隔中并行地生成数字序列。

习俗:

  • addSource 附加新的源函数。例如,要从ApacheKafka读取,可以使用addSource(newFlinkKafkaConsumer08&lt;&gt;(...))。有关详细信息,请参阅connectors

来源是您的程序从其中读取其输入的地方。您可以使用StreamExecutionEnvironment.addSource(sourceFunction)将源附加到程序中。flink附带许多预实现的源函数,但您可以通过实现非平行源的SourceFunction ,或通过实现ParallelSourceFunction接口或扩展并行源的RichParallelSourceFunction来编写自己的自定义源。

可以从 StreamExecutionEnvironment访问多个预定义的流来源:

基于文件:

  • readTextFile(path) - Reads text files, i.e. files that respect the TextInputFormat specification, line-by-line and returns them as Strings.
  • readTextFile(path) —读取文本文件,即尊重 TextInputFormat 规范的文件,逐行读取,并将它们作为String返回。

  • readFile(fileInputFormat, path) - Reads (once) files as dictated by the specified file input format.

  • readFile(fileInputFormat, path)-按指定的文件输入格式指定文件。

  • readFile(fileInputFormat, path, watchType, interval, pathFilter)—这是前面两个方法内部调用的方法。它根据给定的fileInputFormat读取path 中的文件。根据所提供的watchType,该来源可定期监测(每一个intervalms)新数据的路径(FileProcessingMode.PROCESS_CONTINUOUSLY),或在当前路径和出口(FileProcessingMode.PROCESS_ONCE)中处理一次数据。使用pathFilter,用户可以进一步排除正在处理的文件。

    成就:

    在发动机罩下,FLink将文件读取过程分成两个子任务,即directory monitoringdata reading。这些子任务中的每一个由单独的实体来实现。监视由单个、非并行(并行度=1)任务实现,而读取是由并行运行的多个任务执行的。后者的并行性等于作业并行度。单个监视任务的作用是扫描目录(根据watchType定期或只一次),找到要处理的文件,将其划分为splits,并将这些拆分分配给下游读取器。读者是将阅读实际数据的读者。每个分割只由一个读取器读取,而读取器可以逐个读取多个分割。

    重要笔记:

    1. 如果watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,则当文件被修改时,其内容将被完全重新处理。这可能会打破“精确一次”的语义,因为在文件末尾附加数据将导致所有其内容被重新处理。

    2. 如果将 watchType 设置为FileProcessingMode.PROCESS_ONCE,则源扫描路径一次并退出,而无需等待读取器完成读取文件内容。当然,读取器将继续读取,直到读取所有文件内容为止。关闭源导致该点之后不再有更多检查点。这可能导致节点故障后恢复较慢,因为作业将从上一个检查点恢复读取。

基于套接字:

  • socketTextStream-从插槽中读取。元素可以通过分隔符分隔。

以收藏为基础:

  • fromCollection(Seq) -从JavaJava.util.Collection.创建数据流。集合中的所有元素必须具有相同类型。

  • fromCollection(Iterator) —从迭代器创建数据流。类指定迭代器返回的元素的数据类型。

  • fromElements(elements: _*) -从给定的对象序列创建数据流。所有对象必须具有相同类型。

  • fromParallelCollection(SplittableIterator) —并行地从迭代器创建数据流。类指定迭代器返回的元素的数据类型。

  • generateSequence(from, to)-在给定的间隔中并行地生成数字序列。

风俗:

  • addSource-附加新的源函数。例如,要从ApacheKafka读取,您可以使用addSource(new FlinkKafkaConsumer08&lt;&gt;(...))。有关详细信息,请参见连接器

DataStream Transformations(Datastream变换)

有关可用流转换的概述,请参见operators

Data Sinks (数据沉没)

数据接收器会消耗数据流并将其转发到文件、套接字、外部系统或打印它们。FLink附带各种内置的输出格式,这些格式被封装在数据流上的操作后面:

  • writeAsText() / TextOutputFormat -将元素作为字符串写入。通过调用每个元素的 toString() 方法来获得字符串。

  • writeAsCsv(...) / CsvOutputFormat “行和字段分隔符是可配置的。每个字段的值来自对象的 toString() 方法。

  • print() / printToErr() -打印标准输出/标准错误流上每个元素的 toString() 值。可选的,可以提供一个前缀(MSG),该前缀(MS G)被预先连接到输出。这有助于区分对print的不同调用。如果并行性大于1,则输出也将与产生输出的任务的标识符一起预先结束。

  • writeUsingOutputFormat()‘/FileOutputFormat-Method和基类,用于自定义文件输出。支持自定义对象到字节的转换。

  • writeToSocket - 根据SerializationSchema将元素写入到套接字中

  • addSink -调用自定义接收器函数。Flink与作为接收器函数实现的其他系统(如ApacheKafka)的连接器捆绑在一起。

数据接收器会消耗数据流并将其转发到文件、套接字、外部系统或打印它们。FLink附带各种内置的输出格式,这些格式被封装在数据流上的操作后面:

  • writeAsText() / TextOutputFormat -将元素作为字符串写入。通过调用每个元素的toString()方法来获得字符串。

  • writeAsCsv(...) / CsvOutputFormat - 行和字段分隔符是可配置的。每个字段的值来自对象的 toString() 方法。

  • print() / printToErr() -打印标准输出/标准错误流上每个元素的 toString() 值。可选的,可以提供一个前缀(MSG),该前缀(MS G)被预先连接到输出。这有助于区分对 print 的不同调用。如果并行性大于1,则输出也将与产生输出的任务的标识符一起预先结束。

  • writeUsingOutputFormat() / FileOutputFormat -用于自定义文件输出的方法和基类。支持自定义对象到字节的转换..

  • writeToSocket -根据SerializationSchema将元素写入到套接字中

  • addSink -调用自定义接收器函数。Flink与作为接收器函数实现的其他系统(如ApacheKafka)的连接器捆绑在一起。

请注意,DataStream上的write*()方法主要用于调试目的。它们不参与FLink的检查点操作,这意味着这些函数通常具有至少一次语义。对目标系统的数据刷新取决于输出格式的实现。这意味着并非所有发送到OutputFormat的元素都会立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。

为了可靠,精确地将流传递到文件系统中,请使用flink-connector-filesystem。此外,通过.addSink(...) 方法的自定义实现可以为一次语义参与Flink的检查点。

Iterations (迭代)

迭代流程序实现了STEP函数,并将其嵌入到IterativeStream中。由于Datastream程序可能永远不会完成,所以没有最大的迭代次数。相反,您需要指定流的哪一部分被反馈回迭代,哪些部分使用split转换或filter转发到下游。这里,我们展示了一个使用过滤器的例子。首先,我们定义了一个IterativeStream

  1. IterativeStream<Integer> iteration = input.iterate();

然后,我们使用一系列转换(这里是一个简单的map转换)指定将在循环中执行的逻辑。

  1. DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

若要结束迭代并定义迭代尾,请调用IterativeStreamcloseWith(feedbackStream)方法。给予closeWith函数的Datastream将反馈给迭代头。一种常见的模式是使用过滤器将反馈的流部分和前向传播的流部分分开。例如,这些过滤器可以定义“终止”逻辑,其中允许元素向下游传播,而不是反馈。

  1. iteration.closeWith(iterationBody.filter(/* one part of the stream */));
  2. DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

例如,下面的程序不断地从一系列整数中减去1,直到它们达到零为止:

  1. DataStream<Long> someIntegers = env.generateSequence(0, 1000);
  2. IterativeStream<Long> iteration = someIntegers.iterate();
  3. DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  4. @Override
  5. public Long map(Long value) throws Exception {
  6. return value - 1 ;
  7. }
  8. });
  9. DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  10. @Override
  11. public boolean filter(Long value) throws Exception {
  12. return (value > 0);
  13. }
  14. });
  15. iteration.closeWith(stillGreaterThanZero);
  16. DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  17. @Override
  18. public boolean filter(Long value) throws Exception {
  19. return (value <= 0);
  20. }
  21. });

迭代流程序实现了STEP函数,并将其嵌入到IterativeStream中。由于Datastream程序可能永远不会完成,所以没有最大的迭代次数。相反,您需要指定流的哪一部分被反馈回迭代,哪些部分使用split转换或filter转发到下游。这里,我们展示了一个示例迭代,其中主体(计算中重复的部分)是一个简单的映射转换,反馈的元素通过使用过滤器向下游转发的元素来区分。

  1. val iteratedStream = someDataStream.iterate(
  2. iteration => {
  3. val iterationBody = iteration.map(/* this is executed many times */)
  4. (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
  5. })

例如,下面的程序不断地从一系列整数中减去1,直到它们达到零为止:

  1. val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)
  2. val iteratedStream = someIntegers.iterate(
  3. iteration => {
  4. val minusOne = iteration.map( v => v - 1)
  5. val stillGreaterThanZero = minusOne.filter (_ > 0)
  6. val lessThanZero = minusOne.filter(_ <= 0)
  7. (stillGreaterThanZero, lessThanZero)
  8. }
  9. )

Execution Parameters(执行参数)

StreamExecutionEnvironment包含允许为运行时设置作业特定配置值的 ExecutionConfig

有关大多数参数的解释,请参阅[执行配置](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/execution_configuration.html)。这些参数具体涉及Data StreamAPI:

  • setAutoWatermarkInterval(long milliseconds):设置自动水印发射的间隔。可以使用long getAutoWatermarkInterval()获取当前值

Fault Tolerance(容错(性))

State&Checkpointing描述了如何启用和配置Flink的检查点机制。

Controlling Latency(控制延迟)

默认情况下,元素不是一个接一个地在网络上传输(这会导致不必要的网络通信),而是被缓冲。缓冲区的大小(实际上在机器之间传输)可以在Flink配置文件中设置。虽然这种方法有利于优化吞吐量,但当传入流不够快时,可能会导致延迟问题。要控制吞吐量和延迟,可以在执行环境(或单个操作符)上使用env.setBufferTimeout(timeoutMillis) 来设置缓冲区填充的最大等待时间。之后,即使缓冲区没有满,缓冲区也会自动发送。此超时的默认值为100 ms。

使用:

  1. LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  2. env.setBufferTimeout(timeoutMillis);
  3. env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
  1. val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment
  2. env.setBufferTimeout(timeoutMillis)
  3. env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)

为了最大化吞吐量,设置setBufferTimeout(-1),该设置将删除超时,并且缓冲区将仅在其满时被刷新。要使延迟最小化,将超时设置为接近0的值(例如5或10ms)。应避免0的缓冲区超时,因为它可能会导致严重的性能降级。

Debugging

Debugging(调试)

在分布式集群中运行流程序之前,最好确保实现的算法按需要工作。因此,实现数据分析程序通常是一个检查结果、调试和改进的增量过程。

Flink通过支持IDE内部的本地调试、测试数据的注入和结果数据的收集,大大简化了数据分析程序的开发过程。本节给出了如何简化Flink程序开发的一些提示。

Local Execution Environment(本地执行环境)

LocalStreamEnvironment在创建它的JVM进程中启动Flink系统。如果从IDE启动LocalEnvironment,则可以在代码中设置断点并轻松调试程序。v

创建并使用LocalEnvironment如下:

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  2. DataStream<String> lines = env.addSource(/* some source */);
  3. // build your program
  4. env.execute();
  1. val env = StreamExecutionEnvironment.createLocalEnvironment()
  2. val lines = env.addSource(/* some source */)
  3. // build your program
  4. env.execute()

Collection Data Sources(收集数据源)

Flink提供由Java集合支持的特殊数据源,以方便测试。一旦程序经过测试,源和汇就可以很容易地被从/写入外部系统的源和汇所取代。

采集数据源可按如下方式使用:

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  2. // Create a DataStream from a list of elements
  3. DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
  4. // Create a DataStream from any Java collection
  5. List<Tuple2<String, Integer>> data = ...
  6. DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
  7. // Create a DataStream from an Iterator
  8. Iterator<Long> longIt = ...
  9. DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
  1. val env = StreamExecutionEnvironment.createLocalEnvironment()
  2. // Create a DataStream from a list of elements val myInts = env.fromElements(1, 2, 3, 4, 5)
  3. // Create a DataStream from any Collection val data: Seq[(String, Int)] = ...
  4. val myTuples = env.fromCollection(data)
  5. // Create a DataStream from an Iterator val longIt: Iterator[Long] = ...
  6. val myLongs = env.fromCollection(longIt)

Note: 目前,收集数据源要求数据类型和迭代器实现Serializable。此外,收集数据源不能并行执行(parallelism=1)。

Iterator Data Sink (迭代器数据接收器)

Flink还提供一个接收器来收集Datastream结果,以便进行测试和调试。它可用于以下方面:

  1. import org.apache.flink.streaming.experimental.DataStreamUtils
  2. DataStream<Tuple2<String, Integer>> myResult = ...
  3. Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
  1. import org.apache.flink.streaming.experimental.DataStreamUtils
  2. import scala.collection.JavaConverters.asScalaIteratorConverter
  3. val myResult: DataStream[(String, Int)] = ...
  4. val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala

Note: 从Flink 1.5.0中移除flink-streaming-contrib模块。它的类已被移到 flink-streaming-javaflink-streaming-scala中。

下一步要去哪里?