1. Source

Source 是你Flink程序的输入数据源。你可以用 env.addSource(sourceFunction) 将一个 source 关联到你的程序。Flink 自带了许多预先实现的 source functions。
还可以通过实现 SourceFunction 接口编写自定义的非并行 source,也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。

2. 内置Source

2.1 文本文件

  • readTextFile(path) - 读取文本文件。
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 基于给定的 fileInputFormat 读取路径 path 上的文件。根据提供的 watchType 的不同,source 可能定期(每 interval 毫秒)监控路径上的新数据(watchType 为 FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次当前路径中的数据然后退出(watchType 为 FileProcessingMode.PROCESS_ONCE)。使用 pathFilter,用户可以进一步排除正在处理的文件。

    实现: 在底层,Flink 将文件读取过程拆分为两个子任务,即 目录监控数据读取。每个子任务都由一个单独的实体实现。监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度和作业的并行度相等。单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于 watchType),找到要处理的文件,将它们划分为 分片,并将这些分片分配给下游 reader。Reader 是将实际获取数据的角色。每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。

注:如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,当一个文件被修改时,它的内容会被完全重新处理。这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。
如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,source 扫描一次路径然后退出,无需等待 reader 读完文件内容。当然,reader 会继续读取数据,直到所有文件内容都读完。关闭 source 会导致在那之后不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。
示例:

  1. package com.nkong.blink.start;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. /**
  5. * @author nkong
  6. * @time 2022/1/26 17:40
  7. */
  8. public class MyFlinkJob {
  9. public static void main(String[] args) throws Exception {
  10. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  11. DataStream<String> dataStream = env.readTextFile("/input/source.txt");
  12. dataStream.print();
  13. env.execute();
  14. }
  15. }

2.2 Socket

  • socketTextStream - 从套接字读取。元素可以由分隔符分隔。

示例:

  1. package com.nkong.blink.start;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. /**
  5. * @author nkong
  6. * @time 2022/1/26 17:40
  7. */
  8. public class MyFlinkJob {
  9. public static void main(String[] args) throws Exception {
  10. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  11. DataStream<String> dataStream = env.socketTextStream("localhost", 9999);
  12. dataStream.print();
  13. env.execute();
  14. }
  15. }

2.3 集合

  • fromCollection(Collection) - 从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。
  • fromCollection(Iterator, Class) - 从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。
  • fromElements(T …) - 从给定的对象序列中创建数据流。所有的对象必须属于同一类型。
  • fromParallelCollection(SplittableIterator, Class) - 从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。
  • generateSequence(from, to) - 基于给定间隔内的数字序列并行生成数据流。

示例:

  1. package com.nkong.blink.start;
  2. import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. /**
  6. * @author nkong
  7. * @time 2022/1/26 17:40
  8. */
  9. public class MyFlinkJob {
  10. public static void main(String[] args) throws Exception {
  11. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12. DataStream<String> dataStream = env.fromCollection(Lists.newArrayList("a", "b"));
  13. dataStream.print();
  14. env.execute();
  15. }
  16. }

3. 自定义Source

当Flink内置Source,以及第三方扩展Source组件无法满足需求时,可以通过实现SourceFunction接口的形式开发自定义Source。
示例:

  1. package com.nkong.blink.start;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  5. import java.util.UUID;
  6. /**
  7. * @author nkong
  8. * @time 2022/1/26 17:40
  9. */
  10. public class MyFlinkJob {
  11. public static void main(String[] args) throws Exception {
  12. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  13. DataStream<String> dataStream = env.addSource(new MySourceImpl());
  14. dataStream.print();
  15. env.execute();
  16. }
  17. public static class MySourceImpl implements SourceFunction<String> {
  18. @Override
  19. public void run(SourceContext<String> ctx) throws Exception {
  20. for (int i = 0; i < 10; i++) {
  21. String outStr = UUID.randomUUID().toString();
  22. ctx.collect(outStr);
  23. }
  24. }
  25. @Override
  26. public void cancel() {
  27. }
  28. }
  29. }

4. 附带连接器

连接器可以和多种多样的第三方系统进行交互。目前支持以下系统:

Flink 还有些一些额外的连接器通过 Apache Bahir 发布, 包括: