转自:https://blog.csdn.net/weixin_44591209/article/details/88049264
MapReduce源于Google一篇论文,它充分借鉴了“分而治之”的思想,将一个数据处理过程拆分为主要的Map(映射)与Reduce(归约)两步。简单地说,MapReduce就是”任务的分解与结果的汇总”。
MapReduce (MR) 是一个基于磁盘运算的框架,贼慢,慢的主要原因:1)MR是进程级别的,一个MR任务会创建多个进程(map task和reduce task都是进程),进程的创建和销毁等过程需要耗很多的时间。 2)磁盘I/O问题, MapReduce作业通常都是数据密集型作业,大量的中间结果需要写到磁盘上并通过网络进行传输,这耗去了大量的时间。
注:mapreduce 1.x架构有两个进程:JobTracker :负责资源管理和作业调度。TaskTrachker:任务的执行者。运行 map task 和 reduce task。在2.x的时候由yarn取代他们的工作了。

MapReduce工作流程

input.txt—>InputFormat—>Map阶段—>shuffle阶段(横跨Mapper和Reducer,在Mapper输出数据之前和Reducer接收数据之后都有进行)—>Reduce阶段 —>OutputFormat —>HDFS:output.txt
InputFormat接口:将我们的输入数据进行分片(split),输入分片存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组。输入分片的大小一般和hdfs的blocksize相同(128M),可以改,但最好不要。
Map阶段: Map会读取输入分片数据,一个输入分片(input split)针对一个map任务,进行map逻辑处理(用户自定义)
Reduce阶段:对已排序输出中的每个键调用reduce函数。此阶段的输出直接写到输出文件系统,一般为hdfs。

MapReduce Shffle详解(2.x)

1.15 hadoop之MapReduce生产调优配置详解【转】 - 图1image.gif
为了确保每个reducer的的输入都是按键排序的,系统执行排序的过程,即将map task的输出通过一定规则传给reduce task,这个过程成为shuffle。
Shuffle阶段一部分是在map task 中进行的, 这里成为Map shuffle , 还有一部分是在reduce task 中进行的, 这里称为Reduce shffle。

