上一篇文章我们已经对shuffle阶段学习了分区和排序,现在来学习下规约。每一个map都可能善生大量的输出,conbiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高IO性能,是mapReduce的一种优化手段之一。就是会把相同的键值进行合并,这就是所谓的规约

  • combiner是MR程序中Mapper和Reducer之外的一个组件
  • combiner组件的父类就是Reducer
  • combiner和reducer的区别在于运行的位置

    1. combiner是在每一个mapTask所在的节点运行
      2. Reducer是接收全局所有Mapper的输出结果
  • combiner的意义就是对每一个mapTask的输出进行局部汇总,减少网络IO传输

    实现步骤

  1. 自定义一个combiner集成Reducer,重写reduce方法
  2. 在job设置job.setCombinerClass(CustomCombiner.class)

combiner能够应用的前提是不能影响最终的业务逻辑,而且combiner的输出kv应该跟reducer的输入kv类型要对应起来.

案例

还是拿我们统计单词出现的次数举例:案例示例地址
我们要做的仅仅是在map到reduce阶段增加一个规约处理,以此来减少网络传输量,来提高执行效率。
image.png


代码实现

WordCountMapper 不做改变

  1. package com.dongnaoedu.network.hadoop.mapreduce.combiner;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. import java.io.IOException;
  6. public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
  7. @Override
  8. public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
  9. String line = value.toString();
  10. String[] split = line.split(",");
  11. for(String word : split){
  12. context.write(new Text(word), new LongWritable(1));
  13. }
  14. }
  15. }

增加的MyCombiner

  1. package com.dongnaoedu.network.hadoop.mapreduce.combiner;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import java.io.IOException;
  6. /**
  7. * @author heian
  8. * @date 2022/3/24 12:25 下午
  9. * @description 规约
  10. */
  11. public class MyCombiner extends Reducer<Text, LongWritable, Text,LongWritable> {
  12. /**
  13. * 自定义我们的reduce逻辑
  14. * 所有key都是我们的单词,所有的values都是我们单词出现的次数
  15. */
  16. @Override
  17. protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,InterruptedException{
  18. long count = 0;
  19. for(LongWritable value : values){
  20. count += value.get();
  21. }
  22. context.write(key,new LongWritable(count));
  23. }
  24. }

SortReducer 不做改变

  1. package com.dongnaoedu.network.hadoop.mapreduce.combiner;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import java.io.IOException;
  6. public class WordCountReduce extends Reducer<Text,LongWritable, Text,LongWritable> {
  7. @Override
  8. protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,InterruptedException{
  9. long count = 0;
  10. // 此时集合经过规约处理,元素只有一个而已
  11. for(LongWritable value : values){
  12. count += value.get();
  13. }
  14. context.write(key,new LongWritable(count));
  15. }
  16. }

设置JobMain的规约类

  1. package com.dongnaoedu.network.hadoop.mapreduce.combiner;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.conf.Configured;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  10. import org.apache.hadoop.util.Tool;
  11. import org.apache.hadoop.util.ToolRunner;
  12. public class JobMain extends Configured implements Tool{
  13. @Override
  14. public int run(String[] strings) throws Exception{
  15. Job job = Job.getInstance(super.getConf(), "mapReduce_combiner");
  16. //打包放在集群下运行,需要做一个配置
  17. job.setJarByClass(JobMain.class);
  18. // 第一步:设置读取文件的类:k1 v1
  19. job.setInputFormatClass(TextInputFormat.class);
  20. TextInputFormat.addInputPath(job, new Path("hdfs://node01:8082/wordcount"));
  21. // 第二步:设置mapper类
  22. job.setMapperClass(WordCountMapper.class);
  23. // 设置map阶段输出类型
  24. job.setMapOutputKeyClass(Text.class);
  25. job.setMapOutputValueClass(LongWritable.class);
  26. // 第三 四 五 六步采用默认方式(分区 排序 规约 分组)
  27. job.setCombinerClass(MyCombiner.class);
  28. // 第七步:设置Reduce类
  29. job.setReducerClass(WordCountReduce.class);
  30. job.setOutputKeyClass(Text.class);
  31. job.setOutputValueClass(LongWritable.class);
  32. // 第八步:设置输出类
  33. job.setOutputFormatClass(TextOutputFormat.class);
  34. // 设置输出路径
  35. TextOutputFormat.setOutputPath(job, new Path("hafs://node01:8082/wordcount_cout"));
  36. boolean b = job.waitForCompletion(true);
  37. return b ? 0:1;
  38. }
  39. public static void main(String[] args) throws Exception{
  40. Configuration configuration = new Configuration();
  41. // 启动一个任务 run=0成功
  42. int run = ToolRunner.run(configuration, new JobMain(), args);
  43. System.exit(run);
  44. }
  45. }