译者:flink.sojb.cn

Flink程序是实现分布式集合转换的常规程序(例如,Filter,映射,更新状态,Join,分组,定义窗口,聚合)。集合最初是从源创建的(例如,通过读取文件,kafka主题或从本地的内存中集合)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如,命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

根据数据源的类型(即有界或无界源),您可以编写批处理程序或流程序,其中DataSet API用于批处理,DataStream API用于流式处理。本指南将介绍两种API共有的基本概念,但请参阅我们的 流处理指南批处理指南,了解有关使用每个API编写程序的具体信息。

注:当显示的API时,如何使用,我们将用实际的例子 StreamingExecutionEnvironmentDataStreamAPI。DataSetAPI中的概念完全相同,只需替换为ExecutionEnvironmentDataSet

DataSet和DataStream

Flink具有特殊类DataSetDataStream在程序中表示数据。您可以将它们视为可以包含重复项的不可变数据集合。在DataSet数据有限的情况下,对于一个DataStream数据元的数量可以是无界的。

这些集合在某些关键方面与常规Java集合不同。首先,它们是不可变的,这意味着一旦创建它们就无法添加或删除数据元。你也不能简单地检查里面的数据元。

集合最初通过在Flink程序添加源创建和新的集合从这些通过将它们使用API方法如衍生mapfilter等等。

Flink计划的剖析

Flink程序看起来像是转换数据集合的常规程序。每个程序包含相同的基本部分:

  1. 获得一个execution environment
  2. 加载/创建初始数据,
  3. 指定此数据的转换,
  4. 指定放置计算结果的位置,
  5. 触发程序执行

我们现在将概述每个步骤,请参阅相应部分以获取更多详细信息。请注意,Java DataSet API的所有核心类都可以在org.apache.flink.api.java包中找到, 而Java DataStream API的类可以在org.apache.flink.streaming.api中找到 。

StreamExecutionEnvironment是所有Flink计划的基础。您可以使用以下静态方法获取一个StreamExecutionEnvironment

  1. getExecutionEnvironment()
  2. createLocalEnvironment()
  3. createRemoteEnvironment(String host, int port, String... jarFiles)

通常,您只需要使用getExecutionEnvironment(),因为这将根据上下文做正确的事情:如果您在IDE中执行程序或作为常规Java程序,它将创建一个本地环境,将在本地计算机上执行您的程序。如果您从程序中创建了一个JAR文件,并通过命令行调用它 ,则Flink集群管理器将执行您的main方法并getExecutionEnvironment()返回一个运行环境,以便在集群上执行您的程序。

对于指定数据源,运行环境有几种方法可以使用各种方法从文件中读取:您可以逐行读取它们,CSV文件或使用完全自定义数据输入格式。要将文本文件作为一系列行读取,您可以使用:

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. DataStream<String> text = env.readTextFile("file:///path/to/file");

这将为您提供一个DataStream,然后您可以在其上应用转换来创建新的派生DataStream。

您可以通过使用转换函数调用DataStream上的方法来应用转换。例如,Map转换如下所示:

  1. DataStream<String> input = ...;
  2. DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
  3. @Override
  4. public Integer map(String value) {
  5. return Integer.parseInt(value);
  6. }
  7. });

这将通过将原始集合中的每个String转换为Integer来创建新的DataStream。

一旦有了包含最终结果的DataStream,就可以通过创建接收器将其写入外部系统。这些只是创建接收器的一些示例方法:

  1. writeAsText(String path)
  2. print()

我们现在将概述每个步骤,请参阅相应部分以获取更多详细信息。请注意,Scala DataSet API的所有核心类都可以在org.apache.flink.api.scala包中找到, 而Scala DataStream API的类可以在org.apache.flink.streaming.api.scala中找到 。

StreamExecutionEnvironment是所有Flink计划的基础。您可以使用以下静态方法获取一个StreamExecutionEnvironment

  1. getExecutionEnvironment()
  2. createLocalEnvironment()
  3. createRemoteEnvironment(host: String, port: Int, jarFiles: String*)

