译者:flink.sojb.cn

Flink与Apache Hadoop MapReduce接口兼容,因此允许重用为Hadoop MapReduce实现的代码。

您可以:

本文档展示了如何在Flink中使用现有的Hadoop MapReduce代码。有关从Hadoop支持的文件系统中读取的信息,请参阅“ 连接到其他系统”指南。

项目配置

Hadoop的输入/输出格式的支持是的一部分flink-javaflink-scala写入Flink作业时总是需要的Maven模块。的代码位于org.apache.flink.api.java.hadooporg.apache.flink.api.scala.hadoop在附加的子包的 mapredmapreduceAPI。

flink-hadoop-compatibility Maven模块中包含对Hadoop Mappers和Reducers的支持。此代码驻留在org.apache.flink.hadoopcompatibility 包中。

pom.xml如果要重用Mappers和Reducers,请将以下依赖项添加到您的。

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-hadoop-compatibility_2.11</artifactId>
  4. <version>1.7-SNAPSHOT</version>
  5. </dependency>

使用Hadoop数据类型

Flink支持开箱即用的所有Hadoop WritableWritableComparable数据类型。如果您只想使用Hadoop数据类型,则不需要包含Hadoop兼容性依赖项。有关详细信息,请参阅 编程指南

使用Hadoop InputFormats

要使用的Hadoop InputFormats与Flink格式必须首先使用任一包裹readHadoopFilecreateHadoopInput在的 HadoopInputsutilty类。前者用于从FileInputFormat后者输出的输入格式,而后者必须用于通用输入格式。结果InputFormat可用于通过使用创建数据源 ExecutionEnvironmen#createInput

结果DataSet包含2元组,其中第一个字段是键,第二个字段是从Hadoop InputFormat检索的值。

以下示例显示了如何使用Hadoop TextInputFormat

  1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  2. DataSet<Tuple2<LongWritable, Text>> input =
  3. env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
  4. LongWritable.class, Text.class, textPath));
  5. // Do something with the data.
  6. [...]
  1. val env = ExecutionEnvironment.getExecutionEnvironment
  2. val input: DataSet[(LongWritable, Text)] =
  3. env.createInput(HadoopInputs.readHadoopFile(
  4. new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
  5. // Do something with the data. [...]

使用Hadoop OutputFormats

Flink为Hadoop提供了兼容性打包器OutputFormats。支持实现org.apache.hadoop.mapred.OutputFormat或扩展的 任何类org.apache.hadoop.mapreduce.OutputFormat。OutputFormat打包器期望其输入数据是包含2元组键和值的DataSet。这些将由Hadoop OutputFormat处理。

以下示例显示了如何使用Hadoop TextOutputFormat

  1. // Obtain the result we want to emit
  2. DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
  3. // Set up the Hadoop TextOutputFormat.
  4. HadoopOutputFormat<Text, IntWritable> hadoopOF =
  5. // create the Flink wrapper.
  6. new HadoopOutputFormat<Text, IntWritable>(
  7. // set the Hadoop OutputFormat and specify the job.
  8. new TextOutputFormat<Text, IntWritable>(), job
  9. );
  10. hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
  11. TextOutputFormat.setOutputPath(job, new Path(outputPath));
  12. // Emit data using the Hadoop TextOutputFormat.
  13. hadoopResult.output(hadoopOF);
  1. // Obtain your result to emit. val hadoopResult: DataSet[(Text, IntWritable)] = [...]
  2. val hadoopOF = new HadoopOutputFormat[Text,IntWritable](eeb7d5db329fd595a908815ec29491f2)
  3. hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
  4. FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
  5. hadoopResult.output(hadoopOF)

使用Hadoop Mappers和Reducers

Hadoop Mappers在语义上等同于Flink的FlatMapFunctions,而Hadoop Reducers等同于Flink的GroupReduceFunctions。Flink为Hadoop MapReduce MapperReducer接口的实现提供打包器,即,您可以在常规Flink程序中重用您的Hadoop Mappers和Reducers。目前,只org.apache.hadoop.mapred支持Hadoop的mapred API()的Mapper和Reduce接口。

打包器将DataSet&lt;Tuple2&lt;KEYIN,VALUEIN&gt;&gt;输入作为输入并生成DataSet&lt;Tuple2&lt;KEYOUT,VALUEOUT&gt;&gt;输出,其中KEYINKEYOUT是键,VALUEIN并且VALUEOUT是Hadoop函数处理的Hadoop键值对的值。对于Reducers,Flink为GroupReduceFunction提供了一个打包器(HadoopReduceCombineFunction),没有Combiner(HadoopReduceFunction)。打包器接受一个可选JobConf对象来配置Hadoop Mapper或Reducer。

Flink的函数打包器是

  • org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction,和
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction

并可用作常规Flink FlatMapFunctionsGroupReduceFunctions

以下示例显示了如何使用Hadoop MapperReducer函数。

  1. // Obtain data to process somehow.
  2. DataSet<Tuple2<Text, LongWritable>> text = [...]
  3. DataSet<Tuple2<Text, LongWritable>> result = text
  4. // use Hadoop Mapper (Tokenizer) as MapFunction
  5. .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
  6. new Tokenizer()
  7. ))
  8. .groupBy(0)
  9. // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
  10. .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
  11. new Counter(), new Counter()
  12. ));

请注意: Reducer打包器适用于Flink的groupBy() 算子操作定义的组。它不考虑您可能在中设置的任何自定义分区器,排序或分组比较器JobConf

完成Hadoop WordCount示例

以下示例显示了使用Hadoop数据类型,Input-和OutputFormats以及Mapper和Reducer实现的完整WordCount实现。

  1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  2. // Set up the Hadoop TextInputFormat.
  3. Job job = Job.getInstance();
  4. HadoopInputFormat<LongWritable, Text> hadoopIF =
  5. new HadoopInputFormat<LongWritable, Text>(
  6. new TextInputFormat(), LongWritable.class, Text.class, job
  7. );
  8. TextInputFormat.addInputPath(job, new Path(inputPath));
  9. // Read data using the Hadoop TextInputFormat.
  10. DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
  11. DataSet<Tuple2<Text, LongWritable>> result = text
  12. // use Hadoop Mapper (Tokenizer) as MapFunction
  13. .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
  14. new Tokenizer()
  15. ))
  16. .groupBy(0)
  17. // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
  18. .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
  19. new Counter(), new Counter()
  20. ));
  21. // Set up the Hadoop TextOutputFormat.
  22. HadoopOutputFormat<Text, IntWritable> hadoopOF =
  23. new HadoopOutputFormat<Text, IntWritable>(
  24. new TextOutputFormat<Text, IntWritable>(), job
  25. );
  26. hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
  27. TextOutputFormat.setOutputPath(job, new Path(outputPath));
  28. // Emit data using the Hadoop TextOutputFormat.
  29. result.output(hadoopOF);
  30. // Execute Program
  31. env.execute("Hadoop WordCount");