第 1 节 MapReduce思想

MapReduce思想在⽣活中处可见。我们或多或少都曾接触过这种思想。MapReduce的思想核⼼是分而治之,充分利用了并⾏处理的优势。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,⽽不是⾃己原创。
MapReduce任务过程是分为两个处理阶段:

  • Map阶段:Map阶段的主要作用是“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。Map阶段的这些任务可以并行计算,彼此间没有依赖关系。
  • Reduce阶段:Reduce阶段的主要作用是“合”,即对map阶段的结果进行全局汇总。

再次理解MapReduce的思想
image.png

第 2 节 官⽅WordCount案例源码解析

image.png
image.png
image.png
经过查看分析官方WordCount案例源码我们发现⼀个统计单词数量的MapReduce程序的代码由三个部分组成

  • Mapper类
  • Reducer类
  • 运行作业的代码(Driver)

Mapper类继承了org.apache.hadoop.mapreduce.Mapper类重写了其中的map方法,Reducer类继承了org.apache.hadoop.mapreduce.Reducer类重写了其中的reduce⽅法。
重写的Map方法作⽤:map⽅法其中的逻辑就是用户希望mr程序map阶段如何处理的逻辑;
重写的Reduce⽅法作用:reduce方法其中的逻辑是用户希望mr程序reduce阶段如何处理的逻辑;

1. Hadoop序列化

为什么进行序列化?
序列化主要是我们通过⽹络通信传输数据时或者把对象持久化到文件,需要把对象序列化成⼆进制的结构。
观察源码时发现自定义Mapper类与自定义Reducer类都有泛型类型约束,⽐如⾃定义Mapper有四个形参类型,但是形参类型并不是常见的java基本类型。
为什么Hadoop要选择建立⾃己的序列化格式⽽不使⽤java⾃带serializable?

  • 序列化在分布式程序中非常重要,在Hadoop中,集群中多个节点的进程间的通信是通过RPC(远程过程调用:Remote Procedure Call)实现;RPC将消息序列化成⼆进制流发送到远程节点,远程节点再将接收到的二进制数据反序列化为原始的消息,因此RPC往追求如下特点:
    • 紧凑:数据更紧凑,能充分利用⽹络带宽资源
    • 快速:序列化和反序列化的性能开销更低
  • Hadoop使⽤的是⾃己的序列化格式Writable,它比java的序列化serialization更紧凑速度更快。⼀个对象使用Serializable序列化后,会携带很多额外信息⽐如校验信息,Header,继承体系等。

    Java基本类型与Hadoop常用序列化类型

    1. | **Java**基本类型 | **Hadoop Writable**类型 |

    | —- | —- | | boolean | BooleanWritable | | byte | ByteWritable | | int | IntWritable | | float | FloatWritable | | long | LongWritable | | double | DoubleWritable | | String | Text | | map | MapWritable | | array | ArrayWritable |

第 3 节 MapReduce编程规范及示例编写

3.1 Mapper类

  • ⽤户⾃定义⼀个Mapper类继承Hadoop的Mapper类
  • Mapper的输⼊数据是KV对的形式(类型可以⾃定义)
  • Map阶段的业务逻辑定义在map()⽅法中
  • Mapper的输出数据是KV对的形式(类型可以⾃定义)

注意:map()⽅法是对输入的⼀个KV对调⽤一次!!