Map Shuffle 阶段

  1. Map在做输出时候会在内存里开启一个环形缓冲区,默认大小是100M(参数:mapreduce.task.io.sort.mb),Map中的outputCollect会把输出的所有kv对收集起来,存到这个环形缓冲区中。<br /> **环形缓冲区**:本质上是一个首尾相连的数组,这个数组会被一分为二,一边用来写索引,一边用来写数据。一旦这个环形缓冲区中的内容达到阈值(默认是0.8,参数:mapreduce.map.sort.spill.percent),一个后台线程就会把内容溢写(spill)到磁盘上,在这过程中,map输出并不会停止往缓冲区写入数据(反向写,到达阈值后,再反向,以此类推),但如果在此期间缓冲区被写满,map会被阻塞直到写磁盘过程完成。溢写过程按照轮询方式将缓冲区的内容写到mapred.local.dir指定的作业特定子目录中的目录中,map任务结束后删除。<br />![](https://cdn.nlark.com/yuque/0/2020/png/1025762/1607519504011-cabcbd88-4612-4b2f-997b-16803cffe801.png#align=left&display=inline&height=368&margin=%5Bobject%20Object%5D&originHeight=368&originWidth=978&size=0&status=done&style=none&width=978)![image.gif](https://cdn.nlark.com/yuque/0/2020/gif/1025762/1607519503873-549179dc-c607-48c9-beb2-e67e20566354.gif#align=left&display=inline&height=1&margin=%5Bobject%20Object%5D&name=image.gif&originHeight=1&originWidth=1&size=43&status=done&style=none&width=1)<br /> 先介绍两个概念:<br />** Combiner**: 本地的reducer,运行combiner使得map输出结果更紧凑,可以减少写到磁盘的数据和传递给reducer的数据。可通过编程自定义(没有定义默认没有)。适用场景:求和、次数等 (做 ‘’+‘’ 法的场景) 【如平均数等场景不适合用】。<br /> **Partitioner**:分区,按照一定规则,把数据分成不同的区,Partitioner决定map task输出的数据交由哪个reduce task处理,一个partition对应一个reduce task。 分区规则可通过编程自定义,默认是按照key的hashcode进行分区。<br /> 在数据溢写到磁盘之前,线程首先根据partitioner将数据划成相应的分区,然后在每个分区中按键进行区内排序。如果设置了combiner,它就在排序后的输出上运行。所以每次溢写到磁盘上的数据应该是 分区且区内有序的。<br /> 每次溢写会生成一个溢写文件(spill file),因此在map任务写完其最后一个输出记录之后,会有多个溢写文件。在Map 任务完成前,所有的spill file将会进行归并排序为一个分区且有序的文件。这是一个多路归并过程,最大归并路数由默认是10(参数:[mapreduce.task.io.sort.factor]())。如果有定义combiner,且至少存在3个(参数:mapreduce.map.combine.minspills )溢出文件时,则combiner就会在输出文件写到磁盘之前再次运行。<br /> 在将压缩map输出写到磁盘的过程中对它进行压缩加快写磁盘的速度、更加节约时间、减少传给reducer的数据量。将[mapreduce.output.fileoutputformat.compress]()设置为true(默认为false),就可以启用这个功能。使用的压缩库由参数[mapreduce.output.fileoutputformat.compress.codec]()指定。<br /> 当spill 文件归并完毕后,Map 将删除所有的临时spill 文件,通知appmaster, map task已经完成。<br />

Reduce Shuffle 阶段

  1. Reducer是通过HTTP的方式得到输出文件的分区。使用netty进行数据传输,默认情况下netty的工作线程数是处理器数的2倍。一个reduce task 对应一个分区。<br /> reduce端获取所有的map输出之前,Reduce端的线程会周期性的询问appmaster 关于map的输出。appmaster是知道map的输出和host之间的关系。在reduce端获取所有的map输出之前,Reduce端的线程会周期性的询问master 关于map的输出。Reduce并不会在获取到map输出之后就立即删除hosts,因为reduce有可能运行失败。相反,是等待appmaster的删除消息来决定删除host。<br /> map任务的完成数占总map任务的0.05(参数:mapreduce.job.reduce.slowstart.completedmaps),reduce任务就开始复制它的输出,复制阶段把Map输出复制到Reducer的内存或磁盘。复制线程的数量由mapreduce.reduce.shuffle.parallelcopies参数来决定,默认是 5。<br /> 如果map输出相当小,会被复制到reduce任务JVM的内存(缓冲区大小由mapreduce.reduce.shuffle.input.buffer.percent属性控制,指定用于此用途的堆空间的百分比,默认为0.7),如果缓冲区空间不足,map输出会被复制到磁盘。一旦内存缓冲区达到阈值(参数:mapreduce.reduce.shuffle.merge.percent,默认0.66)或达到map的输出阈值(参数:[mapreduce.reduce.merge.inmem.threshold](),默认1000)则合并后溢写到磁盘中。如果指定combiner,则在合并期间运行它已降低写入磁盘的数据量。随着磁盘上副本的增多,后台线程会将它们合并为更大的,排序好的文件。注:为了合并,压缩的map输出都必须在内存中解压缩。<br /> 复制完所有的map输出后,reduce任务进入归并排序阶段,这个阶段将合并map的输出,维持其顺序排序。这是循环进行的。目标是合并最小数据量的文件以便最后一趟刚好满足合并系数(参数:[mapreduce.task.io.sort.factor](),默认10)。<br /> 因此,如果有40个文件(包括磁盘和内存),不会在四趟中每趟合并10个文件而得到4个文件,再将4个文件合并到reduce。而是第一趟只合并4个文件,随后的三塘合并10个文件。最后一趟中,4个已经合并的文件和剩余的6个文件合计十个文件直接合并到reduce。<br /> 这并没有改变合并的次数,它只是一个优化措施,尽量减少写到磁盘的数据量。因为最后一趟总是直接合并到reduce,没有磁盘往返。<br /> 至此,Shuffle阶段结束。

Shuffle总结

  1. 1map task收集map()方法输出的kv对,放到内存环形缓冲区中<br /> 2)从内存环形缓冲区不断将文件经过分区、排序、combine(可选)溢写(spill)到本地磁盘<br /> 3)多个溢出文件会归并排序成大的spill file<br /> 4reduce task根据自己的分区号,去各个map task机器上取相应的结果分区数据<br /> 5reduce task会取到同一个分区的来自不同maptask的结果文件,reduce task会将这些文件再进行归并排序<br /> 6)合并成大文件后,shuffle过程结束

