本节介绍如何编写基本的 MapReduce 程序实现数据分析。本节代码是基于 Hadoop 2.7.3 开发的。单词计数(WordCount)的任务是对一组输入文档中的单词进行分别计数。假设文件的量比较大,每个文档又包含大量的单词,则无法使用传统的线性程序进行处理,而这类问题正是 MapReduce 可以发挥优势的地方。

在前面《MapReduce实例分析:单词计数》教程中已经介绍了用 MapReduce 实现单词计数的基本思路和具体执行过程。下面将介绍如何编写具体实现代码及如何运行程序。

首先,在本地创建 3 个文件:file00l、file002 和 file003,文件具体内容如表 1 所示。

再使用 HDFS 命令创建一个 input 文件目录。然后,把 file001、file002 和 file003 上传到 HDFS 中的 input 目录下。编写 MapReduce 程序的第一个任务就是编写 Map 程序。在单词计数任务中,Map 需要完成的任务就是把输入的文本数据按单词进行拆分,然后以特定的键值对的形式进行输出。Hadoop MapReduce 框架已经在类 Mapper 中实现了 Map 任务的基本功能。为了实现 Map 任务,开发者只需要继承类 Mapper,并实现该类的 Map 函数。

为实现单词计数的 Map 任务,首先为类 Mapper 设定好输入类型和输出类型。这里,Map 函数的输入是 形式,其中,key 是输入文件中一行的行号,value 是该行号对应的一行内容。

所以,Map 函数的输入类型为 。Map 函数的功能为完成文本分割工作,Map 函数的输出也是 形式,其中,key 是单词,value 为该单词出现的次数。所以,Map 函数的输出类型为

以下是单词计数程序的 Map 任务的实现代码。

任务准备

文件名 file001 file002 file003
文件内容 Hello world
Connected world
One world
One dream
Hello Hadoop
Hello Map
Hello Reduce

hadoop fs -mkdir input
hadoop fs -put file001 input
hadoop fs -put file002 input
hadoop fs -put file003 input

编写 Map 程序

public static class CoreMapper extends Mapper { private static final IntWritable one = new IntWritable(1); private static Text label = new Text(); public void map(Object key,Text value,Mapper Context context)throws IOException,InterruptedException { StringTokenizer tokenizer = new StringTokenizer(value.toString()); while(tokenizer.hasMoreTokens()) { label.set(tokenizer.nextToken()); context.write(label,one); } }}
在上述代码中,实现 Map 任务的类为 CoreMapper。该类首先将需要输出的两个变量 one 和 label 进行初始化。
StringTokenizer 类机器方法将 value 变量中文本的一行文字进行拆分,拆分后的单词放在 tokenizer 列表中。然后程序通过循环对每一个单词进行处理,把单词放在 label 中,把 one 作为单词计数。

在函数的整个执行过程中,one 的值一直是 1。在该实例中,key 没有被明显地使用到。context 是 Map 函数的一种输出方式,通过使用该变量,可以直接将中间结果存储在其中。

根据上述代码,Map 任务结束后,3 个文件的输出结果如表 2 所示。

编写 MapReduce 程序的第二个任务就是编写 Reduce 程序。在单词计数任务中,Reduce 需要完成的任务就是把输入结果中的数字序列进行求和,从而得到每个单词的出现次数。

在执行完 Map 函数之后,会进入 Shuffle 阶段,在这个阶段中,MapReduce 框架会自动将 Map 阶段的输出结果进行排序和分区,然后再分发给相应的 Reduce 任务去处理。经过 Map 端 Shuffle 阶段后的结果如表 3 所示。

Reduce 端接收到各个 Map 端发来的数据后,会进行合并,即把同一个 key,也就是同一单词的键值对进行合并,形成> 形式的输出。经过 Map 端 Shuffle 阶段后的结果如表 4 所示。

Reduce 阶段需要对上述数据进行处理从而得到每个单词的出现次数。从 Reduce 函数的输入已经可以理解 Reduce 函数需要完成的工作,就是首先对输入数据 value 中的数字序列进行求 和。以下是单词计数程序的 Reduce 任务的实现代码。与 Map 任务实现相似,Reduce 任务也是继承 Hadoop 提供的类 Reducer 并实现其接口。 Reduce 函数的输入、输出类型与 Map 函数的输出类型本质上是相同的。

在 Reduce 函数的开始部分,首先设置 sum 参数用来记录每个单词的出现次数,然后遍历 value 列表,并对其中的数字进行累加,最终就可以得到每个单词总的出现次数。在输出的时候,仍然使用 context 类型的变量存储信息。当 Reduce 阶段结束时,就可以得到最终需要的结果,如表 5 所示。
为了使用 CoreMapper 和 CoreReducer 类进行真正的数据处理,还需要在 main 函数中通过 Job 类设置 Hadoop MapReduce 程序运行时的环境变量,以下是具体代码。代码首先检查参数是不是正确,如果不正确就提醒用户。随后,通过 Job 类设置环境参数,并设置程序的类、Mapper 类和 Reducer 类。然后,设置了程序的输出类型,也就是 Reduce 函数的输出结果 中 key 和 value 各自的类型。最后,根据程序运行时的参数,设置输入、输出文件路径。编写 MapReduce 程序需要引用 Hadoop 的以下几个核心组件包,它们实现了 Hadoop MapReduce 框架。这些核心组件包的基本功能描述如表 6 所示。

