目录

  • 存在哪些连接器,以及在哪里可以找到它们
  • 如何使用这些连接器

    分布式连接器

  • 内置源Built-in sources

    • Collections, sockets, files and directories
  • 内置sinks
    • PrintSink, sockets, files
  • 消息队列(sources and sinks)
    • Apache Kafka, Amazon Kinesis, RabbitMQ, Apache NiFi, Google Cloud PubSub
  • 数据存储 (sinks)
    • FileSink, Elasticsearch, Cassandra, JDBC
  • 表的sources和sinks
    • 可与 DataStream API 互操作

这些连接件是作为Flink项目本身的一部分进行维护的。
在大多数情况下,内置的源和接收器是简单的连接器,更适合测试和原型制作,而不是生产使用。例如,上面在内置接收器下提到的“文件”指的是写入一个文件,而不是FileSink,它写入一系列滚动的文件。此外,内置的套接字和文件接收器不参与检查点,在失败情况下可能会丢失记录,但FileSink使用两阶段提交来提供只有一次的保证。
注意:上面不包括DataSet API使用的连接器。
在其他地方还有许多其他可用的连接器;下一节显示了一些。
flink-packages.org ,你会发现更多的连接器:

  • Apache Pulsar (source and sink)
  • Pravega (source and sink)
  • From Apache Bahir
    • Netty (source)
    • ActiveMq (source and sink)
    • Kudu (source and sink)
    • Akka (sink)
    • Flume (sink)
    • Redis (sink)
    • InfluxDB (sink)

除了这些连接器之外,您还可以在 github 上找到其他连接器。
如果你正在评估一个连接器,考虑一下:

  • 它支持检查点吗?
  • 它能提供什么样的处理保证? (至少一次,正好一次)
  • 差错处理?
  • 性能: 它是不是以某种方式批处理写?

使用Flink的连接器

基本数据源

  • Collections

    1. DataStream<String> names = env.fromElements("Ann", "Bob");
    2. DataStream<String> names = env.fromCollection(list);
  • Sockets

    1. DataStream<String> lines = env.socketTextStream("localhost", 9999);
  • Files

    1. DataStream<String> lines = env.readTextFile("file:///path");
    2. DataStream<String> lines = env.readFile(inputFormat, "file:///path");

    FromElement和FromCollection在测试中很有用
    套接字对于原型很有用,但不适合生产使用,因为Flink的检查点和恢复机制依赖于能否倒带和重播输入流。

基本数据接收器

  • Print to the standard output
    • stream.print()
  • Write as text file using toString()
    • stream.writeAsText(“/path/to/file”)
  • Write as CSV file
    • stream.writeAsCsv(“/path/to/file”)
  • Emit to socket
    • stream.writeToSocket(host, port, SerializationSchema)

关于 writeAsText 和 writeAsCsv 有两点需要注意:

  • 写入的文件只有在作业结束时才关闭,例如使用了所有输入
  • 当使用 parallelism > 1运行时,这些函数创建一个目录,并且每个worker写入该目录中自己的文件

一般情况下,使用 Filesink 而不是这些 writeAsXX 函数。

使用FileSource

  1. StreamExecutionEnvironment env =
  2. StreamExecutionEnvironment.getExecutionEnvironment();
  3. // monitor directory, periodically checking for new files
  4. FileSource.FileSourceBuilder<String> builder = FileSource
  5. .forRecordStreamFormat(new TextLineFormat(), new Path(...))
  6. .monitorContinuously(duration);
  7. env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
  8. ...

如果您修改文件(例如,通过附加到该文件),将重新处理其全部内容,从而创建重复项。

这只是一个例子。还有forBulkFileFormat用于读取parquet和orc格式。
批处理是默认值; monitorContinuously 将源设置为流模式。
流模式设计用于摄取滚动日志文件。一旦一个文件完成,原子式地将它移动到一个正在被监视的目录中。
Flink 状态是维护被监视的文件或目录的最后修改时间戳。无论何时发生更改,都会(再次)完整地读取所有新建/修改的文件。

连接器依赖项

  • 大多数连接器未与Flink捆绑在一起
  • 这有助于避免与代码的依赖项冲突
  • 要使用这些模块,您可以
    • 将JAR文件复制到每个TaskManager的lib文件夹中,或者
    • 将它们与您的代码打包(推荐)

我们之前看到的基本数据源和接收器—例如套接字和文件—是Flink核心的一部分,可以使用它们,而无需向项目添加任何额外的依赖项。但可能想要使用的每个其他连接器,如Kafka或FileSink,都需要向应用程序添加依赖项。这些JAR需要在集群中的每个任务管理器上都可用。

连接器:总结

  • Flink 包括几个源连接器和接收器连接器; 大多数需要额外的依赖项
  • 在 flink-packages.org、 Apache Bahir 和其他地方有更多的连接器
  • 许多连接器目前正在修订,以更好地支持批处理和流媒体
  • 您可以在您的 DataStream 应用程序中使用 flink SQL 连接器
  • 您可以实现自己的自定义连接器(这里不讨论)