目录
- 存在哪些连接器,以及在哪里可以找到它们
-
分布式连接器
内置源Built-in sources
- Collections, sockets, files and directories
- 内置sinks
- PrintSink, sockets, files
- 消息队列(sources and sinks)
- Apache Kafka, Amazon Kinesis, RabbitMQ, Apache NiFi, Google Cloud PubSub
- 数据存储 (sinks)
- FileSink, Elasticsearch, Cassandra, JDBC
- 表的sources和sinks
- 可与 DataStream API 互操作
这些连接件是作为Flink项目本身的一部分进行维护的。
在大多数情况下,内置的源和接收器是简单的连接器,更适合测试和原型制作,而不是生产使用。例如,上面在内置接收器下提到的“文件”指的是写入一个文件,而不是FileSink,它写入一系列滚动的文件。此外,内置的套接字和文件接收器不参与检查点,在失败情况下可能会丢失记录,但FileSink使用两阶段提交来提供只有一次的保证。
注意:上面不包括DataSet API使用的连接器。
在其他地方还有许多其他可用的连接器;下一节显示了一些。
在 flink-packages.org ,你会发现更多的连接器:
- Apache Pulsar (source and sink)
- Pravega (source and sink)
- From Apache Bahir
- Netty (source)
- ActiveMq (source and sink)
- Kudu (source and sink)
- Akka (sink)
- Flume (sink)
- Redis (sink)
- InfluxDB (sink)
除了这些连接器之外,您还可以在 github 上找到其他连接器。
如果你正在评估一个连接器,考虑一下:
- 它支持检查点吗?
- 它能提供什么样的处理保证? (至少一次,正好一次)
- 差错处理?
- 性能: 它是不是以某种方式批处理写?
使用Flink的连接器
基本数据源
Collections
DataStream<String> names = env.fromElements("Ann", "Bob");
DataStream<String> names = env.fromCollection(list);
Sockets
DataStream<String> lines = env.socketTextStream("localhost", 9999);
Files
DataStream<String> lines = env.readTextFile("file:///path");
DataStream<String> lines = env.readFile(inputFormat, "file:///path");
FromElement和FromCollection在测试中很有用
套接字对于原型很有用,但不适合生产使用,因为Flink的检查点和恢复机制依赖于能否倒带和重播输入流。
基本数据接收器
- Print to the standard output
- stream.print()
- Write as text file using toString()
- stream.writeAsText(“/path/to/file”)
- Write as CSV file
- stream.writeAsCsv(“/path/to/file”)
- Emit to socket
- stream.writeToSocket(host, port, SerializationSchema)
关于 writeAsText 和 writeAsCsv 有两点需要注意:
- 写入的文件只有在作业结束时才关闭,例如使用了所有输入
- 当使用 parallelism > 1运行时,这些函数创建一个目录,并且每个worker写入该目录中自己的文件
一般情况下,使用 Filesink 而不是这些 writeAsXX 函数。
使用FileSource
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// monitor directory, periodically checking for new files
FileSource.FileSourceBuilder<String> builder = FileSource
.forRecordStreamFormat(new TextLineFormat(), new Path(...))
.monitorContinuously(duration);
env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
...
如果您修改文件(例如,通过附加到该文件),将重新处理其全部内容,从而创建重复项。
这只是一个例子。还有forBulkFileFormat用于读取parquet和orc格式。
批处理是默认值; monitorContinuously 将源设置为流模式。
流模式设计用于摄取滚动日志文件。一旦一个文件完成,原子式地将它移动到一个正在被监视的目录中。
Flink 状态是维护被监视的文件或目录的最后修改时间戳。无论何时发生更改,都会(再次)完整地读取所有新建/修改的文件。
连接器依赖项
- 大多数连接器未与Flink捆绑在一起
- 这有助于避免与代码的依赖项冲突
- 要使用这些模块,您可以
- 将JAR文件复制到每个TaskManager的lib文件夹中,或者
- 将它们与您的代码打包(推荐)
我们之前看到的基本数据源和接收器—例如套接字和文件—是Flink核心的一部分,可以使用它们,而无需向项目添加任何额外的依赖项。但可能想要使用的每个其他连接器,如Kafka或FileSink,都需要向应用程序添加依赖项。这些JAR需要在集群中的每个任务管理器上都可用。
连接器:总结
- Flink 包括几个源连接器和接收器连接器; 大多数需要额外的依赖项
- 在 flink-packages.org、 Apache Bahir 和其他地方有更多的连接器
- 许多连接器目前正在修订,以更好地支持批处理和流媒体
- 您可以在您的 DataStream 应用程序中使用 flink SQL 连接器
- 您可以实现自己的自定义连接器(这里不讨论)