Flink兼容Apache Hadoop MapReduce的接口,因此可以使用面向MapReduce的代码。

你可以:

这篇文档展示如何在Flink中使用现存的Hadoop MapReduce代码。可以参考 连接其他系统 来了解如何从Hadoop支持的文件系统中读取数据。

  • This will be replaced by the TOC {:toc}

项目配置

支持Hadoop的输入输出(input/output)格式是flink-javaflink-scala的maven模块的一部分,这两部分是在编写Flink任务时经常需要用到的。 mapredmapreduce 的api代码分别在org.apache.flink.api.java.hadooporg.apache.flink.api.scala.hadoop以及一个额外的子包中。

对Hadoop MapReduce的支持是在flink-hadoop-compatibility的maven模块中。代码具体在org.apache.flink.hadoopcompatibility包中。

如果想要重复使用Mappers and Reducers, 需要在maven中的pom.xml中添加下面依赖:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-hadoop-compatibility{{ site.scala_version_suffix }}</artifactId>
  4. <version>{{site.version}}</version>
  5. </dependency>

使用Hadoop数据类型

Flink支持所有的Hadoop WritableWritableComparable 数据类型, 不用额外添加Hadoop Compatibility 依赖。 可以参考Programming Guide了解如何使用Hadoop数据类型(Hadoop data type)。

使用Hadoop输入格式

可以使用Hadoop输入格式来创建数据源,具体是调用 ExecutionEnvironment 的 readHadoopFile 或 createHadoopInput方法。 前者用于来自FileInputFormat的输入格式, 后者用于普通的输入格式。

创建的数据集包含的是一个“键-值”2元组,“值”是从Hadoop输入格式获得的数值。

下面的例子介绍如何使用Hadoop的 TextInputFormat

~java ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet> input = env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath); // Do something with the data. […] ~
~scala val env = ExecutionEnvironment.getExecutionEnvironment val input: DataSet[(LongWritable, Text)] = env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath) // Do something with the data. […] ~

使用Hadoop输出格式

Flink提供兼容Hadoop输出格式(Hadoop OutputFormat)的封装。支持任何实现org.apache.hadoop.mapred.OutputFormat接口或者继承org.apache.hadoop.mapreduce.OutputFormat的类。输出格式的封装需要的输入是“键值对”形式。他们将会交给Hadoop输出格式处理。

下面的例子介绍如何使用Hadoop的 TextOutputFormat

~java // 获取所需数据 DataSet> hadoopResult = […] // 创建和初始化Hadoop TextOutputFormat. HadoopOutputFormat hadoopOF = new HadoopOutputFormat( // 设置Hadoop OutputFormat和特定的job作为初始化参数 new TextOutputFormat(), job ); hadoopOF.getConfiguration().set(“mapreduce.output.textoutputformat.separator”, “ “); TextOutputFormat.setOutputPath(job, new Path(outputPath)); // 通过Hadoop TextOutputFormat输出结果 hadoopResult.output(hadoopOF); ~
~scala // 获取所需数据 val hadoopResult: DataSet[(Text, IntWritable)] = […] val hadoopOF = new HadoopOutputFormatText,IntWritable hadoopOF.getJobConf.set(“mapred.textoutputformat.separator”, “ “) FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath)) hadoopResult.output(hadoopOF) ~

使用Hadoop Mappers和Reducers

Hadoop Mappers 语法上等价于Flink的FlatMapFunctionsHadoop Reducers语法上等价于Flink的GroupReduceFunctions。 Flink同样封装了Hadoop MapReduceMapper and Reducer接口的实现。 用户可以在Flink程序中复用Hadoop的Mappers and Reducers。 这时,仅仅org.apache.hadoop.mapred的Mapper and Reducer接口被支持。

The wrappers take a DataSet<Tuple2<KEYIN,VALUEIN>> as input and produce a DataSet<Tuple2<KEYOUT,VALUEOUT>> as output where KEYIN and KEYOUT are the keys and VALUEIN and VALUEOUT are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a wrapper for a GroupReduceFunction with (HadoopReduceCombineFunction) and without a Combiner (HadoopReduceFunction). The wrappers accept an optional JobConf object to configure the Hadoop Mapper or Reducer.

封装函数用DataSet<Tuple2<KEYIN,VALUEIN>>作为输入, 产生DataSet<Tuple2<KEYOUT,VALUEOUT>>作为输出, 其中KEYINKEYOUT是“键” ,VALUEINVALUEOUT 是“值”,它们是Hadoop函数处理的键值对。 对于Reducers,Flink将GroupReduceFunction封装成HadoopReduceCombineFunction,但没有Combiner(HadoopReduceFunction)。 封装函数接收可选的JobConf对象来配置Hadoop的Mapper or Reducer。

Flink的方法封装有

  • org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction,
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction, and
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction. 他们可以被用于FlatMapFunctionsGroupReduceFunctions.

下面的例子介绍如何使用Hadoop的MapperReducer

  1. // 获取待处理数据
  2. DataSet<Tuple2<Text, LongWritable>> text = [...]
  3. DataSet<Tuple2<Text, LongWritable>> result = text
  4. // 使用Hadoop Mapper (Tokenizer)作为Map函数
  5. .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
  6. new Tokenizer()
  7. ))
  8. .groupBy(0)
  9. // 使用Hadoop Reducer (Counter)作为Reduce函数
  10. .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
  11. new Counter(), new Counter()
  12. ));

需要注意: Reducer封装处理由Flink中的groupBy()定义的groups。 它并不考虑任何在JobConf定义的自定义的分区器(partitioners), 排序(sort)或分组(grouping)的比较器。

完整Hadoop WordCount示例

下面给出一个完整的使用Hadoop 数据类型, InputFormat/OutputFormat/Mapper/Reducer的示例。

  1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  2. // 创建和初始化Hadoop TextInputFormat.
  3. Job job = Job.getInstance();
  4. HadoopInputFormat<LongWritable, Text> hadoopIF =
  5. new HadoopInputFormat<LongWritable, Text>(
  6. new TextInputFormat(), LongWritable.class, Text.class, job
  7. );
  8. TextInputFormat.addInputPath(job, new Path(inputPath));
  9. // 从Hadoop TextInputFormat读取数据.
  10. DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
  11. DataSet<Tuple2<Text, LongWritable>> result = text
  12. // 使用Hadoop Mapper (Tokenizer)作为Map函数
  13. .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
  14. new Tokenizer()
  15. ))
  16. .groupBy(0)
  17. // 使用Hadoop Reducer (Counter)作为Reduce函数
  18. .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
  19. new Counter(), new Counter()
  20. ));
  21. // 创建和初始化Hadoop TextOutputFormat.
  22. HadoopOutputFormat<Text, IntWritable> hadoopOF =
  23. new HadoopOutputFormat<Text, IntWritable>(
  24. new TextOutputFormat<Text, IntWritable>(), job
  25. );
  26. hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
  27. TextOutputFormat.setOutputPath(job, new Path(outputPath));
  28. // 使用Hadoop TextOutputFormat输出结果.
  29. result.output(hadoopOF);
  30. // 执行程序
  31. env.execute("Hadoop WordCount");