此连接器提供一个Sink,可将分区文件写入Hadoop FileSystem支持的任何文件系统 。要使用此连接器,请将以下依赖项添加到项目中:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
请注意,流连接器当前不是二进制分发的一部分。 有关如何使用库将程序打包以执行集群的信息,请参见 此处。
Bucketing File Sink
可以配置分段行为以及写入,但我们稍后会介绍。这是您可以创建一个默认情况下汇总到按时间拆分的滚动文件的存储槽的方法:
DataStream<String> input = ...;
input.addSink(new BucketingSink<String>("/base/path"));
val input: DataStream[String] = ...
input.addSink(new BucketingSink[String](docs_1.7-SNAPSHOT_"_base_path"))
唯一必需的参数是存储桶的基本路径。可以通过指定自定义布料,写入器和批量大小来进一步配置接收器。
默认情况下,当数据元到达时,分段接收器将按当前系统时间拆分,并使用日期时间模式"yyyy-MM-dd--HH"
命名存储区。这种模式传递给 DateTimeFormatter
使用当前系统时间和JVM的默认时区来形成存储桶路径。用户还可以为bucketer指定时区以格式化存储桶路径。每当遇到新日期时,都会创建一个新存储桶。例如,如果您有一个包含分钟作为最精细粒度的模式,您将每分钟获得一个新桶。每个存储桶本身都是一个包含多个部分文件的目录:接收器的每个并行实例将创建自己的部件文件,当部件文件变得太大时,接收器也会在其他文件旁边创建新的部件文件。当存储桶变为非活动状态时,将刷新并关闭打开的部件文件。如果存储桶最近未写入,则视为非活动状态。默认情况下,接收器每分钟检查一次非活动存储桶,并关闭任何超过一分钟未写入的存储桶。setInactiveBucketCheckInterval()
并 setInactiveBucketThreshold()
在一个BucketingSink
。
您也可以通过指定自定义bucketer setBucketer()
上BucketingSink
。如果需要,bucketer可以使用数据元或元组的属性来确定bucket目录。
默认编写器是StringWriter
。这将调用toString()
传入的数据元并将它们写入部分文件,由换行符分隔。在a setWriter()
上指定自定义编写器使用BucketingSink
。如果要编写Hadoop SequenceFiles,可以使用提供的 SequenceFileWriter
,也可以配置为使用压缩。
有两个配置选项指定何时应关闭零件文件并启动新零件文件:
- 通过设置批量大小(默认部件文件大小为384 MB)
- 通过设置批次滚动时间间隔(默认滚动间隔为
Long.MAX_VALUE
)
当满足这两个条件中的任何一个时,将启动新的零件文件。
例:
DataStream<Tuple2<IntWritable,Text>> input = ...;
BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
input.addSink(sink);
val input: DataStream[Tuple2[IntWritable, Text]] = ...
val sink = new BucketingSink[String](docs_1.7-SNAPSHOT_"_base_path")
sink.setBucketer(new DateTimeBucketer[String](68071666baedfabdb78b0eb137cd4c60)))
sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB, sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
input.addSink(sink)
这将创建一个接收器,该接收器将写入遵循此模式的存储桶文件:
/base/path/{date-time}/part-{parallel-task}-{count}
date-time
我们从日期/时间格式获取的字符串在哪里,parallel-task
是并行接收器实例的索引,并且count
是由于批处理大小或批处理翻转间隔而创建的部分文件的运行数。
有关详细信息,请参阅JavaDoc for BucketingSink。