Hadoop Compatibility Beta

Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows reusing code that was implemented for Hadoop MapReduce.

You can:

  • use Hadoop's `Writable` [data types](index.html#data-types) in Flink programs.
  • use any Hadoop `InputFormat` as a [DataSource](index.html#data-sources).
  • use any Hadoop `OutputFormat` as a [DataSink](index.html#data-sinks).
  • use a Hadoop `Mapper` as a [FlatMapFunction](dataset_transformations.html#flatmap).
  • use a Hadoop `Reducer` as a [GroupReduceFunction](dataset_transformations.html#groupreduce-on-grouped-dataset).

This document shows how to use existing Hadoop MapReduce code with Flink. Please refer to the [Connecting to other systems](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/connectors.html) guide for reading from Hadoop-supported file systems.

Project Configuration

Support for Hadoop input/output formats is part of the `flink-java` and `flink-scala` Maven modules that are always required when writing Flink jobs. The code is located in `org.apache.flink.api.java.hadoop` and `org.apache.flink.api.scala.hadoop`, in an additional sub-package for the `mapred` and `mapreduce` APIs.

Support for Hadoop Mappers and Reducers is contained in the `flink-hadoop-compatibility` Maven module. This code resides in the `org.apache.flink.hadoopcompatibility` package.

Add the following dependency to your `pom.xml` if you want to reuse Mappers and Reducers.

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_2.11</artifactId>
        <version>1.7.1</version>
    </dependency>

Using Hadoop Data Types

Flink supports all Hadoop `Writable` and `WritableComparable` data types out-of-the-box. You do not need to include the Hadoop Compatibility dependency if you only want to use your Hadoop data types. See the Programming Guide for more details.
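
For illustration, here is a minimal sketch (not taken from the original guide) of a Flink program that uses Hadoop `Writable` types directly as tuple fields and groups on a `WritableComparable` key:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Hadoop Writable types can be used like any other Flink data type.
    DataSet<Tuple2<Text, IntWritable>> counts = env.fromElements(
        Tuple2.of(new Text("hadoop"), new IntWritable(1)),
        Tuple2.of(new Text("flink"), new IntWritable(2)),
        Tuple2.of(new Text("hadoop"), new IntWritable(3)));

    // Text implements WritableComparable and can therefore be used as a grouping key.
    counts
        .groupBy(0)
        .reduceGroup(new GroupReduceFunction<Tuple2<Text, IntWritable>, String>() {
            @Override
            public void reduce(Iterable<Tuple2<Text, IntWritable>> values, Collector<String> out) {
                String word = null;
                int sum = 0;
                for (Tuple2<Text, IntWritable> value : values) {
                    word = value.f0.toString();
                    sum += value.f1.get();
                }
                out.collect(word + " " + sum);
            }
        })
        .print();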

Using Hadoop InputFormats

To use Hadoop `InputFormats` with Flink, the format must first be wrapped using either `readHadoopFile` or `createHadoopInput` of the `HadoopInputs` utility class. The former is used for input formats derived from `FileInputFormat`, while the latter has to be used for general-purpose input formats. The resulting `InputFormat` can be used to create a data source via `ExecutionEnvironment#createInput`.

The resulting `DataSet` contains 2-tuples where the first field is the key and the second field is the value retrieved from the Hadoop InputFormat.

The following example shows how to use Hadoop's `TextInputFormat`.

Java:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<LongWritable, Text>> input =
        env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
                            LongWritable.class, Text.class, textPath));

    // Do something with the data.
    [...]

Scala:

    val env = ExecutionEnvironment.getExecutionEnvironment

    val input: DataSet[(LongWritable, Text)] =
      env.createInput(HadoopInputs.readHadoopFile(
        new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))

    // Do something with the data.
    [...]
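
For input formats that are not derived from `FileInputFormat`, `createHadoopInput` is used instead of `readHadoopFile`. A minimal sketch, assuming a hypothetical `MyCustomInputFormat` that implements `org.apache.hadoop.mapred.InputFormat<Text, Text>` (not part of the original guide):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Configure the input format through a JobConf, just as in a plain Hadoop job.
    JobConf jobConf = new JobConf();

    // Wrap the general-purpose Hadoop InputFormat and create a data source from it.
    DataSet<Tuple2<Text, Text>> input = env.createInput(
        HadoopInputs.createHadoopInput(
            new MyCustomInputFormat(), Text.class, Text.class, jobConf));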

Using Hadoop OutputFormats

Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class that implements `org.apache.hadoop.mapred.OutputFormat` or extends `org.apache.hadoop.mapreduce.OutputFormat` is supported. The OutputFormat wrapper expects its input data to be a DataSet containing 2-tuples of key and value. These are processed by the Hadoop OutputFormat.

The following example shows how to use Hadoop's `TextOutputFormat`.

Java:

    // Obtain the result we want to emit
    DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]

    // Set up the Hadoop TextOutputFormat.
    HadoopOutputFormat<Text, IntWritable> hadoopOF =
      // create the Flink wrapper.
      new HadoopOutputFormat<Text, IntWritable>(
        // set the Hadoop OutputFormat and specify the job.
        new TextOutputFormat<Text, IntWritable>(), job
      );

    hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
    TextOutputFormat.setOutputPath(job, new Path(outputPath));

    // Emit data using the Hadoop TextOutputFormat.
    hadoopResult.output(hadoopOF);
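
In the Java snippet above, `job` refers to an `org.apache.hadoop.mapreduce.Job` instance (obtained via `Job.getInstance()`, as shown in the complete WordCount example at the end of this page) that carries the Hadoop configuration, including the output path.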

Scala:

    // Obtain your result to emit.
    val hadoopResult: DataSet[(Text, IntWritable)] = [...]

    val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
      new TextOutputFormat[Text, IntWritable],
      new JobConf)

    hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
    FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))

    hadoopResult.output(hadoopOF)
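
Note that the Java example above wraps an `OutputFormat` from the `mapreduce` API and configures it through a `Job`, while the Scala example wraps the `mapred` API and configures it through a `JobConf`; as stated above, both variants are supported.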

Using Hadoop Mappers and Reducers

Hadoop Mappers are semantically equivalent to Flink's FlatMapFunctions and Hadoop Reducers are equivalent to Flink's GroupReduceFunctions. Flink provides wrappers for implementations of Hadoop MapReduce's `Mapper` and `Reducer` interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the `Mapper` and `Reducer` interfaces of Hadoop's mapred API (`org.apache.hadoop.mapred`) are supported.

The wrappers take a `DataSet<Tuple2<KEYIN,VALUEIN>>` as input and produce a `DataSet<Tuple2<KEYOUT,VALUEOUT>>` as output, where `KEYIN` and `KEYOUT` are the keys and `VALUEIN` and `VALUEOUT` are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a wrapper for a GroupReduceFunction with a Combiner (`HadoopReduceCombineFunction`) and without a Combiner (`HadoopReduceFunction`). The wrappers accept an optional `JobConf` object to configure the Hadoop Mapper or Reducer.

Flink's function wrappers are

  • org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction,
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction, and
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction.

and can be used as regular Flink [FlatMapFunctions](dataset_transformations.html#flatmap) or [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset).

The following example shows how to use Hadoop `Mapper` and `Reducer` functions.

    // Obtain data to process somehow.
    DataSet<Tuple2<LongWritable, Text>> text = [...]

    DataSet<Tuple2<Text, LongWritable>> result = text
      // use Hadoop Mapper (Tokenizer) as MapFunction
      .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
        new Tokenizer()
      ))
      .groupBy(0)
      // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
      .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
        new Counter(), new Counter()
      ));

Please note: The Reducer wrapper works on groups as defined by Flink's [groupBy()](dataset_transformations.html#transformations-on-grouped-dataset) operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the `JobConf`.
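
If the Hadoop Reducer should not also act as a Combiner, the `HadoopReduceFunction` wrapper listed above can be used instead; it takes a single Reducer instance and, optionally, a `JobConf`. A minimal sketch that continues the example above and reuses its `Counter` Reducer:

    // Optional JobConf to configure the wrapped Hadoop Reducer.
    JobConf conf = new JobConf();

    DataSet<Tuple2<Text, LongWritable>> resultWithoutCombiner = text
        .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
            new Tokenizer()
        ))
        .groupBy(0)
        // use the Hadoop Reducer (Counter) as GroupReduceFunction only, without a Combiner
        .reduceGroup(new HadoopReduceFunction<Text, LongWritable, Text, LongWritable>(
            new Counter(), conf
        ));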

Complete Hadoop WordCount example

The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Set up the Hadoop TextInputFormat.
    Job job = Job.getInstance();
    HadoopInputFormat<LongWritable, Text> hadoopIF =
      new HadoopInputFormat<LongWritable, Text>(
        new TextInputFormat(), LongWritable.class, Text.class, job
      );
    TextInputFormat.addInputPath(job, new Path(inputPath));

    // Read data using the Hadoop TextInputFormat.
    DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);

    DataSet<Tuple2<Text, LongWritable>> result = text
      // use Hadoop Mapper (Tokenizer) as MapFunction
      .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
        new Tokenizer()
      ))
      .groupBy(0)
      // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
      .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
        new Counter(), new Counter()
      ));

    // Set up the Hadoop TextOutputFormat.
    HadoopOutputFormat<Text, LongWritable> hadoopOF =
      new HadoopOutputFormat<Text, LongWritable>(
        new TextOutputFormat<Text, LongWritable>(), job
      );
    hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
    TextOutputFormat.setOutputPath(job, new Path(outputPath));

    // Emit data using the Hadoop TextOutputFormat.
    result.output(hadoopOF);

    // Execute Program
    env.execute("Hadoop WordCount");
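
The `Tokenizer` Mapper and `Counter` Reducer used above are not shown in this guide. The following is a minimal sketch of what they might look like, modeled on Hadoop's classic WordCount and written against the `mapred` API (`MapReduceBase`, `Mapper`, `Reducer`, `OutputCollector`, `Reporter` from `org.apache.hadoop.mapred`); the class bodies are assumptions, not part of the original example, shown here as static nested classes of the enclosing job class:

    // Emits (word, 1) for every token of an input line.
    public static class Tokenizer extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                output.collect(new Text(tokenizer.nextToken()), new LongWritable(1L));
            }
        }
    }

    // Sums up all counts that were emitted for a word.
    public static class Counter extends MapReduceBase
            implements Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
            long sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new LongWritable(sum));
        }
    }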