1. MapTask 运行机制详解

  • MapTask 流程
    • image.png
  • 详细步骤

      1. 首先,读取数据组件 InputFormat(默认 TextInputFormat)会通过 getSplits 方法对输入目录中文件进行 逻辑切片(区别与 HDFS 的物理分块),规划得到 splits,有多少个split 就对应启动多少MapTask。
        • split 与 block 的对应关系默认是一对一split 和 block 的默认大小都是 128M
      1. 将输入文件切分为 splits 之后,由 RecordReader 对象(默认 LineRecordReader)进行读取,以 \n 作为分隔符,读取一行行数据,返回
        • Key 表示每行首字符偏移值,value 表示这一行文本内容
      1. 读取 split 返回,进入用户自己继承的 Mapper 类中,执行用户重写的 map 函数。RecordReader 读取一行这里 map 调用一次。
      1. map 逻辑完成之后,将 map 的每条结果通过 context.write 进行 collect 数据收集。在 collect 中,会先对其进行 分区处理,默认使用 HashPartitioner
        • MapReduce 提供 Partitioner 接口, 它的作用就是根据 key 或 value 及 reduce 的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对 key hash 后再以reduce task数量量取模(分区策略是k的hashCode值%reduceTask的数量)默认的取模方式只是为了平均 reduce 的处理能力,如果用户自己对 Partitioner 有需求,可以订制并设置到 job 上。
      1. 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区缓冲区的作用是批量收集map结果,减少磁盘 IO 的影响。我们的 key/value对 以及 Partition 的结果都会被写入缓冲区。当然在写入之前,key 与 value 值都会被序列化成字节数组
        • 进入缓冲区时 要对 key 进行排序(快速排序,QuickSort)
        • 环形缓冲区其实是一个数组(环形数组),数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。
        • 缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中⽂文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例 spillpercent。这个比例默认是80%,也就是当缓冲区的数据已经达到阈值(buffer size spillpercent = 100MB 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Maptask的输出结果还可以往剩下的20MB内存中写,互不影响(溢写:目的是让缓冲区中的读和写互不干扰
      1. 当溢写线程启动后,需要对这80MB空间内的key做 **排序**(Sort)这里的排序用的是**归并排序(MergeSort)排序是MapReduce模型默认的行为**
        • 如果 job 设置过 Combiner(预聚合),那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner 会优化 MapReduce 的中间结果,所以它在整个模型中会多次使用
        • 哪些场景才能使用 Combiner 呢?从这里分析,Combiner 的输出是 Reducer 的输入,Combiner 绝不能改变最终的计算结果Combiner只应该用于那种Reduce的输入key/value与输出key/value 类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重, 如果用好,它对 job 执行效率有帮助,反之会影响 reduce 的最终结果
      1. 合并溢写文件:每次溢写会在磁盘上生成一个临时文件写之前判断是否有 combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行 merge 合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件以记录每个reduce对应数据的偏移量
  • 至此,map 整个阶段结束

  • MapTask 的一些配置

  • 即 maptask 的数量,等于 切片split 的数量,也等于 hdfs 分块的数量

  • MapTask并行度思考
    • MapTask 的并行度决定 Map 阶段的任务处理并发度,从而影响到整个 Job 的处理速度。
    • 思考:MapTask并行任务是否越多越好呢?哪些因素影响了了MapTask并行度?
  • MapTask 并行度决定机制
    • 数据块:Block 是 HDFS 物理上把数据分成一块一块
    • 切片:数据切片split 只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储

image.png

  • 切片split 的计算方式
    • image.png
  • 思想:移动计算比移动数据成本更低
    • 移动计算,即 将计算包发送至目的地,比发送数据更划算

切片机制 源码阅读

  • 经过源码分析,得知 切片的默认大小是 128M

image.png

  • 问题:切片(多线程数量)是不是越多越好呢?
    • 答:不是,线程开得越多,消耗的资源越多,因此线程的数量要有个度。比如一个129M的文件,会切成一片,而不是两片(如果切成两片,第二片是1M,开的线程消耗资源更多,得不偿失)
    • 当一个文件占据了多个block块,并且这个文件的总大小 < blocksize * 1.1 时,会进入一个分片。
  • 思考题:对于129M的文件,如果切片为 1,那么分块 block 为多少?
    • 2个

3. ReduceTask 工作机制

image.png

  • Reduce 大致分为 copysortreduce 三个阶段,重点在前两个阶段。copy 阶段包含一个 eventFetcher 来获取已完成的 map 列表,由 Fetcher 线程去 copy 数据,在此过程中会启动两个 merge 线程,分别为 inMemoryMerger onDiskMerger,分别 将内存中的数据 merge 到磁盘(inMemoryMerger) 和 将磁盘中的数据进行 merge(onDiskMerger)。待数据 copy 完成之后,copy 阶段就完成了,开始进行 sort 阶段,sort 阶段主要是执行 finalMerge 操作,纯粹的 sort 阶段,完成之后就是 reduce 阶段,调用户定义的 reduce 函数进行处理

    详细步骤

  • Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过 HTTP 方式请求 maptask 获取属于自己的文件。

  • Merge阶段。这里的 merge 如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge 有三种形式:内存到内存;内存到磁盘;磁盘到磁盘默认情况下第一种形式不启用(内存到内存)。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有 Combiner,也是会启用的,然后在磁盘中生成了了众多的溢写文件。第二种merge方式一直在运行,直到没有 map 端的数据时才结束,然后启动第三种磁盘到磁盘的merge 方式生成最终的文件。
  • 合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
  • 对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中

    4. ReduceTask 并行度

  • ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置

    image.png

    注意事项

    1. ReduceTask = 0,表示没有 Reduce 阶段,输出文件数和 MapTask 数量保持一致
    2. ReduceTask 数量不设置默认就是一个,输出文件数量为1个
    3. 如果数据分布不均匀,可能在 Reduce 阶段产生倾斜(有些reduceTask处理数据很多,有些则很少)

      5. Shuffle 机制

  • map 阶段处理的数据如何传递给 reduce 阶段,是 MapReduce 框架中最关键的一个流程,这个流程就叫 shuffle

  • shuffle:洗牌、发牌 —>(核心机制:数据分区,排序,分组,combine,合并 等过程)


    image.png

    MapReduce分区与reduceTask数量

  • 在 MapReduce 中,通过我们指定分区,会将同一个分区的数据发送到同一个 reduce 当中进行处理(默认是key相同去往同个分区),例如我们为了数据的统计,我们可以把一批类似的数据发送到同一个 reduce 当中去,在同一个 reduce 当中统计相同类型的数据

  • 如何才能保证相同 key 的数据去往同一个 reduce 呢?
    • 只需要保证相同 key 的数据分发到同个分区即可。结合以上原理分析我们知道 MR 程序 shuffle 机制默认 就是这种规则!!!

分区源码

  • 翻阅源码验证以上规则,MR程序默认使用的 HashPartitioner,保证了相同的 key 去往同个分区

    • image.png

      自定义分区

  • 实际生产中需求变化多端,默认分区规则往往不能满足需求,需要结合业务逻辑来灵活控制分区规则以及分区数量

  • 具体步骤
      1. 自定义类继承 Partitioner,重写 getPartition() 方法
      1. 在 Driver 驱动中,指定使用自定义 Partitioner
      1. 在 Driver 驱动中,要根据自定义 Partitioner 的逻辑设置相应数量的 ReduceTask 数量

需求:按照不同的 appkey 把记录输出到不同的分区中

  • 原始日志格式
    • MapReduce 原理分析(上) - 图10
  • 输出结果
    • 根据 appkey 把不同厂商的日志数据分别输出到不同的文件中

需求分析

  • 面对业务需求,结合 mr 的特点,来设计 map 输出的 kv,以及 reduce 输出的 kv 数据
  • 一个 ReduceTask 对应 一个输出文件因为在 shuffle 机制中每个 reduceTask 拉取的都是某一个分区的数据,**一个分区对应一个输出文件**
  • 结合 appkey 的前缀相同的特点,同时不使用默认分区规则,而是使用自定义分区器,只要 appkey 前缀相同则数据进入同一个分区

整体思路

  • Mapper
      1. 读取一行文本,按照制表符 \t 切分
      1. 解析出 appkey 字段,其余数据封装为 PartitionBean 对象(实现序列化 Writable 接口)
      1. 设计 map( ) 输出的 kv,key-->appkey(依靠该字段完成分区),PartitionBean 对象作为 Value 输出
  • Partition
    • 自定义分区器,实现按照 appkey 字段的前缀来区分所属分区
  • Reducer
    • reduce() 正常输出即可,无需进行聚合操作
  • Driver
      1. 在原先设置 job 属性的同时增加设置:使用自定义分区器
    • 2. 注意设置 ReduceTask 的数量(与分区数量保持一致)

Mapper

  1. package com.lagou.partition;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. import java.io.IOException;
  6. public class PartitionMapper extends Mapper<LongWritable, Text, Text, PartitionBean> {
  7. final PartitionBean bean = new PartitionBean();
  8. @Override
  9. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  10. String[] strings = value.toString().split("\t");
  11. String appKey = strings[2];
  12. bean.setId(strings[0]);
  13. bean.setDeviceId(strings[1]);
  14. bean.setAppKey(strings[2]);
  15. bean.setIp(strings[3]);
  16. bean.setSelfDuration(Long.parseLong(strings[4]));
  17. bean.setThirdPartDuration(Long.parseLong(strings[5]));
  18. bean.setStatus(strings[6]);
  19. context.write(new Text(appKey), bean);
  20. }
  21. }

PartitionBean

  1. package com.lagou.partition;
  2. import org.apache.hadoop.io.Writable;
  3. import java.io.DataInput;
  4. import java.io.DataOutput;
  5. import java.io.IOException;
  6. public class PartitionBean implements Writable {
  7. private String id;
  8. private String deviceId;
  9. private String appKey;
  10. private String ip;
  11. private Long selfDuration;
  12. private Long thirdPartDuration;
  13. private String status;
  14. public PartitionBean() {
  15. }
  16. public PartitionBean(String id, String deviceId, String appKey, String ip, Long selfDuration, Long thirdPartDuration, String status) {
  17. this.id = id;
  18. this.deviceId = deviceId;
  19. this.appKey = appKey;
  20. this.ip = ip;
  21. this.selfDuration = selfDuration;
  22. this.thirdPartDuration = thirdPartDuration;
  23. this.status = status;
  24. }
  25. @Override
  26. public void write(DataOutput out) throws IOException {
  27. out.writeUTF(id);
  28. out.writeUTF(deviceId);
  29. out.writeUTF(appKey);
  30. out.writeUTF(ip);
  31. out.writeLong(selfDuration);
  32. out.writeLong(thirdPartDuration);
  33. out.writeUTF(status);
  34. }
  35. @Override
  36. public void readFields(DataInput in) throws IOException {
  37. this.id = in.readUTF();
  38. this.deviceId = in.readUTF();
  39. this.appKey = in.readUTF();
  40. this.ip = in.readUTF();
  41. this.selfDuration = in.readLong();
  42. this.thirdPartDuration = in.readLong();
  43. this.status = in.readUTF();
  44. }
  45. public String getId() {
  46. return id;
  47. }
  48. public void setId(String id) {
  49. this.id = id;
  50. }
  51. public String getDeviceId() {
  52. return deviceId;
  53. }
  54. public void setDeviceId(String deviceId) {
  55. this.deviceId = deviceId;
  56. }
  57. public String getAppKey() {
  58. return appKey;
  59. }
  60. public void setAppKey(String appKey) {
  61. this.appKey = appKey;
  62. }
  63. public String getIp() {
  64. return ip;
  65. }
  66. public void setIp(String ip) {
  67. this.ip = ip;
  68. }
  69. public Long getSelfDuration() {
  70. return selfDuration;
  71. }
  72. public void setSelfDuration(Long selfDuration) {
  73. this.selfDuration = selfDuration;
  74. }
  75. public Long getThirdPartDuration() {
  76. return thirdPartDuration;
  77. }
  78. public void setThirdPartDuration(Long thirdPartDuration) {
  79. this.thirdPartDuration = thirdPartDuration;
  80. }
  81. public String getStatus() {
  82. return status;
  83. }
  84. public void setStatus(String status) {
  85. this.status = status;
  86. }
  87. @Override
  88. public String toString() {
  89. return id +
  90. "\t" + deviceId +
  91. "\t" + appKey +
  92. "\t" + ip +
  93. "\t" + selfDuration +
  94. "\t" + thirdPartDuration +
  95. "\t" + status;
  96. }
  97. }

CustomPartitioner

package com.lagou.partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class CustomPartition extends Partitioner<Text, PartitionBean> {

    @Override
    public int getPartition(Text text, PartitionBean partitionBean, int numPartitions) {

        int partition = 0;

        if (text.toString().equals("kar")) {
            partition = 0;
        } else if (text.toString().equals("pandora")) {
            partition = 1;
        } else {
            partition = 2;
        }
        return partition;
    }
}

PartitionReducer

package com.lagou.partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PartitionReducer extends Reducer<Text, PartitionBean, Text, PartitionBean> {

    @Override
    protected void reduce(Text key, Iterable<PartitionBean> values, Context context) throws IOException, InterruptedException {
        for (PartitionBean bean : values) {
            context.write(key, bean);
        }
    }
}

PartitionDriver

package com.lagou.partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PartitionDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();

        Job job = Job.getInstance();

        job.setJarByClass(PartitionDriver.class);
        job.setMapperClass(PartitionMapper.class);
        job.setReducerClass(PartitionReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PartitionBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PartitionBean.class);

        job.setPartitionerClass(CustomPartition.class);
        job.setNumReduceTasks(3);

        FileInputFormat.setInputPaths(job, new Path("D:\\partition\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\partition\\output"));

        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}

