1 DataStream编程

1.1 输入流

源是程序从中读取输入的位置,可以使用以下方法将源附加到您的程序:
StreamExecutionEnvironment.addSource(sourceFunction) 。
Flink附带了许多预先实现的源函数,但您可以通过实现 SourceFunction 非并行源,或通过实现
ParallelSourceFunction 接口或扩展 RichParallelSourceFunction for parallel源来编写自己的自
定义源。
有几个预定义的流源可从以下位置访问 StreamExecutionEnvironment :
基于文件:
readTextFile(path) - TextInputFormat 逐行读取文本文件,即符合规范的文件,并将它们作
为字符串返回。
readFile(fileInputFormat, path) - 按指定的文件输入格式指定读取(一次)文件。
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这
是前两个内部调用的方法。它 path 根据给定的内容读取文件 fileInputFormat 。根据提供的内
容 watchType ,此源可以定期监视(每 interval ms)新数据
( FileProcessingMode.PROCESS_CONTINUOUSLY )的路径,或者处理当前在路径中的数据并退
出( FileProcessingMode.PROCESS_ONCE )。使用 pathFilter ,用户可以进一步排除正在处
理的文件。
实现
Flink将文件读取过程分为两个子任务,即目录监控和数据读取。这些子任务中的每一个都由单独
的实体实现。监视由单个非并行(并行性= 1)任务实现,而读取由并行运行的多个任务执行。后
者的并行性等于工作并行性。单个监视任务的作用是扫描目录(定期或仅一次,具体取决于
watchType ),找到要处理的文件,将它们分成分割,并将这些拆分分配给下游读者。读者是那
些将阅读实际数据的人。每个分割仅由一个读取器读取,而读取器可以逐个读取多个分割。
重要笔记:
1. 如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY ,则在修改文件
时,将完全重新处理其内容。这可以打破“完全一次”的语义,因为在文件末尾附加数据将导
致其所有内容被重新处理。
2. 如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE ,则源扫描路径一次并退出,
而不等待读者完成读取文件内容。当然读者将继续阅读,直到读取所有文件内容。在该点之
后关闭源将导致不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从上一
个检查点恢复读取。
基于Socket:

  • socketTextStream - 从Socket中读取,元素可以用分隔符分隔。

基于集合:

  • fromCollection(Collection) - 从Java Java.util.Collection创建数据流。集合中的所有元素必须

    1. 属于同一类型。
  • fromCollection(Iterator, Class) - 从迭代器创建数据流。该类指定迭代器返回的元素的数

    1. 据类型。<br />fromElements(T ...) - 从给定的对象序列创建数据流。所有对象必须属于同一类型。
  • fromParallelCollection(SplittableIterator, Class) - 并行地从迭代器创建数据流。该类

    1. 指定迭代器返回的元素的数据类型。
  • generateSequence(from, to) - 并行生成给定间隔中的数字序列。