在运行代码前,需要先把当前工作目录设置为 /user/local/Hadoop。编译 WordCount 程序需要以下 3 个 Jar,为了简便起见,把这 3 个 Jar 添加到 CLASSPATH 中。
使用 JDK 包中的工具对代码进行编译。
编译之后,在文件目录下可以发现有 3 个“.class”文件,这是 Java 的可执行文件,将它们打包并命名为 wordcount.jar。这样就得到了单词计数程序的 Jar 包。在运行程序之前,需要启动 Hadoop 系统,包括启动 HDFS 和 MapReduce。然后,就可以运行程序了。
最后,可以运行下面的命令查看结果。

  • 变量 one 的初始值直接设置为 1,表示某个单词在文本中出现过。
  • Map 函数的前两个参数是函数的输入参数,value 为 Text 类型,是指每次读入文本的一行,key 为 Object 类型,是指输入的行数据在文本中的行号。 | 文件名/Map | file001/Map1 | file002/Map2 | file003/Map3 | | —- | —- | —- | —- | | Map 任务输出结果 | <”Hello”,1>
    <”world”,1>
    <”Connected”,1>
    <”world”,1> | <”One”,1>
    <”world”,1>
    <”One”,1>
    <”dream”,1> | <”Hello”,1>
    <”Hadoop”,1>
    <”Hello”,1>
    <”Map”,1>
    <”Hello”, 1>
    <”Reduce”,1> |

编写 Reduce 程序

文件名/Map file001/Map1 file002/Map2 file003/Map3
Map 端
Shuffle 阶段输出结果
<”Connected”,1>
<”Hello”, 1>
<”world”,<1,1>>
<”dream”,1>
<”One”, <1, 1>>
<”world”, 1>

<”Map”, 1>
<”Hadoop”,1>
<”Hello”,<1,1,1>>
<”Reduce”, 1>
Reduce 端 <”Connected”,1>

Shuffle 阶段输出结果
< “dream”,1>
<”Hadoop”,1>
<”Hello”,<1,1,1,1>>
<”Map”,1>
<”One”,<1,1>>
<”world”, <1,1,1>>
<”Reduce”, 1>

public static class CoreReducer extends Reducer {
private IntWritable count = new IntWritable ();
public void reduce(Text key,Iterable values,Reducer Context context)throws IOException, InterruptedException {
int sum = 0;
for (IntWritable intWritable : values){
sum += intWritable.get();
}
count.set(sum);
context.write(key, count);
}
}

Reduce 任务输出结果 <”Connected”, 1>
<”dream”, 1>
<”Hadoop”, 1>
<”Hello”, 4>
<”Map”, 1>
<”One”, 2>
<”world”, 3>
<”Reduce”, 1>


编写 main 函数

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.printIn(“Usage:wordcount “);
System.exit(2);
}
Job job = new Job (conf, “WordCount”); //设置环境参数
job.setJarByClass (WordCount.class); //设置程序的类名
job.setMapperClass(CoreMapper.class); //添加 Mapper 类
job.setReducerClass(CoreReducer.class); //添加 Reducer类
job.setOutputKeyClass (Text.class); //设置输出 key 的类型
job.setOutputValueClass (IntWritable.class);
//设置输出 value 的类型
FileInputFormat.addInputPath (job, new Path (otherArgs [0]));
//设置输入文件路径
FileOutputFormat.setOutputPath (job,new Path (otherArgs [1]));
//设置输入文件路径
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

核心代码包

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

功能
org.apache.hadoop.conf 定义了系统参数的配置文件处理方法
org.apache.hadoop.fs 定义了抽象的文件系统 API
org.apache.hadoop.mapreduce Hadoop MapReduce 框架的实现,包括任务的分发调度等
org.apache.hadoop.io 定义了通用的 I/O API,用于网络、数据库和文件数据对象 进行读写操作

运行代码

$export
CLASSPATH=/usr/local/hadoop/share/hadoop/common/hadoop-common-2.7.3.jar:$CLASSPATH
$export
CLASSPATH=/usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-2.7.3.jar:$CLAS SPATH
$export
CLASSPATH=/usr/local/hadoop/share/hadoop/common/lib/common-cli-1.2.jar:$CLASSPATH
$ javac WordCount.java
$ jar -cvf wordcount.jar .class
$ ./bin/Hadoop jar wordcount.jar WordCount input output
$ ./bin/Hadoop fs -cat output/