总结

  1. 自定义分区器时 最好**保证 分区数量 与 reduceTask 数量保持一致!!!**
  2. 如果分区数量不止 1 个,但是 reduceTask 数量 1 个,此时只会输出一个文件
  3. 如果 reduceTask 数量 大于分区数量,但是输出多个空文件
  4. 如果 reduceTask 数量 小于分区数量,有可能会报错
    • image.png

      MapReduce 中的 Combiner

  • combiner 运行机制
    • image.png
  1. Combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件
  2. Combiner 组件的父类就是 Reducer
  3. Combiner 和 reducer 的区别在于运行的位置(两者运行机制一样,程序可以互换)
  4. Combiner 是在每一个 maptask 所在的节点运行
  5. Combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量
  6. Combiner 能够应用的前提是不能影响最终的业务逻辑,此外,Combiner的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来

    • image.png

      自定义Combiner 实现步骤

    • 自定义一个 Combiner 继承 Reducer,重写 Reduce 方法

    • 在驱动 (Driver) 设置使用 Combiner(默认是不使用 Combiner 组件)

1.改造WordCount程序

package com.lagou.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int num = 0;

        for (IntWritable value : values) {
            num += value.get();
        }
        context.write(key, new IntWritable(num));
    }
}

2.在驱动(Driver)设置使用Combiner

  • job.setCombinerClass(WordcountCombiner.class);

image.png

  • 思考题:如果直接使用 WordCountReducer 作为 Combiner 使用是否可以?
  • 答:**可以**!因为实现的逻辑是一模一样的