自定义:

  • addSource - 附加新的源功能。例如,要从Apache Kafka读取,可以使用 addSource(new

    1. FlinkKafkaConsumer08<>(...))

    1.2 数据流转换

    此时再将中间的转换算子Transformation,即通过从一个或多个 DataStream 生成新的 DataStream 的过程被称为 Transformation 操作。在转换过程中,每种操作类型被定义为不同的 Operator,Flink 程序能够将多个 Transformation 组成一个 DataFlow 的拓扑。

    1. Map

    DataStream → DataStream
    调用用户定义的MapFunction对DataStream[T]数据进行处理,形成新的DataStream[T],其中
    据格式可能会发生变化,常用作对数据集内数据的清洗和转换。
    如以下示例:它将输入流的元素数值增加一倍:

    1. DataStream<Integer> dataStream = //...
    2. dataStream.map(new MapFunction<Integer, Integer>() {
    3. @Override
    4. public Integer map(Integer value) throws Exception {
    5. return 2 * value;
    6. }
    7. });

    2. FlatMap

    DataStream → DataStream
    主要对输入的元素处理之后生成一个或者多个元素,如下示例:将句子拆分成单词:

    1. dataStream.flatMap(new FlatMapFunction<String, String>() {
    2. @Override
    3. public void flatMap(String value, Collector<String> out) throws Exception {
    4. for(String word: value.split(" ")){
    5. out.collect(word);
    6. }
    7. }
    8. });

    3. Filter

    DataStream → DataStream
    该算子将按照条件对输入数据集进行筛选操作,将符合条件的数据集输出,将不符合条件的数据过
    滤掉。
    如下所示:返回不为0的数据

    1. dataStream.filter(new FilterFunction<Integer>() {
    2. @Override
    3. public boolean filter(Integer value) throws Exception {
    4. return value != 0;
    5. }
    6. });

    4. KeyBy

    DataStream → KeyedStream
    该算子根据指定的key将输入的DataStream[T]数据格式转换为KeyedStream[T],也就是在数据集
    中执行Partition操作,将相同的key值的数据放置在相同的分区中。简单来说,就是sql里面的
    group by

    1. dataStream.keyBy("someKey") // Key by field "someKey"
    2. dataStream.keyBy(0) // Key by the first element of a Tuple

    注意 如果出现以下情况,则类型不能成为Key
    1. 它是POJO类型,但不覆盖hashCode()方法并依赖于Object.hashCode()实现。
    2. 它是任何类型的数组。

    5. Reduce

    KeyedStream → DataStream
    该算子和MapReduce的Reduce原理基本一致,主要目的是将输入的KeyedStream通过传入的用
    户自定义的ReduceFunction滚动的进行数据聚合处理,其中定义的ReduceFunction必须满足运算
    结合律和交换律:

    1. keyedStream.reduce(new ReduceFunction<Integer>() {
    2. @Override public Integer reduce(Integer value1, Integer value2) throws Exception {
    3. return value1 + value2;
    4. }
    5. });

    6. Fold

    KeyedStream → DataStream
    具有初始值的键控数据流上的“滚动”折叠。将当前元素与最后折叠的值组合并发出新值。
    折叠函数,当应用于序列(1,2,3,4,5)时,发出序列“start-1”,“start-1-2”,“start-1-2-3”,. ..

    1. DataStream<String> result =
    2. keyedStream.fold("start", new FoldFunction<Integer, String>() {
    3. @Override
    4. public String fold(String current, Integer value) {
    5. return current + "-" + value;
    6. }
    7. });

    7. Aggregations

    KeyedStream → DataStream
    Aggregations 是 KeyedDataStream 接口提供的聚合算子,根据指定的字段进行聚合操 作,滚动地产生一系列数据聚合结果。其实是将 Reduce 算子中的函数进行了封装,封装的 聚合操作有 sum、min、minBy、max、maxBy等,这样就不需要用户自己定义 Reduce 函数。
    滚动聚合数据流上的聚合。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具
    有最小值的元素(max和maxBy相同)。

    1. keyedStream.sum(0);
    2. keyedStream.sum("key");
    3. keyedStream.min(0);
    4. keyedStream.min("key");
    5. keyedStream.max(0);
    6. keyedStream.max("key");
    7. keyedStream.minBy(0);
    8. keyedStream.minBy("key");
    9. keyedStream.maxBy(0);
    10. keyedStream.maxBy("key");

    8. Window

    KeyedStream → WindowedStream
    可以在已经分区的KeyedStream上定义时间窗口。
    时间窗口根据某些特征(例如,在最后5秒内到达的数据)对每个Key中的数据进行分组。

    1. // 最后5秒的数据
    2. dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));

    9. WindowAll

    DataStream → AllWindowedStream
    Windows可以在常规DataStream上定义。Windows根据某些特征(例如,在最后5秒内到达的数
    据)对所有流事件进行分组。
    警告:在许多情况下,这是非并行转换。所有记录将收集在windowAll运算符的一个任务中。

    1. // 最后5秒的数据
    2. dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));

    10. Window Apply

    WindowedStream → DataStream AllWindowedStream → DataStream
    将一般功能应用于整个窗口。下面是一个手动求和窗口元素的函数。
    注意:如果正在使用windowAll转换,则需要使用AllWindowFunction。

    1. windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer,
    2. Tuple, Window>() {
    3. public void apply (Tuple tuple,
    4. Window window,
    5. Iterable<Tuple2<String, Integer>> values,
    6. Collector<Integer> out) throws Exception {
    7. int sum = 0;
    8. for (value t: values) { sum += t.f1; }out.collect (new Integer(sum)); } });
    9. // applying an AllWindowFunction on non-keyed window stream
    10. allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>,
    11. Integer, Window>() {
    12. public void apply (Window window,
    13. Iterable<Tuple2<String, Integer>> values,
    14. Collector<Integer> out) throws Exception {
    15. int sum = 0;
    16. for (value t: values) {
    17. sum += t.f1;
    18. }
    19. out.collect (new Integer(sum));
    20. }
    21. });

    11. Window Reduce

    WindowedStream → DataStream
    将减少功能应用于窗口并返回减少的值。

    1. windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    2. public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
    3. Tuple2<String, Integer> value2) throws Exception {
    4. return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    5. }
    6. });

    12. Window Fold

    WindowedStream → DataStream
    将折叠功能应用于窗口并返回折叠值。
    示例函数应用于序列(1,2,3,4,5)时,将序列折叠为字符串“start-1-2-3-4-5”:

    1. windowedStream.fold("start", new FoldFunction<Integer, String>() {
    2. public String fold(String current, Integer value) {
    3. return current + "-" + value;
    4. }
    5. });

    13. Windows上的聚合

    WindowedStream → DataStream
    聚合窗口的内容。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值
    的元素(max和maxBy相同)。

    1. windowedStream.sum(0);
    2. windowedStream.sum("key");
    3. windowedStream.min(0);
    4. windowedStream.min("key");
    5. windowedStream.max(0);
    6. windowedStream.max("key");
    7. windowedStream.minBy(0);
    8. windowedStream.minBy("key");
    9. windowedStream.maxBy(0);
    10. windowedStream.maxBy("key");

    14. Union

    DataStream → DataStream
    将两个或者多个输入的数据集合并成一个数据集,需要保证两个数据集的格式一致,输出的数据集
    的格式和输入的数据集格式保持一致
    注意:如果将数据流与其自身联合,则会在结果流中获取两次元素。

    1. dataStream.union(otherStream1, otherStream2, ...);

    15. Window Join

    DataStream,DataStream → DataStream
    根据主键和公共时间窗口,连接数据流

    1. dataStream.join(otherStream)
    2. .where(<keyselector>).equalTo(<key selector>)
    3. .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    4. .apply (new JoinFunction () {...});

    16. Interval Join

    KeyedStream,KeyedStream → DataStream
    在给定的时间间隔内使用公共Key连接两个键控流的两个元素e1和e2,以便e1.timestamp +
    lowerBound <= e2.timestamp <= e1.timestamp + upperBound

    1. // this will join the two streams so that
    2. // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
    3. keyedStream.intervalJoin(otherKeyedStream)
    4. .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    5. .upperBoundExclusive(true) // optional
    6. .lowerBoundExclusive(true) // optional
    7. .process(new IntervalJoinFunction() {...});

    17. Window CoGroup

    DataStream,DataStream → DataStream
    在给定Key和公共时间窗口上对两个数据流进行coGroup操作。

    1. dataStream.coGroup(otherStream)
    2. .where(0).equalTo(1)
    3. .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    4. .apply (new CoGroupFunction () {...});

    18. Connect

    DataStream,DataStream → ConnectedStreams
    Connect算子主要是为了合并两种或者多种不同数据类型的数据集,合并后会保留原来的数据集的
    数据类型。连接操作允许共享状态数据,也就是说在多个数据集之间可以操作和查看对方数据集的
    状态。
    注意:

  • Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap 中再去调 整成为一样的。

  • Connect 只能操作两个流,Union 可以操作多个。 ```java DataStream someStream = //… DataStream otherStream = //…

ConnectedStreams connectedStreams = someStream.connect(otherStream);

  1. <a name="xy5VJ"></a>
  2. ### 19. CoMap,CoFlatMap
  3. ConnectedStreams → DataStream<br />类似于连接数据流上的map和flflatMap。**
  4. ```java
  5. connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
  6. @Override
  7. public Boolean map1(Integer value) {
  8. return true;
  9. }
  10. @Override public Boolean map2(String value) {
  11. return false;
  12. }
  13. });
  14. connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
  15. @Override
  16. public void flatMap1(Integer value, Collector<String> out) {
  17. out.collect(value.toString());
  18. }
  19. @Override
  20. public void flatMap2(String value, Collector<String> out) {
  21. for (String word: value.split(" ")) {
  22. out.collect(word);
  23. }
  24. }
  25. });

