Flink兼容Apache Hadoop MapReduce的接口,因此可以使用面向MapReduce的代码。
你可以:
- Flink中使用Hadoop
Writable
数据类型(Data type). - 使用Hadoop
InputFormat
作为数据源(DataSource). - 使用Hadoop
OutputFormat
作为 数据落地(DataSink). - 使用Hadoop
Mapper
作为 FlatMapFunction. - 使用Hadoop
Reducer
作为 GroupReduceFunction.
这篇文档展示如何在Flink中使用现存的Hadoop MapReduce代码。可以参考 连接其他系统 来了解如何从Hadoop支持的文件系统中读取数据。
- This will be replaced by the TOC {:toc}
项目配置
支持Hadoop的输入输出(input/output)格式是flink-java
和flink-scala
的maven模块的一部分,这两部分是在编写Flink任务时经常需要用到的。 mapred
和mapreduce
的api代码分别在org.apache.flink.api.java.hadoop
和org.apache.flink.api.scala.hadoop
以及一个额外的子包中。
对Hadoop MapReduce的支持是在flink-hadoop-compatibility
的maven模块中。代码具体在org.apache.flink.hadoopcompatibility
包中。
如果想要重复使用Mappers and Reducers
, 需要在maven中的pom.xml中添加下面依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility{{ site.scala_version_suffix }}</artifactId>
<version>{{site.version}}</version>
</dependency>
使用Hadoop数据类型
Flink支持所有的Hadoop Writable
和 WritableComparable
数据类型, 不用额外添加Hadoop Compatibility 依赖。 可以参考Programming Guide了解如何使用Hadoop数据类型(Hadoop data type)。
使用Hadoop输入格式
可以使用Hadoop输入格式来创建数据源,具体是调用 ExecutionEnvironment 的 readHadoopFile 或 createHadoopInput方法。 前者用于来自FileInputFormat的输入格式, 后者用于普通的输入格式。
创建的数据集包含的是一个“键-值”2元组,“值”是从Hadoop输入格式获得的数值。
下面的例子介绍如何使用Hadoop的 TextInputFormat
。
使用Hadoop输出格式
Flink提供兼容Hadoop输出格式(Hadoop OutputFormat)的封装。支持任何实现org.apache.hadoop.mapred.OutputFormat
接口或者继承org.apache.hadoop.mapreduce.OutputFormat
的类。输出格式的封装需要的输入是“键值对”形式。他们将会交给Hadoop输出格式处理。
下面的例子介绍如何使用Hadoop的 TextOutputFormat
。
使用Hadoop Mappers和Reducers
Hadoop Mappers
语法上等价于Flink的FlatMapFunctions
,Hadoop Reducers
语法上等价于Flink的GroupReduceFunctions
。 Flink同样封装了Hadoop MapReduce
的Mapper and Reducer
接口的实现。 用户可以在Flink程序中复用Hadoop的Mappers and Reducers
。 这时,仅仅org.apache.hadoop.mapred
的Mapper and Reducer接口被支持。
The wrappers take a DataSet<Tuple2<KEYIN,VALUEIN>>
as input and produce a DataSet<Tuple2<KEYOUT,VALUEOUT>>
as output where KEYIN
and KEYOUT
are the keys and VALUEIN
and VALUEOUT
are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a wrapper for a GroupReduceFunction with (HadoopReduceCombineFunction
) and without a Combiner (HadoopReduceFunction
). The wrappers accept an optional JobConf
object to configure the Hadoop Mapper or Reducer.
封装函数用DataSet<Tuple2<KEYIN,VALUEIN>>
作为输入, 产生DataSet<Tuple2<KEYOUT,VALUEOUT>>
作为输出, 其中KEYIN
和KEYOUT
是“键” ,VALUEIN
和VALUEOUT
是“值”,它们是Hadoop函数处理的键值对。 对于Reducers,Flink将GroupReduceFunction封装成HadoopReduceCombineFunction
,但没有Combiner(HadoopReduceFunction
)。 封装函数接收可选的JobConf
对象来配置Hadoop的Mapper or Reducer。
Flink的方法封装有
org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction
,org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction
, andorg.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction
. 他们可以被用于FlatMapFunctions或GroupReduceFunctions.
下面的例子介绍如何使用Hadoop的Mapper
和Reducer
。
// 获取待处理数据
DataSet<Tuple2<Text, LongWritable>> text = [...]
DataSet<Tuple2<Text, LongWritable>> result = text
// 使用Hadoop Mapper (Tokenizer)作为Map函数
.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
new Tokenizer()
))
.groupBy(0)
// 使用Hadoop Reducer (Counter)作为Reduce函数
.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
new Counter(), new Counter()
));
需要注意: Reducer封装处理由Flink中的groupBy()定义的groups。 它并不考虑任何在JobConf定义的自定义的分区器(partitioners), 排序(sort)或分组(grouping)的比较器。
完整Hadoop WordCount示例
下面给出一个完整的使用Hadoop 数据类型, InputFormat/OutputFormat/Mapper/Reducer的示例。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 创建和初始化Hadoop TextInputFormat.
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopIF =
new HadoopInputFormat<LongWritable, Text>(
new TextInputFormat(), LongWritable.class, Text.class, job
);
TextInputFormat.addInputPath(job, new Path(inputPath));
// 从Hadoop TextInputFormat读取数据.
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
DataSet<Tuple2<Text, LongWritable>> result = text
// 使用Hadoop Mapper (Tokenizer)作为Map函数
.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
new Tokenizer()
))
.groupBy(0)
// 使用Hadoop Reducer (Counter)作为Reduce函数
.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
new Counter(), new Counter()
));
// 创建和初始化Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
new HadoopOutputFormat<Text, IntWritable>(
new TextOutputFormat<Text, IntWritable>(), job
);
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));
// 使用Hadoop TextOutputFormat输出结果.
result.output(hadoopOF);
// 执行程序
env.execute("Hadoop WordCount");