各类异常汇总
https://www.cnblogs.com/spicy/p/9645792.html
https://blog.csdn.net/diyangxia/article/details/78982867
https://www.cnblogs.com/LeachBlog/p/9968052.html

步骤

继承Mapper,编写map的业务逻辑
继承Reducer,编写reduce的业务逻辑
编写Driver驱动类

Mapper类

  1. package com.plat.mapreduce;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. import java.io.IOException;
  6. /**
  7. * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  8. * KEYIN - 是指框架读取到的数据的key类型
  9. * 在默认的读取数据组件InputFormat下,读取的key是一行文本的偏移量,所以key的类型是long类型的
  10. * VALUEIN - 是指框架读取到的数据的value类型
  11. * 在默认的读取数据组件InputFormat下,读到的value就是一行文本的内容,所以value的类型是String类型的
  12. * KEYOUT - 是指用户自定义逻辑方法返回的数据中key的类型,这个是由用户业务逻辑决定的
  13. * 在我们单词统计当中,我们输出的是单词数量作为value,所以类型是String
  14. * VALUEOUT - 是指用户自定义逻辑方法返回的数据中value的类型,这个是由用户逻辑决定的
  15. * 在我们做单词统计当中,我们输出的单词数量作为value,所以这个类型是Integer
  16. * String、long是jdk自带的数据类型,序列化效率低,因此hadoop为了提高序列化速度,自己定义了一套数据结构
  17. * String --- Text
  18. * Integer --- IntWritable
  19. * null --- nullWritable
  20. */
  21. // 注意引入mapper的包
  22. public class WordcountMapper extends Mapper {
  23. @Override
  24. protected void map(Object key, Object value, Context context) throws IOException, InterruptedException {
  25. //key为每行的起始偏移量
  26. System.out.println("每行的key值 ============ " + key);
  27. //value为每行的数据
  28. System.out.println("每行的value值" + value.toString());
  29. String line = value.toString();
  30. String[] strArr = line.split(" ");
  31. for (String str : strArr) {
  32. context.write(new Text(str), new IntWritable(1));
  33. }
  34. }
  35. }

Reducer类

  1. package com.plat.mapreduce;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import java.io.IOException;
  6. /**
  7. * reduceTask在调用我们的reduce方法
  8. * reduceTask应该接收map阶段所有maptask输出的数据中的一部分
  9. * (key.hashcode%numReduceTask==本ReduceTask编号)
  10. * 接收到的所有kv会按照key进行分组,然后将kv传给reduce
  11. *
  12. *
  13. *
  14. */
  15. public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  16. // key为map阶段输出的key
  17. // value为上一阶段相同key输出对应的value集合
  18. @Override
  19. protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  20. int count = 0;
  21. for(IntWritable v :values){
  22. count = count + v.get();
  23. }
  24. context.write(key, new IntWritable(count));
  25. }
  26. }

编写Driver驱动类

  1. package com.plat.mapreduce;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  11. public class WordcountDriver {
  12. public static void main(String[] args) throws Exception{
  13. Configuration configuration = new Configuration();
  14. //如果不做任何配置,默认采用本地文件系统及本地yarn
  15. //采用指定的hdfs文件系统
  16. configuration.set("fs.defaultFS", "hdfs://sunjz-01:9000");
  17. //configuration.set("mapreduce.framework.name", "yarn");
  18. //configuration.set("yarn.resourcemanager.hostname", "sunjz-02");
  19. //configuration.set("mapreduce.app-submission.cross-platform", "true");
  20. Job job = Job.getInstance(configuration);
  21. //告诉程序jar包所在的位置
  22. //job.setJar("/root/wordcount.jar"); //手动写死
  23. job.setJarByClass(WordcountDriver.class); //自动扫描
  24. //告诉框架我们所需的map类和reduce类
  25. job.setMapperClass(WordcountMapper.class);
  26. job.setReducerClass(WordcountReducer.class);
  27. //告诉框架我们程序输出的数据类型
  28. job.setMapOutputKeyClass(Text.class);
  29. job.setMapOutputValueClass(IntWritable.class);
  30. job.setOutputKeyClass(Text.class);
  31. job.setOutputValueClass(IntWritable.class);
  32. //告诉程序我们使用的数据读取组件,结果输出组件是什么
  33. job.setInputFormatClass(TextInputFormat.class);
  34. job.setOutputFormatClass(TextOutputFormat.class);
  35. //告诉框架我们要处理的数据文件放在哪个路径
  36. //FileInputFormat.setInputPaths(job, new Path("D:\\wordcount\\input"));
  37. //FileOutputFormat.setOutputPath(job, new Path("D:\\wordcount\\output"));
  38. FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
  39. FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));
  40. boolean flag = job.waitForCompletion(true);
  41. System.exit(flag ? 0 : 1);
  42. }
  43. }