译者:flink.sojb.cn

此连接器提供一个Sink,它将分区文件写入Flink FileSystem抽象支持的文件系统。由于在流式传输中输入可能是无限的,因此流式文件接收器将数据写入桶中。存储行为是可配置的,但有用的默认值是基于时间的存储,我们每小时开始编写一个新存储桶,从而获得各自包含无限输出流的一部分的单个文件。

在一个存储桶中,我们进一步根据滚动策略将输出拆分为较小的部分文件。这对于防止单个存储桶文件变得太大很有用。这也是可配置的,但默认策略根据文件大小和超时来滚动文件,即如果没有新数据写入零件文件。

StreamingFileSink同时支持逐行编码格式和本体-编码格式,如Apache的镶木

使用行编码输出格式

唯一需要的配置是我们想要输出数据的基本路径和 用于将记录序列化到每个文件的 编码器OutputStream

因此基本用法如下所示:

  1. import org.apache.flink.api.common.serialization.Encoder;
  2. import org.apache.flink.core.fs.Path;
  3. import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
  4. DataStream<String> input = ...;
  5. final StreamingFileSink<String> sink = StreamingFileSink
  6. .forRowFormat(new Path(outputPath), (Encoder<String>) (element, stream) -> {
  7. PrintStream out = new PrintStream(stream);
  8. out.println(element.f1);
  9. })
  10. .build();
  11. input.addSink(sink);
  1. import org.apache.flink.api.common.serialization.Encoder
  2. import org.apache.flink.core.fs.Path
  3. import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
  4. val input: DataStream[String] = ...
  5. final StreamingFileSink[String] sink = StreamingFileSink
  6. .forRowFormat(new Path(outputPath), (element, stream) => {
  7. val out = new PrintStream(stream)
  8. out.println(element.f1)
  9. })
  10. .build()
  11. input.addSink(sink)

这将创建一个流式接收器,用于创建每小时存储桶并使用默认滚动策略。默认存储分配器是 DateTimeBucketAssigner ,默认滚动策略是 DefaultRollingPolicy。您可以 在接收器构建器上指定自定义 BucketAssignerRollingPolicy。请查看JavaDoc for StreamingFileSink 以获取更多配置选项以及有关存储分配器和滚动策略的工作和交互的更多文档。

使用批量编码的输出格式

在上面的例子中,我们使用了一个Encoder可以单独编码或序列化每个记录的。流式文件接收器还支持批量编码的输出格式,如Apache Parquet。要使用这些,StreamingFileSink.forRowFormat()请使用 StreamingFileSink.forBulkFormat()并指定BulkWriter.Factory

ParquetAvroWriters 具有用于创建BulkWriter.Factory各种类型的静态方法。

重要信息:批量编码格式只能与OnCheckpointRollingPolicy结合使用,后者会在每个检查点上滚动正在进行的部分文件。