Streaming File Sink

This connector provides a Sink that writes partitioned files to filesystems supported by the Flink FileSystem abstraction.

Important Note: For S3, the StreamingFileSink supports only the Hadoop-based FileSystem implementation, not the implementation based on Presto. If your job uses the StreamingFileSink to write to S3 but you want to use the Presto-based implementation for checkpointing, it is advised to explicitly use "s3a://" (for Hadoop) as the scheme for the sink's target path and "s3p://" (for Presto) for checkpointing. Using "s3://" for both the sink and checkpointing may lead to unpredictable behavior, as both implementations "listen" to that scheme.
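For example, a job could be wired up roughly as follows (a minimal sketch in Java; the bucket name my-bucket and the paths are placeholders, and it assumes both the flink-s3-fs-hadoop and flink-s3-fs-presto filesystems are available to the job):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoints go through the Presto-based S3 filesystem ("s3p://").
    env.setStateBackend(new FsStateBackend("s3p://my-bucket/checkpoints"));

    // The sink writes its part files through the Hadoop-based S3 filesystem ("s3a://").
    final StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3a://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
        .build();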

Since in streaming the input is potentially infinite, the streaming file sink writes data into buckets. The bucketing behaviour is configurable, but a useful default is time-based bucketing, where we start writing a new bucket every hour and thus get individual files that each contain a part of the infinite output stream.

Within a bucket, we further split the output into smaller part files based on a rolling policy. This is useful to prevent individual bucket files from getting too big. This is also configurable, but the default policy rolls files based on file size and a timeout, i.e., if no new data was written to a part file for a while.
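As an illustration, a rolling policy could be configured roughly like this (a sketch; the interval and size values are illustrative, not the defaults, and on newer Flink versions the entry point is DefaultRollingPolicy.builder() rather than create()):

    import java.util.concurrent.TimeUnit;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

    // Roll a part file when it is 15 minutes old, when no data has been
    // written to it for 5 minutes, or when it reaches 1 GB.
    DefaultRollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy
        .create()
        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
        .withMaxPartSize(1024 * 1024 * 1024)
        .build();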

The StreamingFileSink supports both row-wise encoding formats and bulk-encoding formats, such as Apache Parquet.

Using Row-encoded Output Formats

The only required configuration options are the base path where we want to output our data and an Encoder that is used for serializing records to the OutputStream of each file.

Basic usage thus looks like this:

Java:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    DataStream<String> input = ...;

    final StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
        .build();

    input.addSink(sink);

Scala:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

    val input: DataStream[String] = ...

    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
      .build()

    input.addSink(sink)

This will create a streaming sink that creates hourly buckets and uses a default rolling policy. The default bucket assigner is DateTimeBucketAssigner and the default rolling policy is DefaultRollingPolicy. You can specify a custom BucketAssigner and RollingPolicy on the sink builder. Please check out the JavaDoc for StreamingFileSink for more configuration options and more documentation about the workings and interactions of bucket assigners and rolling policies.
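As a sketch of what that can look like (the date format string is an illustrative assumption, rollingPolicy refers to the sketch above, and the exact builder generics can differ between Flink versions):

    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

    final StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
        // Bucket records by the hour in which they were processed.
        .withBucketAssigner(new DateTimeBucketAssigner<String>("yyyy-MM-dd--HH"))
        // Use a custom rolling policy, e.g. the DefaultRollingPolicy configured earlier.
        .withRollingPolicy(rollingPolicy)
        .build();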

Using Bulk-encoded Output Formats

In the above example we used an Encoder that can encode or serialize each record individually. The streaming file sink also supports bulk-encoded output formats such as Apache Parquet. To use these, instead of StreamingFileSink.forRowFormat() you would use StreamingFileSink.forBulkFormat() and specify a BulkWriter.Factory.

ParquetAvroWriters has static methods for creating a BulkWriter.Factory for various types.
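For example, writing a stream of POJOs as Parquet via Avro reflection could look roughly like this (a sketch assuming a hypothetical MyRecord class and the flink-parquet dependency on the classpath):

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    DataStream<MyRecord> input = ...;

    final StreamingFileSink<MyRecord> sink = StreamingFileSink
        // ParquetAvroWriters.forReflectRecord derives the Avro schema from the class via reflection.
        .forBulkFormat(new Path(outputPath), ParquetAvroWriters.forReflectRecord(MyRecord.class))
        .build();

    input.addSink(sink);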

IMPORTANT: Bulk-encoding formats can only be combined with the OnCheckpointRollingPolicy, which rolls the in-progress part file on every checkpoint.