在Hadoop平台上部署WordCount程序

在 Eclipse 中创建 “WordCount” MapReduce项目

点击 File 菜单,选择 New -> Project…:

实验二详细 - 图1

选择 Map/Reduce Project,点击 Next:

实验二详细 - 图2

点击“Configure Hadoop install directory…”

实验二详细 - 图3

点击“Browse”,选择/home/hfut/hadoop-3.2.2

实验二详细 - 图4

点击界面右下方“OK”按钮

实验二详细 - 图5

点击Finish创建项目。

右键点击MyWordCount 项目,选择New -> Class:

实验二详细 - 图6

在 Name 处填写 WordCountTest

实验二详细 - 图7

将如下 WordCountTest 的代码复制到该WordCountTest.java中。

  1. import java.io.IOException;
  2. import java.util.Iterator;
  3. import java.util.StringTokenizer;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. import org.apache.hadoop.util.GenericOptionsParser;
  14. public class WordCountTest {
  15. public WordCountTest() {
  16. }
  17. public static void main(String[] args) throws Exception {
  18. Configuration conf = new Configuration();
  19. String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
  20. if(otherArgs.length < 2) {
  21. System.err.println("Usage: wordcount <in> [<in>...] <out>");
  22. System.exit(2);
  23. }
  24. Job job = Job.getInstance(conf, "word count test");
  25. job.setJarByClass(WordCountTest.class);
  26. job.setMapperClass(WordCountTest.TokenizerMapper.class);
  27. job.setCombinerClass(WordCountTest.IntSumReducer.class);
  28. job.setReducerClass(WordCountTest.IntSumReducer.class);
  29. job.setOutputKeyClass(Text.class);
  30. job.setOutputValueClass(IntWritable.class);
  31. for(int i = 0; i < otherArgs.length - 1; ++i) {
  32. FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  33. }
  34. FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
  35. System.exit(job.waitForCompletion(true)?0:1);
  36. }
  37. public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  38. private IntWritable result = new IntWritable();
  39. public IntSumReducer() {
  40. }
  41. public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  42. int sum = 0;
  43. IntWritable val;
  44. for(Iterator itr = values.iterator(); itr.hasNext(); sum += val.get()) {
  45. val = (IntWritable)itr.next();
  46. }
  47. this.result.set(sum);
  48. context.write(key, this.result);
  49. }
  50. }
  51. public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
  52. private static final IntWritable one = new IntWritable(1);
  53. private Text word = new Text();
  54. public TokenizerMapper() {
  55. }
  56. public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  57. StringTokenizer itr = new StringTokenizer(value.toString());
  58. while(itr.hasMoreTokens()) {
  59. this.word.set(itr.nextToken());
  60. context.write(this.word, one);
  61. }
  62. }
  63. }
  64. }

将Hadoop配置文件添加到“WordCount” MapReduce项目

将log4j.properties 复制到 WordCount 项目下的 src 文件夹(~/workspace/WordCount/src)中:

  1. [hfut@master ~]$ cp ~/hadoop-3.2.2/etc/hadoop/log4j.properties ~/workspace/MyWordCount/src

复制完成后,务必右键点击 WordCount 选择 refresh 进行刷新(不会自动刷新,需要手动刷新),可以看到文件结构如下所示:

实验二详细 - 图8

部署到Hadoop平台上运行

请在Eclipse工作界面左侧的“Package Explorer”面板中,在工程名称“MyWordCount”上点击鼠标右键,在弹出的菜单中选择“Export”,如下图所示。

实验二详细 - 图9

然后,会看到弹出以下界面:

实验二详细 - 图10

在该界面中,选择“Runnable JAR file”,然后,点击“Next>”按钮,弹出如下所示界面。

实验二详细 - 图11

在该界面中,“Launch configuration”用于设置生成的JAR包被部署启动时运行的主类,需要在下拉列表中选择刚才配置的类“WordCountTest-MyWordCount”。在“Export destination”中需要设置JAR包要输出保存到哪个目录,比如,这里设置为“/home/hfut/hadoop-3.2.2/myapp/WordCountTest.jar”。在“Library handling”下面选择“Extract required libraries into generated JAR”。然后,点击“Finish”按钮,会出现如下图所示界面。

实验二详细 - 图12

可以忽略该界面的信息,直接点击界面右下角的“OK”按钮,启动打包过程。打包过程结束后,会出现一个警告信息界面,如下图所示。

实验二详细 - 图13

可以忽略该界面的信息,直接点击界面右下角的“OK”按钮。至此,已经顺利把MyWordCount工程打包生成了WordCountTest.jar。可以到Linux系统中查看一下生成的WordCountTest.jar文件,可以在Linux的终端中执行如下命令:

[hfut@master ~]$ ls ~/hadoop-3.2.2/myapp/

实验二详细 - 图14

可以看到,“~/hadoop-3.2.2/myapp/”目录下已经存在一个WordCountTest.jar文件。现在,就可以在Linux系统中,使用hadoop jar命令运行程序,命令如下:

[hfut@master ~]$ hadoop jar ~/hadoop-3.2.2/myapp/MyWordCount.jar hdfs://master:9000/user/hfut/input hdfs://master:9000/user/hfut/output

实验二详细 - 图15

由于文件已经存在,所以报错了

Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://master:9000/user/hfut/output already exists

删除hadoop 中的/uesr/hfut/output后运行

实验二详细 - 图16

查看运行结果

可以读取文件

[hfut@master ~]$ hadoop fs -ls /user/hfut/output

实验二详细 - 图17

[hfut@master ~]$ hadoop fs -cat /user/hfut/output/part-r-00000

实验二详细 - 图18

文件下载到本地

[hfut@master ~]$ hadoop fs -get /user/hfut/output/part-r-00000 /home/hfut/Downloads

实验二详细 - 图19

本地查看

[hfut@master ~]$ cd
[hfut@master ~]$ cd Downloads
[hfut@master ~]$ ls
[hfut@master ~]$ cat part-r-00000

实验二详细 - 图20

key和value

inputFormat将hdfs上要处理的文件一行一行的读入,将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成对,如图4-1所示。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数

这里是把每个文件按行处理,下图有两个文件,每个文件有两行,每一行的开头字符所在位置的偏移量 ,第一行的开头偏移量自然是0,hello world共10个偏移量,加上中间的空格11个偏移量,回车再算一个,第二行的开头偏移量是12.

实验二详细 - 图21

2)将分割好的对交给用户定义的map方法进行处理,生成新的对,如图4-2所示。

这里是用户自定义的map处理程序,每一行的字符按“ ” 分割,分割的每一个元素都记为1,也就是map节点的所有value都是1

实验二详细 - 图22

3)得到map方法输出的对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到Mapper的最终输出结果。

实验二详细 - 图23

4)Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的对,并作为WordCount的输出结果,

实验二详细 - 图24