3.2 Reducer类

  • ⽤户⾃定义Reducer类要继承Hadoop的Reducer类
  • Reducer的输⼊数据类型对应Mapper的输出数据类型(KV对)
  • Reducer的业务逻辑写在reduce()⽅方法中
  • Reduce()⽅法是对相同K的一组KV对调⽤执⾏一次

    3.3 Driver阶段

    创建提交YARN集群运行的Job对象,其中封装了MapReduce程序运行所需要的相关参数输⼊数据路径,输出数据路径等,也相当于是一个YARN集群的客户端,主要作⽤就是提交我们MapReduce程序运行。
    image.png

    3.4 WordCount代码实现

    3.4.1 需求

    在给定的⽂本⽂件中统计输出每⼀个单词出现的总次数
    输入数据:wc.txt;
    输出:

    apache  2
    clickhouse  2
    hadoop  1
    mapreduce 1
    spark 2
    xiaoming  1
    

    3.4.2 具体步骤

    按照MapReduce编程规范,分别编写Mapper,Reducer,Driver。
    1. 新建maven工程

  • 导⼊hadoop依赖

    <dependencies>
      <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
              <version>2.8.2</version>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.9.2</version>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.9.2</version>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.9.2</version>
      </dependency>
    </dependencies>
    <!--maven打包插件 -->
    <build>
      <plugins>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>2.3.2</version>
          <configuration>
            <source>1.8</source>
            <target>1.8</target>
          </configuration>
        </plugin>
        <plugin>
          <artifactId>maven-assembly-plugin </artifactId>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
          </configuration>
          <executions>
            <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>
    

    注意:以上依赖第⼀次需要联网下载!!

  • 添加log4j.properties

    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/spring.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
    

    2. 整体思路梳理(仿照源码)
    Map阶段:
    1. map()⽅法中把传入的数据转为String类型
    2. 根据空格切分出单词
    3. 输出<单词,1>
    Reduce阶段:
    1. 汇总各个key(单词)的个数,遍历value数据进行累加
    2. 输出key的总数
    Driver:
    1. 获取配置文件对象,获取job对象实例
    2. 指定程序jar的本地路径
    3. 指定Mapper/Reducer类
    4. 指定Mapper输出的kv数据类型
    5. 指定最终输出的kv数据类型
    6. 指定job处理的原始数据路径
    7. 指定job输出结果路径
    8. 提交作业
    3. 编写Mapper类

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
      Text k = new Text();
      IntWritable v = new IntWritable(1);
      @Override
      protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
          // 1 获取⼀行
          String line = value.toString();
          // 2 切割
          String[] words = line.split(" ");
          // 3 输出
          for (String word : words) {
              k.set(word);
              context.write(k, v);
          }
      }
    }
    

    继承的Mapper类型选择新版本API:
    image.png
    4. 编写Reducer类

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
      int sum;
      IntWritable v = new IntWritable();
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
          // 1 累加求和
          sum = 0;
          for (IntWritable count : values) {
                sum += count.get();
          }
          // 2 输出
          v.set(sum);
          context.write(key,v);
      }
    }
    

    选择继承的Reducer类
    image.png
    5. 编写Driver驱动类

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class WordcountDriver {
      public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
          // 1 获取配置信息以及封装任务
          Configuration configuration = new Configuration();
          Job job = Job.getInstance(configuration);
          // 2 设置jar加载路径
          job.setJarByClass(WordcountDriver.class);
          // 3 设置map和reduce类
          job.setMapperClass(WordcountMapper.class);
          job.setReducerClass(WordcountReducer.class);
          // 4 设置map输出
          job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(IntWritable.class);
          // 5 设置最终输出kv类型
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(IntWritable.class);
          // 6 设置输入和输出路径
          FileInputFormat.setInputPaths(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));
          // 7 提交
          boolean result = job.waitForCompletion(true);
          System.exit(result ? 0 : 1);
      }
    }
    

    6. 运行任务

  • 本地模式

直接Idea中运行驱动类即可
idea运行需要传入参数:
image.png
运⾏结束,去到输出结果路径查看结果
image.png
注意本地idea运行mr任务与集群没有任何关系,没有提交任务到yarn集群,是在本地使用多线程方式模拟的mr的运行。

  • Yarn集群模式
    • 把程序打成jar包,改名为wc.jar;上传到Hadoop集群