20. Split

DataStream → SplitStream
Split 算子是将一个 DataStream 数据集按照条件进行拆分,形成两个数据集的过程, 也是 union 算子的逆向实现。每个接入的数据都会被路由到一个或者多个输出数据集中。

  1. SplitStream<Integer> split = someDataStream.split(new
  2. OutputSelector<Integer>() {
  3. @Override
  4. public Iterable<String> select(Integer value) {
  5. List<String> output = new ArrayList<String>();
  6. if (value % 2 == 0) {
  7. output.add("even");
  8. }
  9. else {
  10. output.add("odd");
  11. }
  12. return output;
  13. }
  14. });

21. Select

SplitStream → DataStream
从拆分流中选择一个或多个流。

  1. SplitStream<Integer> split;
  2. DataStream<Integer> even = split.select("even");
  3. DataStream<Integer> odd = split.select("odd");
  4. DataStream<Integer> all = split.select("even","odd");

22. Iterate

DataStream → IterativeStream → DataStream
通过将一个运算符的输出重定向到某个先前的运算符,在流中创建“反馈”循环。这对于定义不断更
新模型的算法特别有用。以下代码以流开头并连续应用迭代体。大于0的元素将被发送回反馈通
道,其余元素将向下游转发。

  1. IterativeStream<Long> iteration = initialStream.iterate();
  2. DataStream<Long> iterationBody = iteration.map (/*do something*/);
  3. DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
  4. @Override
  5. public boolean filter(Long value) throws Exception {
  6. return value > 0;
  7. }
  8. });
  9. iteration.closeWith(feedback);
  10. DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
  11. @Override public boolean filter(Long value) throws Exception {
  12. return value <= 0;
  13. }
  14. });

