Flink与Apache Hadoop MapReduce接口兼容,因此允许重用为Hadoop MapReduce实现的代码。
您可以:
- 在Flink程序中使用Hadoop的
Writable
数据类型。 - 使用任何Hadoop
InputFormat
作为数据源。 - 使用任何Hadoop
OutputFormat
作为DataSink。 - 使用Hadoop
Mapper
作为FlatMapFunction。 - 使用Hadoop
Reducer
作为GroupReduceFunction。
本文档展示了如何在Flink中使用现有的Hadoop MapReduce代码。有关从Hadoop支持的文件系统中读取的信息,请参阅“ 连接到其他系统”指南。
项目配置
Hadoop的输入/输出格式的支持是的一部分flink-java
和 flink-scala
写入Flink作业时总是需要的Maven模块。的代码位于org.apache.flink.api.java.hadoop
和 org.apache.flink.api.scala.hadoop
在附加的子包的 mapred
和mapreduce
API。
flink-hadoop-compatibility
Maven模块中包含对Hadoop Mappers和Reducers的支持。此代码驻留在org.apache.flink.hadoopcompatibility
包中。
pom.xml
如果要重用Mappers和Reducers,请将以下依赖项添加到您的。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
使用Hadoop数据类型
Flink支持开箱即用的所有Hadoop Writable
和WritableComparable
数据类型。如果您只想使用Hadoop数据类型,则不需要包含Hadoop兼容性依赖项。有关详细信息,请参阅 编程指南。
使用Hadoop InputFormats
要使用的Hadoop InputFormats
与Flink格式必须首先使用任一包裹readHadoopFile
或createHadoopInput
在的 HadoopInputs
utilty类。前者用于从FileInputFormat
后者输出的输入格式,而后者必须用于通用输入格式。结果InputFormat
可用于通过使用创建数据源 ExecutionEnvironmen#createInput
。
结果DataSet
包含2元组,其中第一个字段是键,第二个字段是从Hadoop InputFormat检索的值。
以下示例显示了如何使用Hadoop TextInputFormat
。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<LongWritable, Text>> input =
env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
LongWritable.class, Text.class, textPath));
// Do something with the data.
[...]
val env = ExecutionEnvironment.getExecutionEnvironment
val input: DataSet[(LongWritable, Text)] =
env.createInput(HadoopInputs.readHadoopFile(
new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
// Do something with the data. [...]
使用Hadoop OutputFormats
Flink为Hadoop提供了兼容性打包器OutputFormats
。支持实现org.apache.hadoop.mapred.OutputFormat
或扩展的 任何类org.apache.hadoop.mapreduce.OutputFormat
。OutputFormat打包器期望其输入数据是包含2元组键和值的DataSet。这些将由Hadoop OutputFormat处理。
以下示例显示了如何使用Hadoop TextOutputFormat
。
// Obtain the result we want to emit
DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
// create the Flink wrapper.
new HadoopOutputFormat<Text, IntWritable>(
// set the Hadoop OutputFormat and specify the job.
new TextOutputFormat<Text, IntWritable>(), job
);
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));
// Emit data using the Hadoop TextOutputFormat.
hadoopResult.output(hadoopOF);
// Obtain your result to emit. val hadoopResult: DataSet[(Text, IntWritable)] = [...]
val hadoopOF = new HadoopOutputFormat[Text,IntWritable](eeb7d5db329fd595a908815ec29491f2)
hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
hadoopResult.output(hadoopOF)
使用Hadoop Mappers和Reducers
Hadoop Mappers在语义上等同于Flink的FlatMapFunctions,而Hadoop Reducers等同于Flink的GroupReduceFunctions。Flink为Hadoop MapReduce Mapper
和Reducer
接口的实现提供打包器,即,您可以在常规Flink程序中重用您的Hadoop Mappers和Reducers。目前,只org.apache.hadoop.mapred
支持Hadoop的mapred API()的Mapper和Reduce接口。
打包器将DataSet<Tuple2<KEYIN,VALUEIN>>
输入作为输入并生成DataSet<Tuple2<KEYOUT,VALUEOUT>>
输出,其中KEYIN
和KEYOUT
是键,VALUEIN
并且VALUEOUT
是Hadoop函数处理的Hadoop键值对的值。对于Reducers,Flink为GroupReduceFunction提供了一个打包器(HadoopReduceCombineFunction
),没有Combiner(HadoopReduceFunction
)。打包器接受一个可选JobConf
对象来配置Hadoop Mapper或Reducer。
Flink的函数打包器是
org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction
,org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction
,和org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction
。
并可用作常规Flink FlatMapFunctions或GroupReduceFunctions。
以下示例显示了如何使用Hadoop Mapper
和Reducer
函数。
// Obtain data to process somehow.
DataSet<Tuple2<Text, LongWritable>> text = [...]
DataSet<Tuple2<Text, LongWritable>> result = text
// use Hadoop Mapper (Tokenizer) as MapFunction
.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
new Tokenizer()
))
.groupBy(0)
// use Hadoop Reducer (Counter) as Reduce- and CombineFunction
.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
new Counter(), new Counter()
));
请注意: Reducer打包器适用于Flink的groupBy() 算子操作定义的组。它不考虑您可能在中设置的任何自定义分区器,排序或分组比较器JobConf
。
完成Hadoop WordCount示例
以下示例显示了使用Hadoop数据类型,Input-和OutputFormats以及Mapper和Reducer实现的完整WordCount实现。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Set up the Hadoop TextInputFormat.
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopIF =
new HadoopInputFormat<LongWritable, Text>(
new TextInputFormat(), LongWritable.class, Text.class, job
);
TextInputFormat.addInputPath(job, new Path(inputPath));
// Read data using the Hadoop TextInputFormat.
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
DataSet<Tuple2<Text, LongWritable>> result = text
// use Hadoop Mapper (Tokenizer) as MapFunction
.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
new Tokenizer()
))
.groupBy(0)
// use Hadoop Reducer (Counter) as Reduce- and CombineFunction
.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
new Counter(), new Counter()
));
// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
new HadoopOutputFormat<Text, IntWritable>(
new TextOutputFormat<Text, IntWritable>(), job
);
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));
// Emit data using the Hadoop TextOutputFormat.
result.output(hadoopOF);
// Execute Program
env.execute("Hadoop WordCount");