MapReduce 调优

输入阶段:
处理小文件问题:
Map阶段:
1)减少溢写(spill)次数。
2)减少合并(merge)次数。
3)不影响业务逻辑前提下,设置combine。
4)启用压缩。
Reduce阶段:
1)合理设置map和reduce数。
2)合理设置map、reduce共存。
3)规避使用reduce:因为reduce在用于连接数据集的时候将会产生大量的网络消耗。
4)合理设置reduce端的buffer:默认情况下,数据达到一个阈值的时候,buffer中的数据就会写入磁盘,然后reduce会从磁盘中获得所有的数据。也就是说,buffer和reduce是没有直接关联的,中间多个一个写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得buffer中的一部分数据可以直接输送到reduce,从而减少IO开销。

配置参数 参数说明
mapreduce.task.io.sort.mb shuffle的环形缓冲区大小,默认100M。如果能估算map输出大小,就可以合理设置该值来尽可能减少溢出写的次数,这对调优很有帮助。
mapreduce.map.sort.spill.percent 环形缓冲区溢出的阈值,默认0.8。
mapreduce.task.io.sort.factor 归并因子,默认为10。 一般调高,增大merge的文件数目,减少merge的次数,从而缩短mr处理时间。将此值增加到100是很常见的。
mapreduce.map.combine.minspills 默认为3。运行combiner所需的最少溢出写文件数(如果已指定combiner)
mapreduce.output.fileoutputformat.compress map输出是否压缩,默认为false。如果map输出的数据量非常大,那么在写入磁盘时压缩数据往往是个很好的主意,因为这样会让写磁盘的速度更快,节约磁盘空间,并且减少传给reducer的数据量。
mapreduce.output.fileoutputformat.compress.codec 用于map输出的压缩编解码器,默认为org.apache.Hadoop.io.compress.DefaultCodec。推荐设置为LZO压缩。
mapreduce.job.reduce.slowstart.completedmaps 调用reduce之前,map必须完成的最少比例,默认为0.05。
mapreduce.reduce.shuffle.parallelcopies reducer在复制阶段复制线程的数量,默认为5。
mapreduce.reduce.shuffle.input.buffer.percent 在shuffle的复制阶段,分配给map输出的缓冲区占JVM堆空间的百分比,默认为0.7。
mapreduce.reduce.shuffle.merge.percent reduce输入缓冲区溢写的阈值。默认是0.66。
mapreduce.reduce.merge.inmem.threshold reduce输入缓存区的文件数阈值。默认是1000。0或者小于0意味着此参数不生效。
mapreduce.map.maxattempts 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
mapreduce.reduce.maxattempts 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
mapreduce.task.timeout Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。

MapReduce 常用命令

  1. [hadoop@hadoop002 bin]$ mapred --help
  2. Usage: mapred [--config confdir] COMMAND
  3. where COMMAND is one of:
  4. pipes run a Pipes job
  5. job manipulate MapReduce jobs
  6. queue get information regarding JobQueues
  7. classpath prints the class path needed for running
  8. mapreduce subcommands
  9. historyserver run job history servers as a standalone daemon
  10. distcp <srcurl> <desturl> copy file or directories recursively
  11. archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive
  12. archive-logs combine aggregated logs into hadoop archives
  13. hsadmin job history server admin interface

image.gif
mapred job -list : 查看当前运行的job
mapred job -kill jobId : 杀掉某个job
mapred job -kill-task taskid : 杀掉某个task