1 DataStream编程
1.1 输入流
源是程序从中读取输入的位置,可以使用以下方法将源附加到您的程序:
StreamExecutionEnvironment.addSource(sourceFunction) 。
Flink附带了许多预先实现的源函数,但您可以通过实现 SourceFunction 非并行源,或通过实现
ParallelSourceFunction 接口或扩展 RichParallelSourceFunction for parallel源来编写自己的自
定义源。
有几个预定义的流源可从以下位置访问 StreamExecutionEnvironment :
基于文件:
readTextFile(path) - TextInputFormat 逐行读取文本文件,即符合规范的文件,并将它们作
为字符串返回。
readFile(fileInputFormat, path) - 按指定的文件输入格式指定读取(一次)文件。
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这
是前两个内部调用的方法。它 path 根据给定的内容读取文件 fileInputFormat 。根据提供的内
容 watchType ,此源可以定期监视(每 interval ms)新数据
( FileProcessingMode.PROCESS_CONTINUOUSLY )的路径,或者处理当前在路径中的数据并退
出( FileProcessingMode.PROCESS_ONCE )。使用 pathFilter ,用户可以进一步排除正在处
理的文件。
实现:
Flink将文件读取过程分为两个子任务,即目录监控和数据读取。这些子任务中的每一个都由单独
的实体实现。监视由单个非并行(并行性= 1)任务实现,而读取由并行运行的多个任务执行。后
者的并行性等于工作并行性。单个监视任务的作用是扫描目录(定期或仅一次,具体取决于
watchType ),找到要处理的文件,将它们分成分割,并将这些拆分分配给下游读者。读者是那
些将阅读实际数据的人。每个分割仅由一个读取器读取,而读取器可以逐个读取多个分割。
重要笔记:
1. 如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY ,则在修改文件
时,将完全重新处理其内容。这可以打破“完全一次”的语义,因为在文件末尾附加数据将导
致其所有内容被重新处理。
2. 如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE ,则源扫描路径一次并退出,
而不等待读者完成读取文件内容。当然读者将继续阅读,直到读取所有文件内容。在该点之
后关闭源将导致不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从上一
个检查点恢复读取。
基于Socket:
- socketTextStream - 从Socket中读取,元素可以用分隔符分隔。
基于集合:
fromCollection(Collection) - 从Java Java.util.Collection创建数据流。集合中的所有元素必须
属于同一类型。
fromCollection(Iterator, Class) - 从迭代器创建数据流。该类指定迭代器返回的元素的数
据类型。<br />fromElements(T ...) - 从给定的对象序列创建数据流。所有对象必须属于同一类型。
fromParallelCollection(SplittableIterator, Class) - 并行地从迭代器创建数据流。该类
指定迭代器返回的元素的数据类型。
generateSequence(from, to) - 并行生成给定间隔中的数字序列。
自定义:
addSource - 附加新的源功能。例如,要从Apache Kafka读取,可以使用 addSource(new
FlinkKafkaConsumer08<>(...)) 。
1.2 数据流转换
此时再将中间的转换算子
Transformation
,即通过从一个或多个 DataStream 生成新的 DataStream 的过程被称为 Transformation 操作。在转换过程中,每种操作类型被定义为不同的 Operator,Flink 程序能够将多个 Transformation 组成一个 DataFlow 的拓扑。
1. Map
DataStream → DataStream
调用用户定义的MapFunction对DataStream[T]数据进行处理,形成新的DataStream[T],其中数
据格式可能会发生变化,常用作对数据集内数据的清洗和转换。
如以下示例:它将输入流的元素数值增加一倍:DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
2. FlatMap
DataStream → DataStream
主要对输入的元素处理之后生成一个或者多个元素,如下示例:将句子拆分成单词:dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
3. Filter
DataStream → DataStream
该算子将按照条件对输入数据集进行筛选操作,将符合条件的数据集输出,将不符合条件的数据过
滤掉。
如下所示:返回不为0的数据dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
4. KeyBy
DataStream → KeyedStream
该算子根据指定的key将输入的DataStream[T]数据格式转换为KeyedStream[T],也就是在数据集
中执行Partition操作,将相同的key值的数据放置在相同的分区中。简单来说,就是sql里面的
group bydataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
注意 如果出现以下情况,则类型不能成为Key:
1. 它是POJO类型,但不覆盖hashCode()方法并依赖于Object.hashCode()实现。
2. 它是任何类型的数组。5. Reduce
KeyedStream → DataStream
该算子和MapReduce的Reduce原理基本一致,主要目的是将输入的KeyedStream通过传入的用
户自定义的ReduceFunction滚动的进行数据聚合处理,其中定义的ReduceFunction必须满足运算
结合律和交换律:keyedStream.reduce(new ReduceFunction<Integer>() {
@Override public Integer reduce(Integer value1, Integer value2) throws Exception {
return value1 + value2;
}
});
6. Fold
KeyedStream → DataStream
具有初始值的键控数据流上的“滚动”折叠。将当前元素与最后折叠的值组合并发出新值。
折叠函数,当应用于序列(1,2,3,4,5)时,发出序列“start-1”,“start-1-2”,“start-1-2-3”,. ..DataStream<String> result =
keyedStream.fold("start", new FoldFunction<Integer, String>() {
@Override
public String fold(String current, Integer value) {
return current + "-" + value;
}
});
7. Aggregations
KeyedStream → DataStream
Aggregations 是 KeyedDataStream 接口提供的聚合算子,根据指定的字段进行聚合操 作,滚动地产生一系列数据聚合结果。其实是将 Reduce 算子中的函数进行了封装
,封装的 聚合操作有 sum、min、minBy、max、maxBy等,这样就不需要用户自己定义 Reduce 函数。
滚动聚合数据流上的聚合。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具
有最小值的元素(max和maxBy相同)。keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
8. Window
KeyedStream → WindowedStream
可以在已经分区的KeyedStream上定义时间窗口。
时间窗口根据某些特征(例如,在最后5秒内到达的数据)对每个Key中的数据进行分组。// 最后5秒的数据
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));
9. WindowAll
DataStream → AllWindowedStream
Windows可以在常规DataStream上定义。Windows根据某些特征(例如,在最后5秒内到达的数
据)对所有流事件进行分组。
警告:在许多情况下,这是非并行转换。所有记录将收集在windowAll运算符的一个任务中。// 最后5秒的数据
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
10. Window Apply
WindowedStream → DataStream AllWindowedStream → DataStream
将一般功能应用于整个窗口。下面是一个手动求和窗口元素的函数。
注意:如果正在使用windowAll转换,则需要使用AllWindowFunction。windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer,
Tuple, Window>() {
public void apply (Tuple tuple,
Window window,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (value t: values) { sum += t.f1; }out.collect (new Integer(sum)); } });
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>,
Integer, Window>() {
public void apply (Window window,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (value t: values) {
sum += t.f1;
}
out.collect (new Integer(sum));
}
});
11. Window Reduce
WindowedStream → DataStream
将减少功能应用于窗口并返回减少的值。windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
}
});
12. Window Fold
WindowedStream → DataStream
将折叠功能应用于窗口并返回折叠值。
示例函数应用于序列(1,2,3,4,5)时,将序列折叠为字符串“start-1-2-3-4-5”:windowedStream.fold("start", new FoldFunction<Integer, String>() {
public String fold(String current, Integer value) {
return current + "-" + value;
}
});
13. Windows上的聚合
WindowedStream → DataStream
聚合窗口的内容。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值
的元素(max和maxBy相同)。windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
14. Union
DataStream → DataStream
将两个或者多个输入的数据集合并成一个数据集,需要保证两个数据集的格式一致,输出的数据集
的格式和输入的数据集格式保持一致
注意:如果将数据流与其自身联合,则会在结果流中获取两次元素。dataStream.union(otherStream1, otherStream2, ...);
15. Window Join
DataStream,DataStream → DataStream
根据主键和公共时间窗口,连接数据流dataStream.join(otherStream)
.where(<keyselector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});
16. Interval Join
KeyedStream,KeyedStream → DataStream
在给定的时间间隔内使用公共Key连接两个键控流的两个元素e1和e2,以便e1.timestamp +
lowerBound <= e2.timestamp <= e1.timestamp + upperBound// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
.upperBoundExclusive(true) // optional
.lowerBoundExclusive(true) // optional
.process(new IntervalJoinFunction() {...});
17. Window CoGroup
DataStream,DataStream → DataStream
在给定Key和公共时间窗口上对两个数据流进行coGroup操作。dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...});
18. Connect
DataStream,DataStream → ConnectedStreams
Connect算子主要是为了合并两种或者多种不同数据类型的数据集,合并后会保留原来的数据集的
数据类型。连接操作允许共享状态数据,也就是说在多个数据集之间可以操作和查看对方数据集的
状态。
注意:Union 之前两个流的类型
必须是一样
,Connect可以不一样
,在之后的 coMap 中再去调 整成为一样的。- Connect
只能
操作两个流,Union可以
操作多个。 ```java DataStreamsomeStream = //… DataStream otherStream = //…
ConnectedStreams
<a name="xy5VJ"></a>
### 19. CoMap,CoFlatMap
ConnectedStreams → DataStream<br />类似于连接数据流上的map和flflatMap。**
```java
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
@Override
public Boolean map1(Integer value) {
return true;
}
@Override public Boolean map2(String value) {
return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(String value, Collector<String> out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
});
20. Split
DataStream → SplitStream
Split 算子是将一个 DataStream 数据集按照条件进行拆分
,形成两个数据集的过程, 也是 union 算子的逆向实现。每个接入的数据都会被路由
到一个或者多个输出数据集中。
SplitStream<Integer> split = someDataStream.split(new
OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
21. Select
SplitStream → DataStream
从拆分流中选择一个或多个流。
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
22. Iterate
DataStream → IterativeStream → DataStream
通过将一个运算符的输出重定向到某个先前的运算符,在流中创建“反馈”循环。这对于定义不断更
新模型的算法特别有用。以下代码以流开头并连续应用迭代体。大于0的元素将被发送回反馈通
道,其余元素将向下游转发。
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override public boolean filter(Long value) throws Exception {
return value <= 0;
}
});
23. 提取时间戳
DataStream → DataStream
从记录中提取时间戳,以便使用事件时间语义的窗口。
stream.assignTimestamps (new TimeStampExtractor() {...});
24. 元组数据流转换 Project
DataStream→DataStream
从元组中选择字段的子集
DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);
1.3 输出流
数据接收器使用DataStream并将它们转发到文件,套接字,外部系统或打印它们。Flink带有各种内置
输出格式,这些格式封装在DataStreams上的操作后面:
- writeAsText() / TextOutputFormat - 按字符串顺序写入元素。通过调用每个元素的
toString()方法获得字符串。
writeAsCsv(…) / CsvOutputFormat- 将元组写为逗号分隔值文件。行和字段分隔符是可配置
的。每个字段的值来自对象的_toString_()方法。
print() / printToErr() - 在标准输出/标准错误流上打印每个元素的toString()值。可选地,
可以提供前缀(msg),其前缀为输出。这有助于区分不同的打印调用。如果并行度大于1,则输<br /> 出也将以生成输出的任务的标识符为前缀。
writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法和基类。支持自定义
对象到字节的转换。
writeToSocket - 根据a将元素写入套接字 SerializationSchema
addSink - 调用自定义接收器功能。Flink捆绑了其他系统(如Apache Kafka)的连接器,这些系
统实现为接收器功能。<br />请注意, write*()方法 DataStream 主要用于调试目的。他们没有参与Flink的检查点,这意味着这些<br />函数通常具有至少一次的语义。刷新到目标系统的数据取决于OutputFormat的实现。这意味着并非所<br />有发送到OutputFormat的元素都会立即显示在目标系统中。此外,在失败的情况下,这些记录可能会<br />丢失。<br />为了可靠,准确地将流传送到文件系统,请使用 flink-connector-filesystem 。此外,通过<br />该 .addSink(...) 方法的自定义实现可以参与Flink的精确一次语义检查点。