1.MapReduce思想

MapReduce的思想核心是 : 先分再合,分而治之

分而治之:就是把一个复杂的问题,按照一定的“分解”方法分为等价的规模较小的若干部分,然后逐个解决,分别找出各部分的结果, 然后把各部分的结果组成整个问题的最终结果。

MapReduce其实是两个东西,是有map和reduce组成的:

Map表示第一阶段,负责“拆分”:即把复杂的任务分解为若干个“简单的子任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。

Reduce表示第二阶段,负责“合并”:即对map阶段的结果进行全局汇总。

这两个阶段合起来正是MapReduce思想的体现。

MapReduce - 图1

但是,如果这个问题拆不动或者拆分之后是有依赖关系的如何去处理?还能用分而治之的思想吗?

答案是不行,所以上述说的分而治之的思想必须是有一个前提的:

如果想在处理事情的时候采用分而治之的思想,必须要判断这个东西能不能拆分,拆分之后会不会有依赖关系,如果不满足还是按部就班依次去处理。

因为我们拆分的目的就是拆分成小问题,并行去处理问题。

也可以用下面的例子去理解分而治之:(我要数一下停车场有多少台车)

MapReduce - 图2

map阶段:每一个人数一列汽车数量

reduce阶段:每个人输完汽车数量的总数进行累加计算

2.简介

Hadoop MapReduce是一个分布式计算框架,用于轻松编写分布式应用程序,这些应用程序以可靠,容错的方式并行处理大型硬件集群(数千个节点)上的大量数据(多TB数据集)

特点:

1.易于编程

Mapreduce框架提供了用于二次开发的接口;简单地实现一些接口,就可以完成一个分布式程序

任务计算交给计算框架去处理,将分布式程序部署到hadoop集群上运行,集群节点可以扩展到成百上千个等。

2.良好的扩展性

当计算机资源不能得到满足的时候,可以通过增加机器来扩展它的计算能力

基于MapReduce的分布式计算得特点可以随节点数目增长保持近似于线性的增长,这个特点是MapReduce处理海量数据的关键,通过将计算节点增至几百或者几千可以很容易地处理数百TB甚至PB级别的离线数据。

3.高容错性

Hadoop集群是分布式搭建和部署得,任何单一机器节点宕机了,它可以把上面的计算任务转移到另一个节点上运行,不影响整个作业任务得完成,过程完全是由Hadoop内部完成的。

4.适合海量数据的离线处理

可以处理GB、TB和PB级别得数据量

局限性:

MapReduce虽然有很多的优势,也有相对得局限性, 局限性不代表不能做,而是在有些场景下实现的效果比较差,并不适合用MapReduce来处理,主要表现在以下结果方面:

1.实时计算性能差

MapReduce主要应用于离线作业,无法作到秒级或者是亚秒级得数据响应

2.不能进行流式计算

流式计算特点是数据是源源不断得计算,并且数据是动态的;而MapReduce作为一个离线计算框架,主要是针对静态数据集得,数据是不能动态变化得。

实例进程:

一个完整的MapReduce程序在分布式运行时有三类

  • MRAppMaster:负责整个MR程序的过程调度及状态协调(只有一个)
  • MapTask:负责map阶段的整个数据处理流程(多个)
  • ReduceTask:负责reduce阶段的整个数据处理流程(多个)

阶段组成:

  • 一个MapReduce编程模型中只能包含一个Map阶段和一个Reduce阶段,或者只有Map阶段
  • 不能有诸如多个map阶段、多个reduce阶段的情景出现
  • 如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序串行运行

MapReduce - 图3

如果一个map里面写着多个map呢?是可以的,因为上面说的阶段组成只是说一个MapReduce编程模型

数据类型:

  • 整个MapReduce程序中,数据都是以kv键值对的形式流转的
  • 在实际编程解决各种业务问题中,需要考虑每个阶段的输入输出kv分别是什么
  • MapReduce内置了很多默认属性,比如排序、分组等,都和数据的k有关,所以说kv的类型数据确定及其重要的

3.实例(如何运行MapReduce程序)

  • 一个最终完整版本的MR程序需要用户编写的代码和Hadoop自己实现的代码整合在一起才可以
  • 其中用户负责map、 reduce两个阶段的业务问题, Hadoop负责底层所有的技术问题
  • 由于MapReduce计算引擎天生的弊端(慢)所以在企业中工作很少涉及到MapReduce直接编程,但是某些软件的背后还依赖MapReduce引擎
  • 可以通过官方提供的示例来感受MapReduce及其内部执行流程,后续的新的计算引擎比如Spark,当中就有MapReduce深深的影子存在
  1. # MapReduce程序所在位置
  2. /export/server/hadoop-3.3.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.0.jar
  3. # MapReduce程序提交命令到YARN集群上分布式执行
  4. [hadoop jar|yarn jar] hadoop-mapreduce-examples-3.3.0.jar args

