MapTask运行机制详解

MapReduce原理解析 - 图1

步骤流程详解:

  1. ⾸先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits⽅法对输⼊⽬录中⽂
    件进⾏逻辑切⽚规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关
    系默认是⼀对⼀。
  2. 将输⼊⽂件切分为splits之后,由RecordReader对象(默认LineRecordReader)进⾏读取,以\n
    作为分隔符,读取⼀⾏数据,返回。Key表示每⾏⾸字符偏移值,value表示这⼀⾏
    ⽂本内容。
  3. 读取split返回,进⼊⽤户⾃⼰继承的Mapper类中,执⾏⽤户重写的map函数。
    RecordReader读取⼀⾏这⾥调⽤⼀次。
  4. map逻辑完之后,将map的每条结果通过context.write进⾏collect数据收集。在collect中,会先
    对其进⾏分区处理,默认使⽤HashPartitioner。

MapReduce提供Partitioner接⼝,它的作⽤就是根据key或value及reduce的数量来决定当前的这对
输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的
取模⽅式只是为了平均reduce的处理能⼒,如果⽤户⾃⼰对Partitioner有需求,可以订制并设置到
job上。

  1. 接下来,会将数据写⼊内存,内存中这⽚区域叫做环形缓冲区,缓冲区的作⽤是批量收集map结
    果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写⼊缓冲区。当然写⼊之
    前,key与value值都会被序列化成字节数组。
  • 环形缓冲区其实是⼀个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,
    包括partition、key的起始位置、value的起始位置以及value的⻓度。环形结构是⼀个抽象概念。
  • 缓冲区是有⼤⼩限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以
    需要在⼀定条件下将缓冲区中的数据临时写⼊磁盘,然后重新利⽤这块缓冲区。这个从内存往磁盘
    写数据的过程被称为Spill,中⽂可译为溢写。这个溢写是由单ᇿ线程来完成,不影响往缓冲区写
    map结果的线程。溢写线程启动时不应该阻⽌map的结果输出,所以整个缓冲区有个溢写的⽐例
    spill.percent。这个⽐例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size spill
    percent = 100MB
    0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执⾏溢写过程。Map
    task的输出结果还可以往剩下的20MB内存中写,互不影响。
  1. 当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的⾏为!
  • 如果job设置过Combiner,那么现在就是使⽤Combiner的时候了。将有相同key的key/value对的
    value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整
    个模型中会多次使⽤。
  • 那哪些场景才能使⽤Combiner呢?从这⾥分析,Combiner的输出是Reducer的输⼊,Combiner
    绝不能改变最终的计算结果。Combiner只应该⽤于那种Reduce的输⼊key/value与输出key/value
    类型完全⼀致,且不影响最终结果的场景。⽐如累加,最⼤值等。Combiner的使⽤⼀定得慎重,
    如果⽤好,它对job执⾏效率有帮助,反之会影响reduce的最终结果。
  1. 合并溢写⽂件:每次溢写会在磁盘上⽣成⼀个临时⽂件(写之前判断是否有combiner),如果
    map的输出结果真的很⼤,有多次这样的溢写发⽣,磁盘上相应的就会有多个临时⽂件存在。当
    整个数据处理结束之后开始对磁盘中的临时⽂件进⾏merge合并,因为最终的⽂件只有⼀个,写⼊
    磁盘,并且为这个⽂件提供了⼀个索引⽂件,以记录每个reduce对应数据的偏移量。
    ⾄此map整个阶段结束!!

MapTask并行度

  • MapTask的并行度和map的数据切片数量有关,一个切片对应一个MapTask;
  • 数据切片和分块Block不一样,数据切片是逻辑切分数据,而Block是磁盘上数据的物理切分。

ReduceTask工作机制

MapReduce原理解析 - 图2
Reduce⼤致分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段包含⼀个
eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线
程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据
进⾏merge。待数据copy完成之后,copy阶段就完成了,开始进⾏sort阶段,sort阶段主要是执⾏
finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调⽤⽤户定义的reduce函数进⾏处理。

详细步骤

  • Copy阶段,简单地拉取数据。Reduce进程启动⼀些数据copy线程(Fetcher),通过HTTP⽅式请求
    maptask获取属于⾃⼰的⽂件。
  • Merge阶段。这⾥的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数
    值。Copy过来的数据会先放⼊内存缓冲区中,这⾥的缓冲区⼤⼩要⽐map端的更为灵活。merge
    有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第⼀种形式不启⽤。当内存中的
    数据量到达⼀定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过
    程中如果你设置有Combiner,也是会启⽤的,然后在磁盘中⽣成了众多的溢写⽂件。第⼆种
    merge⽅式⼀直在运⾏,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge
    ⽅式⽣成最终的⽂件。
  • 合并排序。把分散的数据合并成⼀个⼤的数据后,还会再对合并后的数据排序。
  • 对排序后的键值对调⽤reduce⽅法,键相等的键值对调⽤⼀次reduce⽅法,每次调⽤会产⽣零个
    或者多个键值对,最后把这些输出的键值对写⼊到HDFS⽂件中。

ReduceTask并行度

ReduceTask的并⾏度同样影响整个Job的执⾏并发度和执⾏效率,但与MapTask的并发数由切⽚数决定
不同,ReduceTask数量的决定是可以直接⼿动设置:

  1. // 默认值是1,⼿动设置为4
  2. job.setNumReduceTasks(4);

注意事项

  1. ReduceTask=0,表示没有Reduce阶段,输出⽂件数和MapTask数量保持⼀致;
  2. ReduceTask数量不设置默认就是⼀个,输出⽂件数量为1个;
  3. 如果数据分布不均匀,可能在Reduce阶段产⽣倾斜;