通常,您只需要使用getExecutionEnvironment(),因为这将根据上下文做正确的事情:如果您在IDE中执行程序或作为常规Java程序,它将创建一个本地环境,将在本地计算机上执行您的程序。如果您从程序中创建了一个JAR文件,并通过命令行调用它 ,则Flink集群管理器将执行您的main方法并getExecutionEnvironment()返回一个运行环境,以便在集群上执行您的程序。

对于指定数据源,运行环境有几种方法可以使用各种方法从文件中读取:您可以逐行读取它们,CSV文件或使用完全自定义数据输入格式。要将文本文件作为一系列行读取,您可以使用:

  1. val env = StreamExecutionEnvironment.getExecutionEnvironment()
  2. val text: DataStream[String] = env.readTextFile("file:///path/to/file")

这将为您提供一个DataStream,然后您可以在其上应用转换来创建新的派生DataStream。

您可以通过使用转换函数调用DataSet上的方法来应用转换。例如,Map转换如下所示:

  1. val input: DataSet[String] = ...
  2. val mapped = input.map { x => x.toInt }

这将通过将原始集合中的每个String转换为Integer来创建新的DataStream。

一旦有了包含最终结果的DataStream,就可以通过创建接收器将其写入外部系统。这些只是创建接收器的一些示例方法:

  1. writeAsText(path: String)
  2. print()

一旦您指定的完整程序,你需要触发执行程序调用 execute()StreamExecutionEnvironment。根据执行的类型,ExecutionEnvironment将在本地计算机上触发执行或提交程序以在群集上执行。

execute()方法返回一个JobExecutionResult,包含执行时间和累加器结果。

有关流数据源和接收器的信息,请参阅流指南,以及有关DataStream上支持的转换的更深入信息。

有关批处理数据源和接收器的信息,请查看批处理指南,以及有关DataSet支持的转换的更深入信息。

懒惰的评价

所有Flink程序都是懒惰地执行:当执行程序的main方法时,数据加载和转换不会直接发生。而是创建每个 算子操作并将其添加到程序的计划中。当execute()运行环境上的调用显式触发执行时,实际执行 算子操作。程序是在本地执行还是在集群上执行取决于运行环境的类型

懒惰的评估使您可以构建Flink作为一个整体计划单元执行的复杂程序。

指定Keys

某些转换(join,coGroup,keyBy,groupBy)要求在数据元集合上定义键。其他转换(Reduce,GroupReduce,Aggregate,Windows)允许数据在应用之前在Keys上分组。

DataSet被分组为

  1. DataSet<...> input = // [...]
  2. DataSet<...> reduced = input
  3. .groupBy(/*define key here*/)
  4. .reduceGroup(/*do something*/);

虽然可以使用DataStream指定Keys

  1. DataStream<...> input = // [...]
  2. DataStream<...> windowed = input
  3. .keyBy(/*define key here*/)
  4. .window(/*window specification*/);

Flink的数据模型不基于键值对。因此,您无需将数据集类型物理打包到键和值中。键是“虚拟的”:它们被定义为实际数据上的函数,以指导分组算子。

注意:在下面的讨论中,我们将使用DataStreamAPI和keyBy。对于DataSet API,您只需要替换为DataSetgroupBy

定义元组的键

最简单的情况是在元组的一个或多个字段上对元组进行分组:

  1. DataStream<Tuple3<Integer,String,Long>> input = // [...]
  2. KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
  1. val input: DataStream[(Int, String, Long)] = // [...] val keyed = input.keyBy(0)

元组在第一个字段(整数类型)上分组。

  1. DataStream<Tuple3<Integer,String,Long>> input = // [...]
  2. KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
  1. val input: DataSet[(Int, String, Long)] = // [...] val grouped = input.groupBy(0,1)

在这里,我们将元组分组在由第一个和第二个字段组成的复合键上。

关于嵌套元组的注释:如果你有一个带有嵌套元组的DataStream,例如:

  1. DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;

