一、前言

Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去,这个 Data Sources 就是数据的来源地。

Flink 已经提供了若干实现好了的 source functions,当然你也可以通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source

StreamExecutionEnvironment 中可以使用以下几个已实现的 stream sources,总的来说可以分为以下几个大类:

二、基于集合

  • fromCollection(Collection):从 Java 的 Java.util.Collection 创建数据流。集合中的所有元素类型必须相同。
  • fromCollection(Iterator, Class):从一个迭代器中创建数据流。Class 指定了该迭代器返回元素的类型。
  • fromElements(T …):从给定的对象序列中创建数据流。所有对象类型必须相同。 ```java package com.wells.flink.demo.sources;

import com.wells.flink.demo.entity.Person; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**

  • Description
  • Created by wells on 2020-05-06 20:02:11 */

public class FromElementsSourceTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  1. DataStreamSource<Person> personDataStreamSource = env.fromElements(
  2. new Person("tom", 23, "man"),
  3. new Person("jerry", 2, "woman")
  4. );
  5. personDataStreamSource.print();
  6. env.execute();
  7. }

}

  1. - fromParallelCollection(SplittableIterator, Class):从一个迭代器中创建并行数据流。Class 指定了该迭代器返回元素的类型。
  2. - generateSequence(from, to):创建一个生成指定区间范围内的数字序列的并行数据流。
  3. <a name="4F95C"></a>
  4. # 三、基于文件
  5. - readTextFile(path) - 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回。
  6. - readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)。
  7. - readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是上面两个方法内部调用的方法。它根据给定的 fileInputFormat 和读取路径读取文件。根据提供的 watchType,这个 source 可以定期(每隔 interval 毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你可以通过 pathFilter 进一步排除掉需要处理的文件。
  8. ```java
  9. package com.wells.flink.demo.sources;
  10. import org.apache.flink.api.java.io.TextInputFormat;
  11. import org.apache.flink.core.fs.Path;
  12. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  14. import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
  15. /**
  16. * Description 可以检测文件内容变化和文件新增
  17. * Created by wells on 2020-05-06 20:14:42
  18. */
  19. public class FileSourceTest {
  20. public static void main(String[] args) throws Exception {
  21. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  22. String filePath = "/Users/wells/Temp/data/wordcount";
  23. DataStreamSource<String> stringDataStreamSource = env.readFile(new TextInputFormat(new Path(filePath)), filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000);
  24. stringDataStreamSource.print();
  25. env.execute();
  26. }
  27. }

实现:
在具体实现上,Flink 把文件读取过程分为两个子任务,即目录监控和数据读取。每个子任务都由单独的实体实现。目录监控由单个非并行(并行度为1)的任务执行,而数据读取由并行运行的多个任务执行。后者的并行性等于作业的并行性。单个目录监控任务的作用是扫描目录(根据 watchType 定期扫描或仅扫描一次),查找要处理的文件并把文件分割成切分片(splits),然后将这些切分片分配给下游 reader。reader 负责读取数据。每个分片只能由一个 reader 读取,但一个 reader 可以逐个读取多个切分片。

重要注意:
如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,则当文件被修改时,其内容将被重新处理。这会打破“exactly-once”语义,因为在文件末尾附加数据将导致其所有内容被重新处理。

如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,则 source 仅扫描路径一次然后退出,而不等待 reader 完成文件内容的读取。当然 reader 会继续阅读,直到读取所有的文件内容。关闭 source 后就不会再有检查点。这可能导致节点故障后的恢复速度较慢,因为该作业将从最后一个检查点恢复读取。

四、基于Socket

socketTextStream(String hostname, int port) - 从 socket 读取。元素可以用分隔符切分。

  1. package com.wells.flink.demo.sources;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.util.Collector;
  8. /**
  9. * Description
  10. * Created by wells on 2020-05-06 20:25:10
  11. */
  12. public class SocketSourceTest {
  13. public static void main(String[] args) throws Exception {
  14. String host = "127.0.0.1";
  15. int port = 8090;
  16. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  17. DataStreamSource<String> source = env.socketTextStream(host, port);
  18. SingleOutputStreamOperator<Tuple2<String, Integer>> counts = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
  19. @Override
  20. public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
  21. String[] words = line.split(" ");
  22. for (String word : words) {
  23. collector.collect(new Tuple2<String, Integer>(word, 1));
  24. }
  25. }
  26. }).keyBy(0).sum(1);
  27. counts.print();
  28. env.execute();
  29. }
  30. }

五、基于Kafka

5.1、部署kafka,略
image.png

5.2、flink 引入pom

  1. <properties>
  2. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  3. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  4. <java.version>1.8</java.version>
  5. <flink.version>1.10.0</flink.version>
  6. <scala.version>2.11</scala.version>
  7. </properties>
  8. <!-- flink kafka connector -->
  9. <dependency>
  10. <groupId>org.apache.flink</groupId>
  11. <artifactId>flink-connector-kafka-0.11_${scala.version}</artifactId>
  12. <version>${flink.version}</version>
  13. </dependency>

5.3、flink main

  1. package com.wells.flink.demo.sources;
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
  6. import java.util.Properties;
  7. /**
  8. * Description Kafka API
  9. * Created by wells on 2020-03-25 20:36:53
  10. */
  11. public class KafkaSourceTest {
  12. public static void main(String[] args) throws Exception {
  13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14. Properties properties = new Properties();
  15. properties.setProperty("bootstrap.servers", "localhost:9092");
  16. properties.setProperty("zookeeper.connect", "localhost:2181");
  17. properties.setProperty("group.id", "flink-demo-topic-group");
  18. DataStream<String> stream = env
  19. .addSource(new FlinkKafkaConsumer011<String>("flink-demo-topic", new SimpleStringSchema(), properties));
  20. stream.print();
  21. env.execute();
  22. }
  23. }

5.4、测试结果
kafka producer send message
image.png

flink output
image.png

注意:
其中输出结果前面的 11> 数字代表线程编号,由并行分区决定,可以通过 dstream.print().setParallelism(1) 来避免

六、自定义Source

如果你想自己自定义自己的 Source 呢?
那么你就需要去了解一下 SourceFunction 接口了,它是所有 stream source 的根接口,它继承自一个标记接口(空接口)Function。
SourceFunction 定义了两个接口方法:
image.png

  • run : 启动一个 source,即对接一个外部数据源然后 emit 元素形成 stream(大部分情况下会通过在该方法里运行一个 while 循环的形式来产生 stream)。
  • cancel : 取消一个 source,也即将 run 中的循环 emit 元素的行为终止。

正常情况下,一个 SourceFunction 实现这两个接口方法就可以了。其实这两个接口方法也固定了一种实现模板。Flink源码中的例子如下:
image.png

6.2、基于mysql自定义Source

参考:http://www.54tianzhisheng.cn/2018/10/30/flink-create-source/

七、Source对比

说下上面几种的特点吧:

  • 基于集合:有界数据集,更偏向于本地测试用
  • 基于文件:适合监听文件修改并读取其内容
  • 基于 Socket:监听主机的 host port,从 Socket 中获取数据
  • 基于Kafka:一般与现网对接较多
  • 自定义 addSource:大多数的场景数据都是无界的,会源源不断的过来。比如去消费 Kafka 某个 topic 上的数据,这时候就需要用到这个 addSource,可能因为用的比较多的原因吧,Flink 直接提供了 FlinkKafkaConsumer011 等类可供你直接使用。你可以去看看 FlinkKafkaConsumerBase 这个基础类,它是 Flink Kafka 消费的最根本的类

Flink目前支持的Source如下:
image.png