选择合适的Jar包
image.png
准备原始数据文件,上传到HDFS的路径,不能是本地路径,因为跨节点运行⽆法获取数据!!

  • 启动Hadoop集群(Hdfs,Yarn)
  • 使用Hadoop命令提交任务运行
    hadoop jar  wc.jar com.lagou.wordcount.WordcountDriver /user/lagou/input /user/lagou/output
    
    Yarn集群任务运行成功展示图
    image.png

    第 4 节 序列化Writable接口

    基本序列化类型往不能满足所有需求,⽐如在Hadoop框架内部传递一个自定义bean对象,那么该对象就需要实现Writable序列化接⼝。

    4.1 实现Writable序列化步骤如下

    1. 必须实现Writable接口
      2. 反序列化时,需要反射调⽤空参构造函数,所以必须有空参构造
      public CustomBean() {
      super();
      }
      
    2. 重写序列化方法
      @Override
      public void write(DataOutput out) throws IOException {
      ....
      }
      
    3. 重写反序列化方法
      @Override
      public void readFields(DataInput in) throws IOException {
      ....
      }
      
      5. 反序列化的字段顺序和序列化字段的顺序必须完全一致
      6. ⽅便展示结果数据,需要重写bean对象的toString()⽅法,可以⾃定义分隔符
      7. 如果自定义Bean对象需要放在Mapper输出KV中的K,则该对象还需实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序!!
      @Override
      public int compareTo(CustomBean o) {
      // ⾃自定义排序规则
      return this.num > o.getNum() ? -1 : 1;
      }
      

      4.2 Writable接⼝案例

      1. 需求

      统计每台智能⾳音箱设备内容播放时⻓长
      原始⽇志格式
      image.png
      输出结果
      image.png

      2. 编写MapReduce程序

    4. 创建SpeakBean对象 ```java

package com.lagou.hdfs; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; // 1 实现writable接口 public class SpeakBean implements Writable { private long selfDuration; private long thirdPartDuration; private long sumDuration; //2 反序列化时,需要反射调⽤空参构造函数,所以必须有 public SpeakBean() {

}

public SpeakBean(long selfDuration, long thirdPartDuration) {
    this.selfDuration = selfDuration;
        this.thirdPartDuration = thirdPartDuration;
        this.sumDuration=this.selfDuration+this.thirdPartDuration;
}
//3 写序列化方法
public void write(DataOutput out) throws IOException {
    out.writeLong(selfDuration);
    out.writeLong(thirdPartDuration);
    out.writeLong(sumDuration);
}
//4 反序列化⽅法
//5 反序列化⽅法读顺序必须和写序列化方法的写顺序必须一致
public void readFields(DataInput in) throws IOException {
    this.selfDuration  = in.readLong();
    this.thirdPartDuration = in.readLong();
    this.sumDuration = in.readLong();
}
// 6 编写toString方法,⽅便后续打印到⽂本
@Override
public String toString() {
    return  selfDuration +
            "\t" + thirdPartDuration +
            "\t" + sumDuration ;
}
public long getSelfDuration() {
    return selfDuration;
}
public void setSelfDuration(long selfDuration) {
    this.selfDuration = selfDuration;
}
public long getThirdPartDuration() {
    return thirdPartDuration;
}
public void setThirdPartDuration(long thirdPartDuration) {
    this.thirdPartDuration = thirdPartDuration;
}
public long getSumDuration() {
    return sumDuration;
}
public void setSumDuration(long sumDuration) {
    this.sumDuration = sumDuration;
}
public void set(long selfDuration, long thirdPartDuration) {
        this.selfDuration = selfDuration;
        this.thirdPartDuration = thirdPartDuration;
        this.sumDuration=this.selfDuration+this.thirdPartDuration;
}

}

2. 编写Mapper类
```java

package com.lagou.hdfs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SpeakDurationMapper extends Mapper<LongWritable, Text, Text, SpeakBean> {
    SpeakBean v = new SpeakBean();
    Text k = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 获取一行
        String line = value.toString();
        // 2 切割字段
        String[] fields = line.split("\t");
        // 3 封装对象
        // 取出设备id
        String deviceId = fields[1];
        // 取出⾃有和第三方时长数据
        long selfDuration = Long.parseLong(fields[fields.length - 3]);
        long thirdPartDuration = Long.parseLong(fields[fields.length - 2]);
        k.set(deviceId);
        v.set(selfDuration, thirdPartDuration);
        // 4 写出
        context.write(k, v);
    }
}
  1. 编写Reducer ```java

package com.lagou.hdfs; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class SpeakDurationReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException { long self_Duration = 0; long thirdPart_Duration = 0; // 1 遍历所有bean,将其中的⾃有,第三⽅时⻓分别累加 for (SpeakBean sb : values) { self_Duration += sb.getSelfDuration(); thirdPart_Duration += sb.getThirdPartDuration(); } // 2 封装对象 SpeakBean resultBean = new SpeakBean(self_Duration, thirdPart_Duration); // 3 写出 context.write(key, resultBean); } }

4. 编写驱动
```java