指定keyBy(0)将导致系统使用full Tuple2作为键(以Integer和Float为键)。如果要“导航”到嵌套中Tuple2,则必须使用下面解释的字段表达式键。

使用Field Expressions定义键

您可以使用基于字符串的字段表达式来引用嵌套字段,并定义用于分组,排序,连接或coGrouping的键。

字段表达式可以非常轻松地选择(嵌套)复合类型中的字段,例如TuplePOJO类型。

在下面的示例中,我们有一个WCPOJO,其中包含两个字段“word”和“count”。要按字段分组word,我们只需将其名称传递给keyBy()函数即可。

  1. // some ordinary POJO (Plain old Java Object)
  2. public class WC {
  3. public String word;
  4. public int count;
  5. }
  6. DataStream<WC> words = // [...]
  7. DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

字段表达式语法

  • 按字段名称选择POJO字段。例如,"user"指POJO类型的“用户”字段。

  • 按字段名称或0偏移字段索引选择元组字段。例如"f0",分别"5"引用Java Tuple类型的第一个和第六个字段。

  • 您可以在POJO和Tuples中选择嵌套字段。例如,"user.zip"指POJO的“zip”字段,其存储在POJO类型的“user”字段中。支持POJO和元组的任意嵌套和混合,例如"f1.user.zip""user.f3.1.zip"

  • 您可以使用"*"通配符表达式选择完整类型。这也适用于非Tuple或POJO类型的类型。

字段表达示例

  1. public static class WC {
  2. public ComplexNestedClass complex; //nested POJO
  3. private int count;
  4. // getter / setter for private field (count)
  5. public int getCount() {
  6. return count;
  7. }
  8. public void setCount(int c) {
  9. this.count = c;
  10. }
  11. }
  12. public static class ComplexNestedClass {
  13. public Integer someNumber;
  14. public float someFloat;
  15. public Tuple3<Long, Long, String> word;
  16. public IntWritable hadoopCitizen;
  17. }

这些是上面示例代码的有效字段表达式:

  • "count":类中的count字段WC

  • "complex":递归选择POJO类型的字段复合体的所有字段ComplexNestedClass

  • "complex.word.f2":选择嵌套的最后一个字段Tuple3

  • "complex.hadoopCitizen":选择Hadoop IntWritable类型。

在下面的示例中,我们有一个WCPOJO,其中包含两个字段“word”和“count”。要按字段分组word,我们只需将其名称传递给keyBy()函数即可。

  1. // some ordinary POJO (Plain old Java Object)
  2. class WC(var word: String, var count: Int) {
  3. def this() { this("", 0L) }
  4. }
  5. val words: DataStream[WC] = // [...]
  6. val wordCounts = words.keyBy("word").window(/*window specification*/)
  7. // or, as a case class, which is less typing
  8. case class WC(word: String, count: Int)
  9. val words: DataStream[WC] = // [...]
  10. val wordCounts = words.keyBy("word").window(/*window specification*/)

字段表达式语法

  • 按字段名称选择POJO字段。例如,"user"指POJO类型的“用户”字段。

  • 通过1偏移字段名称或0偏移字段索引选择元组字段。例如"_1",分别"5"引用Scala Tuple类型的第一个和第六个字段。

  • 您可以在POJO和Tuples中选择嵌套字段。例如,"user.zip"指POJO的“zip”字段,其存储在POJO类型的“user”字段中。支持POJO和元组的任意嵌套和混合,例如"_2.user.zip""user._4.1.zip"

  • 您可以使用"_"通配符表达式选择完整类型。这也适用于非Tuple或POJO类型的类型。

字段表达示例

  1. class WC(var complex: ComplexNestedClass, var count: Int) {
  2. def this() { this(null, 0) }
  3. }
  4. class ComplexNestedClass(
  5. var someNumber: Int,
  6. someFloat: Float,
  7. word: (Long, Long, String),
  8. hadoopCitizen: IntWritable) {
  9. def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
  10. }

