MapReduce思想

MapReduce的思想核心是分而治之,充分利用了并行处理的优势。

  • Map阶段:
    • Map阶段的主要作用是“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。Map阶段的这些任务可以并行计算,彼此间没有依赖关系。
  • Reduce阶段:
    • Reduce阶段的主要作用是“合”,即对map阶段的结果进行全局汇总。

image.png

WordCount案例源码解析

image.png
image.png

编程规范及示例编写

Mapper类

  • 用户自定义一个Mapper类继承Hadoop的Mapper类
  • Mapper的输入数据是KV对的形式(类型可以自定义)
  • Map阶段的业务逻辑定义在map()方法中
  • Mapper的输出数据是KV对的形式(类型可以自定义)

注意:map()方法是对输入的一个KV对调用一次!!

Reducer类

  • 用户自定义Reducer类要继承Hadoop的Reducer类
  • Reducer的输入数据类型对应Mapper的输出数据类型(KV对)
  • Reducer的业务逻辑写在reduce()方法中
  • Reduce()方法是对相同K的一组KV对调用执行一次

Driver阶段

创建提交YARN集群运行的Job对象,其中封装了MapReduce程序运行所需要的相关参数入输入数据路径,输出数据路径等,也相当于是一个YARN集群的客户端,主要作用就是提交我们MapReduce程序运行。

Hadoop序列化

序列化主要是我们通过网络通信传输数据时或者把对象持久化到文件,需要把对象序列化成二进制的结构。
Java基本类型与Hadoop常用序列化类型:
image.png

MapReduce原理分析【重要】

MapTask 工作机制

主要步骤(详见讲义):
image.png
注意,上面图的输出只有一个maptask,也就只有一个文件,而reduce端回拉取多个maptask。

  • 首先对输入的数据进行,逻辑切片->splits,splits数量=map task数量。split与bolck默认一对一(128M)。
  • RecordReader对象读取splits,返回
  • 进入,执行用户重写map函数(继承Mapper类)。
  • 将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。
  • 将数据写入内存(环形缓冲区),按key分区,并在分区内部排序(快排)。
  • 当达到规定的阈值,开始溢写spill(写入文件),得到多个一样大溢写文件
  • 对多个溢写文件进行分区的合并,并排序
  • 最终将分区好,排序好的文件写入磁盘

MapTask的并行度

split的计算数量 = map task任务开启的数量

数据块 : Block是HDFS物理上把数据分成一块一块。
切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。

Q1:MapTask并行度是不是越多越好呢?
答案不是,如果一个文件仅仅比128M大一点点也被当成一个split来对待,而不是多个split????
MR框架在并行运算的同时也会消耗更多资源,并行度越高资源消耗也越高,假设129M文件分为两个分片,一个是128M,一个是1M;对于1M的切片的Maptask来说,太浪费资源。