运行MapReduce程序评估一下圆周率的值,执行中可以去YARN页面上观察程序的执行的情况:

  1. hadoop jar hadoop-mapreduce-examples-3.3.0.jar pi 10 50
  2. # 第一个参数: pi表示MapReduce程序执行圆周率计算任务
  3. # 第二个参数:用于指定map阶段运行的任务task次数, 并发度,这里是10
  4. # 第三个参数:用于指定每个map任务取样的个数,这里是50

下图可以通过可视化界面查看MapReduce执行进程

MapReduce - 图4

MapReduce程序解析:

在开始启动任务的时候,会连接ResourceManager(也就是yarn的主角色,目的是连接yarn进行资源分配)

MapReduce - 图5

4.MapReduce内部运行流程

MapReduce - 图6

以上面图为例子,进行讲解:

  1. map阶段(k-v):将数据进行提交后进行分割(将单词进行分割),分割后每一个单词标记为1,输出的就是k-v格式的(<单词,1>)
  2. shuffle核心阶段:其实map阶段完毕之后并不是reduce,中间还有一个shuffle阶段。那这个阶段的作用是什么呢,经过MR程序内部自带默认的排序(可以按照字典序排序)、分组等功能,把key相同的单词会作为一组数据构成新的k-v键值对。
  3. reduce阶段:处理shuffle完的一组数据,该组数据就是该单词所有的键值对。 并对所有的1进行累加求和,就是单词的总次数。

MapReduce - 图7

1.Map阶段执行流程

MapReduce - 图8

1.把输入目录下文件按照一定的标准逐个进行逻辑切片,形成切片规划

默认Split size = Block size(128M),每一个切片由一个MapTask处理

比如这个文件有300M,那么就会生成3个MapTask来处理

2.对切片中的数据按照一定的规则读取解析返回

默认是按行读取数据。 key是每一行的起始位置偏移量, value是本行的文本内容

MapReduce - 图9

3.调用Mapper类中的map方法处理数据

每读取解析出来的一个 , 调用一次map方法。

MapReduce - 图10

4.按照一定的规则对Map输出的键值对进行分区partition

默认不分区,因为只有一个reducetask。分区的数量就是reducetask运行的数量

5.Map输出数据写入内存缓冲区,达到比例溢出到磁盘上。

溢出spill的时候根据key进行排序sort。默认根据key字典序排序。

也就是多了一层缓冲区,避免频繁的磁盘IO

MapReduce - 图11

也可以理解为:开启一个水龙头,如果一直放水,会对地面产生不小的冲击力,如果我拿一个杯子去接水,杯子满了,我倒在地上,再去接水,再倒在地上,减少了地面的冲击力。

6.对所有溢出文件进行最终的merge合并,成为一个文件

2.Reduce阶段执行流程

MapReduce - 图12

1.ReduceTask会主动从MapTask复制拉取属于需要自己处理的数据

2.把拉取来数据,全部进行合并merge, 即把分散的数据合并成一个大的数据。再对合并后的数据排序

3.第三阶段是对排序后的键值对调用reduce方法。 键相等的键值对调用一次reduce方法(相同的key统计到一起)。 最后把这些输出的键值对写入到HDFS文件中

MapReduce - 图13

3.Shuffle机制

  • Shuffle的本意是洗牌、混洗的意思,把一组有规则的数据尽量打乱成无规则的数据
  • 而在MapReduce中, Shuffle更像是洗牌的逆过程,指的是将map端的无规则输出按指定的规则“打乱”成具有一定规则的数据,以便reduce端接收处理
  • 一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle(是一个过程)

Map端Shuffle

  • Collect阶段:将MapTask的结果收集输出到默认大小为100M的环形缓冲区,保存之前会对key进行分区的计算,默认Hash分区
  • Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了combiner,还会将有相同分区号和key的数据进行排序
  • Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件

Reducer端shuffle

  • Copy阶段: ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据
  • Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作
  • Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据的最终整体有效性即可

弊端

  • Shuffle是MapReduce程序的核心与精髓,是MapReduce的灵魂所在
  • Shuffle也是MapReduce被诟病最多的地方所在。 MapReduce相比较于Spark、 Flink计算引擎慢的原因,跟Shuffle机制有很大的关系(因为Shuffle频繁涉及到数据在内存、磁盘之间的多次往复)