这些是上面示例代码的有效字段表达式:

  • "count":类中的count字段WC

  • "complex":递归选择POJO类型的字段复合体的所有字段ComplexNestedClass

  • "complex.word._3":选择嵌套的最后一个字段Tuple3

  • "complex.hadoopCitizen":选择Hadoop IntWritable类型。

使用键选择器函数定义键

定义键的另一种方法是“键选择器”函数。键选择器函数将单个数据元作为输入并返回数据元的键。Keys可以是任何类型,并且可以从确定性计算中导出。

以下示例显示了一个键选择器函数,它只返回一个对象的字段:

  1. // some ordinary POJO
  2. public class WC {public String word; public int count;}
  3. DataStream<WC> words = // [...]
  4. KeyedStream<WC> keyed = words
  5. .keyBy(new KeySelector<WC, String>() {
  6. public String getKey(WC wc) { return wc.word; }
  7. });
  1. // some ordinary case class case class WC(word: String, count: Int)
  2. val words: DataStream[WC] = // [...] val keyed = words.keyBy( _.word )

指定转换函数

大多数转换都需要用户定义的函数。本节列出了如何指定它们的不同方法

实现接口

最基本的方法是实现一个提供的接口:

  1. class MyMapFunction implements MapFunction<String, Integer> {
  2. public Integer map(String value) { return Integer.parseInt(value); }
  3. };
  4. data.map(new MyMapFunction());

匿名课程

您可以将函数作为匿名类传递:

  1. data.map(new MapFunction<String, Integer> () {
  2. public Integer map(String value) { return Integer.parseInt(value); }
  3. });

Java 8 Lambdas

Flink还支持Java API中的Java 8 Lambdas。

  1. data.filter(s -> s.startsWith("http://"));
  1. data.reduce((i1,i2) -> i1 + i2);

函数丰富

需要用户定义函数的所有转换都可以将函数作为参数。例如,而不是

  1. class MyMapFunction implements MapFunction<String, Integer> {
  2. public Integer map(String value) { return Integer.parseInt(value); }
  3. };

你可以写

  1. class MyMapFunction extends RichMapFunction<String, Integer> {
  2. public Integer map(String value) { return Integer.parseInt(value); }
  3. };

并像往常一样将函数传递给map转换:

  1. data.map(new MyMapFunction());

丰富的函数也可以定义为匿名类:

  1. data.map (new RichMapFunction<String, Integer>() {
  2. public Integer map(String value) { return Integer.parseInt(value); }
  3. });

Lambda函数

正如前面的例子中所见,所有 算子操作都接受lambda函数来描述 算子操作:

  1. val data: DataSet[String] = // [...] data.filter { _.startsWith("http://") }
  1. val data: DataSet[Int] = // [...] data.reduce { (i1,i2) => i1 + i2 }
  2. // or data.reduce { _ + _ }

函数丰富

将lambda函数作为参数的所有转换都可以将函数作为参数。例如,而不是

  1. data.map { x => x.toInt }

你可以写

  1. class MyMapFunction extends RichMapFunction[String, Int] {
  2. def map(in: String):Int = { in.toInt }
  3. };

并将函数传递给map转换:

  1. data.map(new MyMapFunction())

丰富的函数也可以定义为匿名类:

  1. data.map (new RichMapFunction[String, Int] {
  2. def map(in: String):Int = { in.toInt }
  3. })

丰富的函数提供,除了用户定义的函数(Map,Reduce等),四种方法:openclosegetRuntimeContext,和 setRuntimeContext。这些用于参数化函数(请参阅将参数传递给函数),创建和完成本地状态,访问广播变量(请参阅 广播变量)以及访问运行时信息(如累加器和计数器)(请参阅 累加器和计数器)以及有关信息的信息。迭代(参见迭代)。

支持的数据类型

Flink对可以在DataSet或DataStream中的数据元类型进行了一些限制。原因是系统分析类型以确定有效的执行策略。

有六种不同类别的数据类型:

  1. Java元组Scala案例类
  2. Java POJO
  3. 原始类型
  4. 常规课程
  5. Hadoop Writables
  6. 特殊类型

元组和案例类

