基本API概念
Flink程序是在分布式集合上实现转换的常规程序(例如,筛选、映射、更新状态、加入、分组、定义窗口、聚合)。集合最初是从源创建的(例如,通过读取文件、Kafka主题或本地内存集合)。结果通过接收器返回,例如接收器可以将数据写入(分布式)文件或标准输出(例如命令行终端)。Flink程序在各种上下文中运行,独立运行,或嵌入到其他程序中。执行可以在本地JVM中进行,也可以在许多计算机集群上执行。
根据数据源的类型(即有界源或无界源),您可以编写批处理程序或流程序,其中DataSet API用于批处理,Datastream API用于流。本指南将介绍两个API所共有的基本概念,但请参阅我们的流式Guide和批处理Guide],以获得有关使用每个API编写程序的具体信息。
NOTE: 在展示如何使用API的实际示例时,我们将使用 StreamingExecutionEnvironment
和 DataStream
API。Dataset
API中的概念完全相同,只需用ExecutionEnvironment
和Dataset
替换即可。
DataSet和Datastream
Flink有特殊的类Dataset
和DataStream
来表示程序中的数据。您可以认为它们是不可变的数据集合,可以包含重复的数据。在Dataset
的情况下,数据是有限的,而对于DataStream
,元素的数量可以是无界的。
这些集合在某些关键方面与常规Java集合不同。首先,它们是不可变的,这意味着一旦创建了它们,就不能添加或删除元素。您也不能简单地检查内部的元素。
集合最初是通过在Flink程序中添加一个源来创建的,而新的集合是通过使用API方法(如map
、filter
等)来派生的。
Flink程序的剖析
Flink程序看起来像转换数据集合的常规程序。每个程序由相同的基本部分组成:
- 获得一个
execution environment
, - 加载/创建初始数据,
- 指定对该数据的转换,
- 指定将计算结果放在何处,
- 触发程序执行
我们现在将概述每一个步骤,请参阅相关章节以获得更多详细信息。注意,JavaDataSet api的所有核心类都可以在包org.apache.flink.api.java中找到,而JavaDatastreamapi的类可以在org.apache.flink.streaming.api.中找到。
StreamExecutionEnvironment
是所有Flink程序的基础。您可以在StreamExecutionEnvironment
上使用这些静态方法获得一个:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
通常,您只需要使用getExecutionEnvironment()
,因为这将根据上下文做正确的事情:如果您在IDE中执行程序,或者作为常规Java程序,它将创建一个本地环境,该环境将在您的本地机器上执行您的程序。如果您从您的程序中创建了一个JAR文件,并通过命令line,)调用它,Flink集群管理器将执行您的主要方法,而getExecutionEnvironment()
将返回一个用于在集群上执行您的程序的执行环境。
为了指定数据源,执行环境有几种方法可以使用各种方法从文件中读取:您可以逐行读取它们,作为CSV文件,或者使用完全自定义的数据输入格式。要将文本文件读入行序列,可以使用:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
这将为您提供一个Datastream,然后可以在其上应用转换来创建新的派生数据流。
通过使用转换函数调用Datastream上的方法来应用转换。例如,映射转换如下所示:
DataStream<String> input = ...;
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return Integer.parseInt(value);
}
});
这将通过将原始集合中的每个字符串转换为Integer来创建一个新的Datastream。
一旦有了包含最终结果的Datastream,就可以通过创建接收器将其写入外部系统。以下只是创建接收器的一些示例方法:
writeAsText(String path)
print()
我们现在将概述每一个步骤,请参阅相关章节以获得更多详细信息。注意,scala数据集api的所有核心类都在包org.apache.flink.api.scala中,而scala datastream api的类可以在org.apache.flink.streaming.api.scala.中找到。
StreamExecutionEnvironment
是所有Flink程序的基础。您可以在“StreamExecutionEnvironment”上使用这些静态方法获得一个:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
通常,您只需要使用getExecutionEnvironment()
,因为这将根据上下文做正确的事情:如果您在IDE中执行程序,或者作为常规Java程序,它将创建一个本地环境,该环境将在您的本地机器上执行您的程序。如果您从您的程序中创建了一个JAR文件,并通过命令line,)调用它,Flink集群管理器将执行您的主要方法,而getExecutionEnvironment()
将返回一个用于在集群上执行您的程序的执行环境。
为了指定数据源,执行环境有几种方法可以使用各种方法从文件中读取:您可以逐行读取它们,作为CSV文件,或者使用完全自定义的数据输入格式。要将文本文件读入行序列,可以使用:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val text: DataStream[String] = env.readTextFile("file:///path/to/file")
这将为您提供一个Datastream,然后可以在其上应用转换来创建新的派生数据流。
您通过在DataSet上调用具有转换函数的方法来应用转换。例如,地图转换看起来是这样的:
val input: DataSet[String] = ...
val mapped = input.map { x => x.toInt }
这将通过将原始集合中的每个字符串转换为Integer来创建一个新的Datastream。
一旦有了包含最终结果的Datastream,就可以通过创建接收器将其写入外部系统。以下只是创建接收器的一些示例方法:
writeAsText(path: String)
print()
一旦指定了完整的程序,就需要触发程序执行,方法是在StreamExecutionEnvironment
上调用Execute()
。根据ExecutionEnvironment
的类型,执行将在本地计算机上触发,或者提交您的程序以在集群上执行。
Execute()
方法返回一个JobExecutionResult
,它包含执行时间和累加器结果。
请参阅关于流数据源和接收器的信息以及有关数据传输支持的转换的更多深度信息,请参见流Guide。
查看批处理Guide获取有关批处理数据源和接收器的信息以及有关数据集上支持的转换的更深入信息。
Lazy Evaluation(惰性计算)
所有的Flink程序都是延迟执行的:当程序的主要方法被执行时,数据加载和转换不会直接发生。相反,每个操作都被创建并添加到程序的计划中。当执行环境上的execute()
调用显式地触发执行时,实际执行操作。程序是在本地执行还是在集群上执行取决于执行环境的类型。
惰性评估允许您构造复杂的程序,Flink作为一个整体计划的单元执行这些程序。
Specifying Keys(指定键)
有些转换(Join、coGroup、keyBy、groupBy)要求在元素集合上定义键。其他转换(减少、组还原、聚合、Windows)允许在应用数据之前将数据分组在键上。
数据集分组为
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
虽然可以使用
DataStream<...> input = // [...]
DataStream<...> windowed = input
.keyBy(/*define key here*/)
.window(/*window specification*/);
弗林克的数据模型不是基于键值对的..因此,您不需要将数据集类型物理地打包成键和值。键是“虚拟的”:它们被定义为对实际数据的函数,以指导分组操作符。
NOTE: 在下面的讨论中,我们将使用DataStream
API和keyBy
。对于DataSet API,您只需将其替换为DataSet
和groupBy
。
定义元组的键
最简单的情况是在Tuple的一个或多个字段上分组元组:
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
val input: DataStream[(Int, String, Long)] = // [...] val keyed = input.keyBy(0)
元组在第一个字段(Integer类型的字段)上分组。
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
val input: DataSet[(Int, String, Long)] = // [...] val grouped = input.groupBy(0,1)
在这里,我们将元组分组在一个由第一个字段和第二个字段组成的复合键上。
关于嵌套元组的注意事项:如果您有一个带有嵌套元组的Datastream,如:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
指定keyBy(0)
将导致系统使用完整的Tuple2
作为键(以整数和浮点数作为键)。如果要“导航”到嵌套的Tuple2
,则必须使用字段表达式键,如下所示。
使用字段表达式定义键
可以使用基于字符串的字段表达式引用嵌套字段,并定义用于分组、排序、连接或coGrouping的键。
字段表达式使在tuple和pojo类型等(嵌套)组合类型中选择字段变得非常容易。
在下面的示例中,我们有一个WC
POJO,它有两个字段“Word”和“Count”。要按字段word
分组,只需将其名称传递给keyBy()
函数。
// some ordinary POJO (Plain old Java Object)
public class WC {
public String word;
public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
字段表达式语法:
按字段名称选择POJO字段。例如,
"user"
是指POJO类型的“用户”。按字段名或0偏移量字段索引选择元组字段.例如,
"f0"
和"5"
分别指Java元组类型的第一个和第六个字段。您可以在POJO和元组中选择嵌套字段。例如,
“user.zip”
指POJO的“zip”字段,该字段存储在POJO类型的“user”字段中。支持任意嵌套和混合POJO和元组,例如"f1.user.zip"
或"user.f3.1.zip"
。您可以使用
“*”
通配符表达式选择完整类型。这也适用于不是元组或POJO类型的类型。
字段表达式示例:
public static class WC {
public ComplexNestedClass complex; //nested POJO
private int count;
// getter / setter for private field (count)
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3<Long, Long, String> word;
public IntWritable hadoopCitizen;
}
这些是上述示例代码的有效字段表达式:
"count"
:WC
类中的计数字段。"complex"
: 递归地选择POJO类型的“ComplexNestedClass”字段复合体的所有字段。"complex.word.f2"
: 选择嵌套的Tuple3
的最后一个字段。"complex.hadoopCitizen"
: 选择HadoopIntWritable
类型。
在下面的示例中,我们有一个WC
POJO,它有两个字段“Word”和“Count”。要按字段word
分组,只需将其名称传递给keyBy()
函数。
// some ordinary POJO (Plain old Java Object)
class WC(var word: String, var count: Int) {
def this() { this("", 0L) }
}
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
字段表达式语法:
按字段名称选择POJO字段。例如,
"user"
是指POJO类型的“用户”。通过其1-offset字段名或0-offset字段索引选择Tuple字段..例如,`````````````分别指Scala Tuple类型的第一个和第六个字段。
可以在POJO和元组中选择嵌套字段。例如,
"user.zip"
是指POJO的“zip”,它存储在POJO类型的“用户”中。POJO和元组的任意嵌套和混合被支持,例如"_2.user.zip"
或"user._4.1.zip"
。可以使用
"_"
通配符表达式选择完整类型。这也适用于不是元组或POJO类型的类型。
字段表达式示例:
class WC(var complex: ComplexNestedClass, var count: Int) {
def this() { this(null, 0) }
}
class ComplexNestedClass(
var someNumber: Int,
someFloat: Float,
word: (Long, Long, String),
hadoopCitizen: IntWritable) {
def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}
这些是上述示例代码的有效字段表达式:
"count"
:WC
类中的计数字段。"complex"
: 递归地选择POJO类型的ComplexNestedClass
字段复合体的所有字段。"complex.word._3"
: 选择嵌套的Tuple3
的最后一个字段。"complex.hadoopCitizen"
: 选择HadoopIntWritable
类型。
使用密钥选择器功能定义密钥
另一种定义键的方法是“键选择器”函数。键选择器函数以单个元素作为输入,并返回元素的键。键可以是任意类型的,并且可以从确定性计算中导出。
以下示例显示了简单返回对象的字段的密钥选择器功能:
// some ordinary POJO
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
.keyBy(new KeySelector<WC, String>() {
public String getKey(WC wc) { return wc.word; }
});
// some ordinary case class case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...] val keyed = words.keyBy( _.word )
指定转换函数
实现接口
最基本的方法是实现提供的接口之一:
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());
匿名类
可以将函数作为匿名类传递:
data.map(new MapFunction<String, Integer> () {
public Integer map(String value) { return Integer.parseInt(value); }
});
Java 8 Lambdas
FLink还支持JavaAPI中的Java 8 Lambdas。
data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
功能丰富
All transformations that require a user-defined function can instead take as argument a rich function. For example, instead of 所有需要用户定义函数的转换都可以将 rich function作为参数。例如,而不是
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
你可以写
class MyMapFunction extends RichMapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
并像往常一样将函数传递给map
转换:
data.map(new MyMapFunction());
还可以将富函数定义为匿名类:
data.map (new RichMapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
Lambda 功能
如前面的示例中已经看到的,所有操作都接受用于描述操作的lambda函数:
val data: DataSet[String] = // [...] data.filter { _.startsWith("http://") }
val data: DataSet[Int] = // [...] data.reduce { (i1,i2) => i1 + i2 }
// or data.reduce { _ + _ }
Rich 功能
All transformations that take as argument a lambda function can instead take as argument a rich function. For example, instead of 所有以lambda函数作为参数的转换都可以作为参数 rich function。例如,而不是
data.map { x => x.toInt }
你可以写
class MyMapFunction extends RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
};
并将函数传递给map
转换:
data.map(new MyMapFunction())
还可以将富函数定义为匿名类:
data.map (new RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
})
丰富的函数除了用户定义的函数(map、down等)外,还提供了四种方法:open
、close
、getRuntimeContext
和setRuntimeContext
。这对于参数化函数很有用(参见将参数传递给Functions),,创建和最后确定本地状态,访问广播变量(请参阅广播Variables),])和访问运行时信息,如累加器和计数器(参见累加器和计数器),以及关于迭代的信息(请参见Iterations).)。
支持的数据类型
Flink对DataSet或Datastream中的元素类型设置了一些限制。其原因是系统分析类型来确定有效的执行策略。
有六种不同的数据类型:
- Java Tuples 和 Scala Case Classes
- Java POJOs
- Primitive Types
- Regular Classes
- Values
- Hadoop Writables
- Special Types
元组和案例类
元组是包含固定数量的具有不同类型的字段的复合类型。JavaAPI提供了从 Tuple1
到Tuple25
的类。元组的每个字段都可以是任意的Flink类型,包括进一步的元组,从而形成嵌套的元组。可以使用字段名称“tuple.f4”或使用泛型getter方法tuple.getField(int位置)
直接访问元组的字段。字段索引从0开始。请注意,这与Scala元组形成了对比,但它更符合Java的一般索引。
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
new Tuple2<String, Integer>("hello", 1),
new Tuple2<String, Integer>("world", 2));
wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
@Override
public Integer map(Tuple2<String, Integer> value) throws Exception {
return value.f1;
}
});
wordCounts.keyBy(0); // also valid .keyBy("f0")
Scala案例类(和Scala元组是CASE类的特例)是包含固定数量的不同类型字段的组合类型。元组字段的地址是它们的1偏移量名称,例如第一个字段的_1
。用名称访问CASE类字段。
case class WordCount(word: String, count: Int)
val input = env.fromElements(
WordCount("hello", 1),
WordCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"
val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
input2.keyBy(0, 1) // key by field positions 0 and 1
POJOs
如果Java和Scala类满足以下要求,则将Java和Scala类作为特殊POJO数据类型进行处理:
这个类必须是公开的。
它必须具有不带参数的公共构造函数(默认构造函数)。
所有字段都是公共的,或者必须通过getter和setter函数访问。对于一个名为
foo
的字段,getter和setter方法必须命名为getFoo()
和setFoo()
。字段的类型必须由flink支持。此时,flink使用[avro](http://avro.apache.org)序列化任意对象(如“日期”)。
Flink分析POJO类型的结构,即了解POJO的字段。因此,POJO类型比一般类型更容易使用。此外,Flink可以比一般类型更有效地处理POJO。
下面的示例显示了一个带有两个公共字段的简单POJO。
public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) {
this.word = word;
this.count = count;
}
}
DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy("word"); // key by field expression "word"
class WordWithCount(var word: String, var count: Int) {
def this() {
this(null, -1)
}
}
val input = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"
Primitive Types 原语类型
Flink支持所有Java和Scala原语类型,如Integer
、String
和Double
。
General Class Types一般类类型
Flink支持大多数Java和Scala类(API和自定义)。限制适用于包含不能序列化的字段的类,如文件指针、I/O流或其他本机资源。遵循JavaBeans约定的类一般运行良好。
未标识为POJO类型(见上文POJO要求)的所有类都由FLink作为一般类类型进行处理。FLink将这些数据类型视为黑色框,并且无法访问其内容(即,用于高效排序)。使用序列化框架Kryo对一般类型进行了反序列化。
Values
Value 类型描述了手动序列化和反序列化。它们不是通过通用的序列化框架,而是通过实施“org.apache.flinktypes.Value”接口和“读取”和“写入”的方法,为这些操作提供自定义代码。当通用串行化是非常低效的时,使用值类型是合理的。示例是实现作为阵列的元素的稀疏矢量的数据类型。知道数组大部分为零,可以使用非零元素的特殊编码,而通用序列化将简单地写入所有数组元素。
org.apache.flinkypes.CopyableValue
接口以类似的方式支持手动内部克隆逻辑。
Flink附带了与基本数据类型相对应的预定义值类型。(ByteValue
, ShortValue
, IntValue
, LongValue
, FloatValue
, DoubleValue
, StringValue
, CharValue
, BooleanValue
)。这些值类型充当基本数据类型的可变变体:它们的值可以更改,允许程序员重用对象并减轻垃圾收集器的压力。
Hadoop Writables
您可以使用实现org.apache.hadoop.Writable
接口的类型。在write()
和readFields()
方法中定义的序列化逻辑将用于序列化。
Special Types
Special Types 特殊类型
可以使用特殊类型,包括ScalaEither
、Option
和Try
。JavaAPI具有其自己的“任一个”的自定义实现。类似于Scala的“任一个”,它代表一个两种可能类型的值,left_or_right。“”可以用于需要输出两种不同类型记录的错误处理或运算符。
Type Erasure & Type Inference (类型擦除与类型推断)
Note: 本节仅适用于 Java.
Java编译器在编译后丢弃了大部分的泛型类型信息。这称为 type erasure Java。它意味着在运行时,对象的实例不知道它的一般类型。例如,DataStream<String>
和DataStream<Long>
的实例与JVM相同。
Flink在准备执行程序时(当程序的主要方法被调用时)需要类型信息。Flink JavaAPI试图重建以各种方式丢弃的类型信息,并显式地将其存储在数据集和操作符中。您可以通过 DataStream.getType()
检索类型。该方法返回一个 TypeInformation
实例,这是Flink表示类型的内部方法。
类型推理有其局限性,在某些情况下需要程序员的“合作”。其中的例子是从集合中创建数据集的方法,例如ExecutionEnvironment.fromCollectionn()
,您可以在其中传递一个描述类型的参数。但是,像MapFunction<I, O>
这样的泛型函数可能需要额外的类型信息。
可以通过输入格式和函数来实现[ResultType Queryable](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java)接口,以明确地告诉API它们的返回类型。调用函数的 input types 通常可以由以前操作的结果类型推断。
Accumulators & Counters (累加器和计数器)
累加器是具有Add操作和最终累积结果的简单构造,在作业结束后可用。
最简单的累加器是计数器:您可以使用“累加器.添加(V值)”方法来增加它。在作业结束时,Flink将总结(合并)所有部分结果并将结果发送给客户端。在调试期间或如果您想快速了解更多关于您的数据,则累加器是有用的。
Flink目前有以下内置累加器。它们每个都实现了Accumulator接口。
Flink目前有以下内置累加器。它们每个都实现了Accumulator接口。 [*Histogram](https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java):--一个离散数量的回收箱的直方图实现。在内部,它只是一张从Integer到Integer的地图。您可以使用它来计算值的分布,例如单词计数程序的每行单词的分布。
如何使用累加器:
首先,您必须在用户定义的转换函数中创建要使用的累加器对象(此处为计数器)。
private IntCounter numLines = new IntCounter();
其次,必须注册累加器对象,通常在 rich 函数的open()
方法中注册。在这里,您还定义了名称。
getRuntimeContext().addAccumulator("num-lines", this.numLines);
现在,您可以在操作符函数中的任何位置使用累加器,包括在open()
和lose()
方法中。
this.numLines.add(1);
总结果将存储在从执行环境的execute()
方法返回的JobExecutionResult
对象中(当前,如果执行等待作业完成,则此操作将仅适用)。
myJobExecutionResult.getAccumulatorResult("num-lines")
所有累加器每个作业共享一个名称空间。因此,您可以在作业的不同运算符函数中使用相同的累加器。Flink将内部合并所有同名的累加器。
关于累加器和迭代的注意事项:目前,累加器的结果只有在整个作业结束后才可用。我们还计划在下一次迭代中提供上一次迭代的结果。您可以使用Aggregators计算每次迭代的统计信息,并根据这些统计数据终止迭代。v
自定义累加器:
要实现自己的累加器,只需编写累加器接口的实现即可。如果您认为您的自定义累加器应该随Flink一起提供,请随意创建一个拉请求。
Accumulator<V,R>
是最灵活的:它为要增加的值定义了类型V
,为最终结果定义了结果类型R
。例如,对于直方图,V
是一个数字,R
是一个直方图。“简单累加器”用于两种类型相同的情况,例如计数器。