1. Source
Source 是你Flink程序的输入数据源。你可以用 env.addSource(sourceFunction) 将一个 source 关联到你的程序。Flink 自带了许多预先实现的 source functions。
还可以通过实现 SourceFunction 接口编写自定义的非并行 source,也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。
2. 内置Source
2.1 文本文件
- readTextFile(path) - 读取文本文件。
- readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 基于给定的 fileInputFormat 读取路径 path 上的文件。根据提供的 watchType 的不同,source 可能定期(每 interval 毫秒)监控路径上的新数据(watchType 为 FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次当前路径中的数据然后退出(watchType 为 FileProcessingMode.PROCESS_ONCE)。使用 pathFilter,用户可以进一步排除正在处理的文件。
实现: 在底层,Flink 将文件读取过程拆分为两个子任务,即 目录监控 和 数据读取。每个子任务都由一个单独的实体实现。监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度和作业的并行度相等。单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于 watchType),找到要处理的文件,将它们划分为 分片,并将这些分片分配给下游 reader。Reader 是将实际获取数据的角色。每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。
注:如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,当一个文件被修改时,它的内容会被完全重新处理。这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。
如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,source 扫描一次路径然后退出,无需等待 reader 读完文件内容。当然,reader 会继续读取数据,直到所有文件内容都读完。关闭 source 会导致在那之后不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。
示例:
package com.nkong.blink.start;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author nkong
* @time 2022/1/26 17:40
*/
public class MyFlinkJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.readTextFile("/input/source.txt");
dataStream.print();
env.execute();
}
}
2.2 Socket
- socketTextStream - 从套接字读取。元素可以由分隔符分隔。
示例:
package com.nkong.blink.start;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author nkong
* @time 2022/1/26 17:40
*/
public class MyFlinkJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.socketTextStream("localhost", 9999);
dataStream.print();
env.execute();
}
}
2.3 集合
- fromCollection(Collection) - 从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。
- fromCollection(Iterator, Class) - 从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。
- fromElements(T …) - 从给定的对象序列中创建数据流。所有的对象必须属于同一类型。
- fromParallelCollection(SplittableIterator, Class) - 从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。
- generateSequence(from, to) - 基于给定间隔内的数字序列并行生成数据流。
示例:
package com.nkong.blink.start;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author nkong
* @time 2022/1/26 17:40
*/
public class MyFlinkJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromCollection(Lists.newArrayList("a", "b"));
dataStream.print();
env.execute();
}
}
3. 自定义Source
当Flink内置Source,以及第三方扩展Source组件无法满足需求时,可以通过实现SourceFunction接口的形式开发自定义Source。
示例:
package com.nkong.blink.start;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.UUID;
/**
* @author nkong
* @time 2022/1/26 17:40
*/
public class MyFlinkJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.addSource(new MySourceImpl());
dataStream.print();
env.execute();
}
public static class MySourceImpl implements SourceFunction<String> {
@Override
public void run(SourceContext<String> ctx) throws Exception {
for (int i = 0; i < 10; i++) {
String outStr = UUID.randomUUID().toString();
ctx.collect(outStr);
}
}
@Override
public void cancel() {
}
}
}
4. 附带连接器
连接器可以和多种多样的第三方系统进行交互。目前支持以下系统:
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- FileSystem (sink)
- RabbitMQ (source/sink)
- Google PubSub (source/sink)
- Hybrid Source (source)
- Apache NiFi (source/sink)
- Apache Pulsar (source)
- Twitter Streaming API (source)
- JDBC (sink)
Flink 还有些一些额外的连接器通过 Apache Bahir 发布, 包括:
- Apache ActiveMQ (source/sink)
- Apache Flume (sink)
- Redis (sink)
- Akka (sink)
- Netty (source)