元组是包含固定数量的具有各种类型的字段的复合类型。Java API提供Tuple1最多的类Tuple25。元组的每个字段都可以是包含更多元组的任意Flink类型,从而产生嵌套元组。可以使用字段名称直接访问元组的字段tuple.f4,或使用通用getter方法 tuple.getField(int position)。字段索引从0开始。请注意,这与Scala元组形成鲜明对比,但它与Java常规索引更为一致。

  1. DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
  2. new Tuple2<String, Integer>("hello", 1),
  3. new Tuple2<String, Integer>("world", 2));
  4. wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
  5. @Override
  6. public Integer map(Tuple2<String, Integer> value) throws Exception {
  7. return value.f1;
  8. }
  9. });
  10. wordCounts.keyBy(0); // also valid .keyBy("f0")

Scala案例类(和Scala元组是案例类的特例)是包含固定数量的具有各种类型的字段的复合类型。元组字段通过其1偏移名称来寻址,例如_1第一个字段。案例类字段按名称访问。

  1. case class WordCount(word: String, count: Int)
  2. val input = env.fromElements(
  3. WordCount("hello", 1),
  4. WordCount("world", 2)) // Case Class Data Set
  5. input.keyBy("word")// key by field expression "word"
  6. val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
  7. input2.keyBy(0, 1) // key by field positions 0 and 1

POJOs

如果满足以下要求,则Flink将Java和Scala类视为特殊的POJO数据类型:

  • 这堂课必须公开。

  • 它必须有一个没有参数的公共构造函数(默认构造函数)。

  • 所有字段都是公共的,或者必须通过getter和setter函数访问。对于一个名为foogetter和setter方法的字段必须命名getFoo()setFoo()

  • Flink必须支持字段的类型。目前,Flink使用Avro序列化任意对象(例如Date)。

Flink分析了POJO类型的结构,即它了解了POJO的字段。因此,POJO类型比一般类型更容易使用。此外,Flink可以比一般类型更有效地处理POJO。

以下示例显示了一个包含两个公共字段的简单POJO。

  1. public class WordWithCount {
  2. public String word;
  3. public int count;
  4. public WordWithCount() {}
  5. public WordWithCount(String word, int count) {
  6. this.word = word;
  7. this.count = count;
  8. }
  9. }
  10. DataStream<WordWithCount> wordCounts = env.fromElements(
  11. new WordWithCount("hello", 1),
  12. new WordWithCount("world", 2));
  13. wordCounts.keyBy("word"); // key by field expression "word"
  1. class WordWithCount(var word: String, var count: Int) {
  2. def this() {
  3. this(null, -1)
  4. }
  5. }
  6. val input = env.fromElements(
  7. new WordWithCount("hello", 1),
  8. new WordWithCount("world", 2)) // Case Class Data Set
  9. input.keyBy("word")// key by field expression "word"

原始类型

Flink支持所有Java和Scala的原始类型,如IntegerStringDouble

一般类别

Flink支持大多数Java和Scala类(API和自定义)。限制适用于包含无法序列化的字段的类,如文件指针,I / O流或其他本机资源。遵循Java Beans约定的类通常可以很好地工作。

所有未标识为POJO类型的类(请参阅上面的POJO要求)都由Flink作为常规类类型处理。Flink将这些数据类型视为黑盒子,并且无法访问其内容(即,用于有效排序)。使用序列化框架Kryo对常规类型进行反序列化。

类型手动描述它们的序列化和反序列化。它们不是通过通用序列化框架,而是通过org.apache.flinktypes.Value使用方法read和实现接口为这些 算子操作提供自定义代码write。当通用序列化效率非常低时,使用值类型是合理的。一个示例是将数据元的稀疏向量实现为数组的数据类型。知道数组大部分为零,可以对非零数据元使用特殊编码,而通用序列化只需编写所有数组数据元。

org.apache.flinktypes.CopyableValue接口以类似的方式支持手动内部克隆逻辑。