3.1 我们都知道 map 写数据到磁盘之前要先写到一个所谓的环形缓冲区里面,然后分区,排序,再溢写到磁盘,` 设计这个缓冲区的目的是什么?为什么要这么设计?
a.缓冲区是块内存可以让我们高效读写数据;
b.缓冲区是环形设计,可以同时进行写入和写出(溢写)操作,如果直接写磁盘存在阻塞问题

3.2 MR的map阶段完成了几次排序?
两次。
(1)环形缓冲区做的内排序,快速排序
当达到环形缓冲区设定的阀值,在刷写磁盘之前,后台线程会将缓冲区的数据划分成相应的分区。在每个分区中,后台线程按键key进行内排序(分区内部排序)
(2)溢写文件合并阶段,归并排序
在Map任务完成之前,磁盘上存在多个已经分好区,并排好序的,大小和缓冲区一样的溢写文件,这时溢写文件将被合并成一个已分区且已排序的输出文件。由于溢写文件已经经过第一次排序(相当于对两个有序数据进行归并排序),所有合并文件只需要再做一次排序即可使输出文件整体有序。
(3)reduce阶段还会有一次排序。

3.3 整个mapreduce 在哪几个阶段会进行排序?为什么要进行排序,是为了解决什么问题?
a.环形缓冲区溢写文件时进行分区内排序,使用quicksort
b.多个溢写的文件合并会再次进行排序使用归并排序
c.reducetask拉取多个maptask输出结果文件会再次进行归并排序
因为在reduce端需要对数据进行分组,也就是把相同的key放到一起,排序只是为了降低分组时寻找相同key的压力,如果不排序则需要把所有数据都加载到内存,排序之后相同key的数据集中在一起。可以只部分加载内存。

ReduceTask 工作机制

image.png
ReduceTask按照分区拉取,reducetask1拉去蓝色分区的数据,reducetask2拉去黄色分区的数据…..
(默认 是key相同去往同个分区)
Reduce大致分为copy、sort、reduce三个阶段。

  • Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。
  • Sort阶段。 把分散的数据合并成一个大的数据后,还会再对合并后的数据排序(同一个key的)。
  • Reduce阶段,对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

    详细过程

    image.png

ReduceTask并行度

  • map task数量由切片数决定
  • reduce task数量可以手动指定

image.png

Shuffle机制 & 分区规则

map()方法之后,reduce方法()之前的数据处理过程成为shuffle。
包含过程:
map端:分区,快速排序,溢写文件合并,归并排序
reduce端:多分区合并,归并排序,分组
image.png
5.1 MapReduce的分区与reduceTask的数量 ?
回答:分区数量 = reduce task的数量,
在MapReduce中,通过我们指定分区,会将同一个分区的数据发送到同一个reduce当中进行处理(默认是key相同去往同个分区), 有多少个分区就有多少个reducetask(对应拉取)。

5.2 默认的分区规则?
key的hashcode值 % reducetask的数量(人为指定),也就是说同一个分区的数据,key不一定相同。

5.3 注意reduce阶段归并sort之后的分组操作?
基于同一个分区的有序数据(key的hashcode想同,key不一定相同),按照相同key进行分组

  1. 数据倾斜解决方案 ?

    什么是数据倾斜?

  • 数据倾斜无非就是大量的相同key被partition分配到一个分区里,(导致分区很大)
  • 导致,绝大多数task执行得都非常快,但个别task执行的极慢。甚至失败!

通用解决方案?
对key增加随机数。 这样hash之后,不再会去往同一个分区。
key == hello时,增加随机数,hello1,hello2,hello3就变成了不同的key。

Combiner

运行机制:

  • Combiner是在每一个maptask所在的节点运行;
  • Combiner对每一个maptask的输出进行局部汇总,以减小网络传输量

image.png

【高频面试题】

面试题:
1.我们都知道 map 写数据到磁盘之前要先写到一个所谓的环形缓冲区里面,然后分区,排序,再溢写到磁盘,设计这个缓冲区的目的是什么?为什么要这么设计?
2.mapreduce 在哪几个阶段会进行排序?为什么要进行排序,是为了解决什么问题?

参考答案:
1.我们都知道 map 写数据到磁盘之前要先写到一个所谓的环形缓冲区里面,然后分区,排序,再溢写到磁盘,
设计这个缓冲区的目的是什么?为什么要这么设计?
a.缓冲区是块内存可以让我们高效读写数据;
b.缓冲区是环形设计,可以同时进行写入和写出(溢写)操作,如果直接写磁盘存在阻塞问题

2.mapreduce 在哪几个阶段会进行排序?为什么要进行排序,是为了解决什么问题?
a.环形缓冲区溢写文件时进行区内排序,使用quicksort
b.多个溢写的文件合并会再次进行排序使用归并排序
c.reducetask拉取多个maptask输出结果文件会再次进行归并排序
为什么进行排序?
因为在reduce端需要对数据进行分组,也就是把相同的key放到一起,排序只是为了降低分组时寻找相同key的压力,如果不排序则需要把所有数据都加载到内存,排序之后相同key的数据集中在一起。可以只部分加载内存。