23. 提取时间戳

DataStream → DataStream
从记录中提取时间戳,以便使用事件时间语义的窗口。

  1. stream.assignTimestamps (new TimeStampExtractor() {...});

24. 元组数据流转换 Project

DataStream→DataStream
从元组中选择字段的子集

  1. DataStream<Tuple3<Integer, Double, String>> in = // [...]
  2. DataStream<Tuple2<String, Integer>> out = in.project(2,0);

1.3 输出流

数据接收器使用DataStream并将它们转发到文件,套接字,外部系统或打印它们。Flink带有各种内置
输出格式,这些格式封装在DataStreams上的操作后面:

  • writeAsText() / TextOutputFormat - 按字符串顺序写入元素。通过调用每个元素的

toString()方法获得字符串。

  • writeAsCsv(…) / CsvOutputFormat- 将元组写为逗号分隔值文件。行和字段分隔符是可配置

    1. 的。每个字段的值来自对象的_toString_()方法。
  • print() / printToErr() - 在标准输出/标准错误流上打印每个元素的toString()值。可选地,

    1. 可以提供前缀(msg),其前缀为输出。这有助于区分不同的打印调用。如果并行度大于1,则输<br /> 出也将以生成输出的任务的标识符为前缀。
  • writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法和基类。支持自定义

    1. 对象到字节的转换。
  • writeToSocket - 根据a将元素写入套接字 SerializationSchema

  • addSink - 调用自定义接收器功能。Flink捆绑了其他系统(如Apache Kafka)的连接器,这些系

    1. 统实现为接收器功能。<br />请注意, write*()方法 DataStream 主要用于调试目的。他们没有参与Flink的检查点,这意味着这些<br />函数通常具有至少一次的语义。刷新到目标系统的数据取决于OutputFormat的实现。这意味着并非所<br />有发送到OutputFormat的元素都会立即显示在目标系统中。此外,在失败的情况下,这些记录可能会<br />丢失。<br />为了可靠,准确地将流传送到文件系统,请使用 flink-connector-filesystem 。此外,通过<br />该 .addSink(...) 方法的自定义实现可以参与Flink的精确一次语义检查点。