Flink带有与基本数据类型对应的预定义值类型。(ByteValueShortValueIntValueLongValueFloatValueDoubleValueStringValueCharValueBooleanValue)。这些Value类型充当基本数据类型的可变变体:它们的值可以被更改,允许程序员重用对象并从垃圾收集器中消除压力。

Hadoop Writables

您可以使用实现该org.apache.hadoop.Writable接口的类型。write()readFields()方法中定义的序列化逻辑将用于序列化。

特殊类型

您可以使用特殊类型,包括Scala的EitherOptionTry。Java API有自己的自定义实现Either。与Scala类似Either,它代表两种可能类型的值,Either可用于错误处理或需要输出两种不同类型记录的 算子。

类型擦除和类型推断

注意:本节仅适用于Java。

Java编译器在编译后抛弃了大部分泛型类型信息。这在Java中称为类型擦除。这意味着在运行时,对象的实例不再知道其泛型类型。例如,JVM的实例DataStream&lt;String&gt;DataStream&lt;Long&gt;外观相同。

Flink在准备执行程序时(当调用程序的主要方法时)需要类型信息。Flink Java API尝试重建以各种方式丢弃的类型信息,并将其显式存储在数据集和 算子中。您可以通过检索类型DataStream.getType()。该方法返回一个实例TypeInformation,这是Flink表示类型的内部方式。

类型推断有其局限性,在某些情况下需要程序员的“合作”。这方面的示例是从集合创建数据集的方法,例如 ExecutionEnvironment.fromCollection(),您可以传递描述类型的参数的位置。但是通用函数MapFunction&lt;I, O&gt;也可能需要额外的类型信息。

ResultTypeQueryable 接口可以通过输入格式和函数来实现明确地告诉API他们的返回类型。调用函数的输入类型通常可以通过先前 算子操作的结果类型来推断。

累加器和计数器

累加器是具有添加 算子操作最终累积结果的简单构造,可在作业结束后使用。

最直接的累加器是一个计数器:您可以使用该Accumulator.add(V value)方法递增它 。在工作结束时,Flink将汇总(合并)所有部分结果并将结果发送给客户。在调试过程中,或者如果您想快速了解有关数据的更多信息,累加器非常有用。

Flink目前有以下内置累加器。它们中的每一个都实现了 Accumulator 接口。

  • IntCounterLongCounter DoubleCounter:请参阅下面的使用计数器的示例。
  • 直方图:离散数量的区间的直方图实现。在内部,它只是一个从Integer到Integer的映射。您可以使用它来计算值的分布,例如字数统计程序的每行字数的分布。

如何使用累加器:

首先,您必须在要使用它的用户定义转换函数中创建累加器对象(此处为计数器)。

  1. private IntCounter numLines = new IntCounter();

其次,您必须注册累加器对象,通常在函数的open()方法中 。在这里您还可以定义名称。

  1. getRuntimeContext().addAccumulator("num-lines", this.numLines);

您现在可以在 算子函数中的任何位置使用累加器,包括在open()close()方法中。

  1. this.numLines.add(1);

整个结果将存储在JobExecutionResultexecute()运行环境的方法返回的对象中(当前这仅在执行等待作业完成时才有效)。

  1. myJobExecutionResult.getAccumulatorResult("num-lines")

所有累加器每个作业共享一个命名空间。因此,您可以在作业的不同算子函数中使用相同的累加器。Flink将在内部合并所有具有相同名称的累加器。

关于累加器和迭代的注释:目前累加器的结果仅在整个作业结束后才可用。我们还计划在下一次迭代中使前一次迭代的结果可用。您可以使用 聚合器 来计算每次迭代统计信息,并根据此类统计信息确定迭代的终止。

定制累加器:

要实现自己的累加器,只需编写Accumulator接口的实现即可。如果您认为自定义累加器应与Flink一起提供,请随意创建拉取请求。

您可以选择实现 AccumulatorSimpleAccumulator

Accumulator&lt;V,R&gt;最灵活:它定义V要添加的值的类型R,以及最终结果的结果类型。例如,对于直方图,V是数字并且R是直方图。SimpleAccumulator适用于两种类型相同的情况,例如计数器。