前言

shuffle过程中有一个可选的流程Combiner,这个流程是做什么的?
Image 11.png Image 13.png

概述

  • Combiner是MR程序中Mapper和Reducer之外的一种组件
  • Combiner组件的父类就是Reducer
  • Combiner和Reducer功能上是一致的,两者的区别在于运行的位置
    • Combiner是在每一个MapTask所在的节点运行
    • Reducer是接收全局所有Mapper的输出结果
  • Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小IO【磁盘IO+网络IO】。
  • Combiner默认是不开启的,能够应用Combiner的前提是不能影响最终的业务逻辑,有些需求是不能用Combiner的比如求平均数,而且Combiner的输出kv应该跟Reducer的输入kv类型要对应起来

    1. ![Image 12.png](https://cdn.nlark.com/yuque/0/2021/png/664972/1620787271915-cfe069f7-f562-467f-ac0b-f78bef4defea.png#clientId=ud9e48620-c0fc-4&from=ui&id=u96195935&name=Image%2012.png&originHeight=93&originWidth=517&originalType=binary&size=9866&status=done&style=none&taskId=u6d013fa4-a429-4f41-8978-39eb7c28c49)
  • Combiner只会出现两次

    • 环形缓冲区中数据溢写到磁盘之前可合并一次
    • 对多个零时文件进行归并排序在落盘之前也可合并一次

      对WordCount案例使用Combiner

      方法一

      1. 增加一个 WordCountCombiner类继承 Reducer

      combiner操作的逻辑和Reducer逻辑是一致的
      public class WordcountCombiner extends Reducer<Text, IntWritable, Text,IntWritable>{
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
         // 1 汇总操作
         int count = 0;
         for(IntWritable v :values){
             count += v.get();
         }
         // 2 写出
         context.write(key, new IntWritable(count));
      }
      }
      

      2. job中设置一下

      // 指定需要使用 combiner,以及用哪个类作为 combiner 的逻辑
      job.setCombinerClass(WordCountCombiner.class);
      

      方法二

      因为wordcount的Reducer和combiner的逻辑是一致的,所以可以直接将wordcount的Reducer作为combiner即可
      job.setCombinerClass(WordcountReducer.class);