package com.lagou.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class SpeakerDriver {
    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        // 输⼊输出路径需要根据⾃己电脑上实际的输⼊输出路径设置
        args = new String[] { "e:/input/input", "e:/output1" };
        // 1 获取配置信息,或者job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration); 
        // 6 指定本程序的jar包所在的本地路径
        job.setJarByClass(SpeakerDriver.class);
        // 2 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(SpeakDurationMapper.class);
        job.setReducerClass(SpeakDurationReducer.class);
        // 3 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(SpeakBean.class);
        // 4 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(SpeakBean.class);
        // 5 指定job的输入原始⽂件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

mr编程技巧总结

  • 结合业务设计Map输出的key和v,利用key相同则去往同一个reduce的特点!!
  • map()⽅法中获取到只是一行⽂本数据尽量不做聚合运算
  • reduce()⽅法的参数要清楚含义

第 5 节 MapReduce原理分析

5.1 MapTask运行机制详解

image.png
详细步骤:

  1. 首先,读取数据组件inputformat(默认TextInputFormat)会通过getSplits方法对输入目录进行逻辑切片得到splits,有多少个split就对应启动多少个mapTask。split与block的对应关系默认一对一
  2. 将输入文件切分为splits后,由RecordReader对象(默认LineRecordReader)进行读取,以 \n 作为分隔符,读取一行数据,返回,key表示每行首字符偏移量,value代表这一行的内容
  3. 读取split返回 ,进入用户自己继承的Mapper类中,执行用户重写的map函数,recordread读取一行这里调用一次
  4. map逻辑完之后,将map的每条结果通过context.write进行collect数据收集,在collect中,会先对其进行分区处理,默认使用HashPartitioner
  5. 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。
    1. 环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。
    2. 缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size spillpercent = 100MB 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Maptask的输出结果还可以往剩下的20MB内存中写,互不影响。
  6. 当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为
    1. 如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用
    2. 那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果
  7. 合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量

MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对
输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的
取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到
job上
**

5.2 ReduceTask 工作机制

image.png
Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。

  • Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。

  • Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。

  • 合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。

  • 对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

第 6 节 Shuffle阶段

image.png
流程解释:
image.png

6.1 Shuffle机制

Map阶段的数据如何传递给Reduce阶段,是 MapReduce框架中最关键的一个流程——Shufle阶段
Shuffle:洗牌、发牌(核心机制:数据分区、排序、分组、combine、合并等过程)
image.png

以wordcount为例,假设有5个map和3个reduce:

6.2 map阶段

1、在map task执行时,它的输入数据来源于HDFS的block,当然在MapReduce概念中,map task只读取split。Split与block的对应关系可能是多对一,默认是一对一。
2、在经过mapper的运行后,我们得知mapper的输出是这样一个key/value对: key是“hello”, value是数值1。因为当前map端只做加1的操作,在reduce task里才去合并结果集。这个job有3个reduce task,到底当前的“hello”应该交由哪个reduce去做呢,是需要现在决定的。

  • 分区(partition)

MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
一个split被分成了3个partition。

  • 排序sort

在spill写入之前,会先进行二次排序,首先根据数据所属的partition进行排序,然后每个partition中的数据再按key来排序。partition的目是将记录划分到不同的Reducer上去,以期望能够达到负载均衡,以后的Reducer就会根据partition来读取自己对应的数据。接着运行combiner(如果设置了的话),combiner的本质也是一个Reducer,其目的是对将要写入到磁盘上的文件先进行一次处理,这样写入到磁盘的数据量就会减少。

  • 溢写(spill)

Map端会处理输入数据并产生中间结果,这个中间结果会写到本地磁盘,而不是HDFS。每个Map的输出会先写到内存缓冲区中, 缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。 当写入的数据达到设定的阈值时,系统将会启动一个线程将缓冲区的数据写到磁盘,这个过程叫做spill。
这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8。
将数据写到本地磁盘产生spill文件(spill文件保存在{mapred.local.dir}指定的目录中,MapReduce任务结束后就会被删除)。

  • 合并(merge)

每个Map任务可能产生多个spill文件,在每个Map任务完成前,会通过多路归并算法将这些spill文件归并成一个文件。这个操作就叫merge(spill文件保存在{mapred.local.dir}指定的目录中,Map任务结束后就会被删除)。一个map最终会溢写一个文件。
至此,Map的shuffle过程就结束了。

6.3 Reduce阶段

Reduce端的shuffle主要包括三个阶段,copy、sort(merge)和reduce。

  • copy

首先要将Map端产生的输出文件拷贝到Reduce端,但每个Reducer如何知道自己应该处理哪些数据呢?因为Map端进行partition的时候,实际上就相当于指定了每个Reducer要处理的数据(partition就对应了Reducer),所以Reducer在拷贝数据的时候只需拷贝与自己对应的partition中的数据即可。每个Reducer会处理一个或者多个partition,但需要先将自己对应的partition中的数据从每个Map的输出结果中拷贝过来。

  • merge

Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。
这里需要强调的是:
merge阶段,也称为sort阶段,因为这个阶段的主要工作是执行了归并排序。从Map端拷贝到Reduce端的数据都是有序的,所以很适合归并排序。
merge有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘。默认情况下第一种形式不启用,让人比较困惑,是吧。
当copy到内存中的数据量到达一定阈值,就启动内存到磁盘的merge,即第二种merge方式,与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。这种merge方式一直在运行,直到没有map端的数据时才结束。
然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。

  • reduce

不断地merge后,最后会生成一个“最终文件”。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。对我们来说,当然希望它存放于内存中,直接作为Reducer的输入,但默认情况下,这个文件是存放于磁盘中的。至于怎样才能让这个文件出现在内存中,参见性能优化篇。
然后就是Reducer执行,在这个过程中产生了最终的输出结果,并将其写到HDFS上。