第七章 MapReduce

一、概述

MapReduce是一种并行编程模型,用于大规模数据集(大于1TB)的并行计算,它将复杂的、运行于大规模集群上的并行计算过程高度抽象到两个函数:MapReduce,这两个函数及其核心思想都来自函数式编程语言
Hadoop MapReduce是谷歌MapReduce的开源实现,运行在分布式文件系统HDFS上。

与传统并行计算框架的对比

传统的并行计算框架(如MPI)采用共享式架构(共享内存&存储、采用存储区域网络SAN)、容错性较差;使用的刀片服务器价格高、集群扩展性差(只能从提高机器性能上进行纵向扩展)。它适用于要求实时性、细粒度计算和计算密集型的场景。
MapReduce采用非共享式架构,容错性好;并且它所用的服务器均为普通PC机(价格便宜)、且横向扩展性好(通过增加服务器即可实现集群扩展)。它适用于批处理、非实时、数据密集型的场景。

模型简介

MapReduce将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map和Reduce。
策略——分而治之:将一个大规模数据集切分成许多独立的分片(split),这些分片可以被多个Map任务并行处理。
理念——计算向数据靠拢:可以避免移动数据所需要大量的网络传输开销。
框架——Master/Slave架构:包括一个Master和若干个Slave,Master上运行JobTracker,Slave上运行TaskTracker

工作流程

在MapReduce中,一个存储在分布式文件系统的大规模数据集会被切分为多个独立的小数据集,这些小数据集会作为多个Map任务的输入,各个Map任务的输出又作为Reduce任务的输入,最后由Reduce输出最终结果并写入分布式系统。

前提条件:

  1. 待处理的数据集可以被分成许多小的数据集
  2. 每一个小数据集都可以完全并行处理

MapReduce的核心思想是计算向数据靠拢,因为大规模数据集的前移代价较大,所以尽量使Map程序就近地在HDFS数据所在的节点运行,即将计算节点和存储节点放在一起运行。
MapReduce框架是用Java开发的,但MapReduce应用程序不一定用Java实现。

二、MapReduce体系结构

image.png
MapReduce体系结构主要由四个部分组成,分别是:Client、JobTracker、TaskTracker以及Task
Client:客户端,用于提交作业
JobTracker:作业跟踪器,负责作业调度,作业执行,作业失败后恢复
TaskScheduler:任务调度器,负责任务调度
TaskTracker:任务跟踪器,负责任务管理(启动任务,杀死任务等)

  • Client-提交作业,查看作业状态
    提交作业:用户编写的MapReduce程序通过Client提交到JobTracker端
    查看作业状态:用户可通过Client提供的一些接口查看作业运行状态
  • JobTracker-资源监控、作业调度
    资源监控:JobTracker 监控所有TaskTracker与Job的健康状况,一旦发现节点失效(通信失败或节点故障),就将相应的任务转移到其他节点
    作业调度:JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而任务调度器会选择合适的(比较空闲)节点资源来执行任务
  • TaskScheduler-任务调度器
  • TaskTracker-任务管理
    TaskTracker 会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)
    TaskTracker 使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop调度器(TaskScheduler)的作用就是将各个TaskTracker上的空闲slot分配给Task使用。
  • slot 分为Map slot 和Reduce slot 两种,分别供MapTask 和Reduce Task 使用
  • Task
    Task 分为Map Task 和Reduce Task 两种,均由TaskTracker 启动

三、MapReduce工作流程

MapReduce的输入和输出都需要借助分布式文件系统进行存储,这些文件被分布存储到集群中的多个节点上。
Map函数和Reduce函数都是以类型的数据作为输入,并按照一定规则转换成另一个或一批输出。

image.png image.png

MapReduce的核心思想可以用“分而治之”描述。即将一个大的数据集拆分成多个小数据块在多台机器上并行执行,每个Map任务通常运行在数据存储的节点上。当Map任务结束后,会生成类型的中间结果,这些中间结果会被分发到多个Reduce任务,在多台机器上并行执行,具有相同key的会被发送到同一个Reduce任务,Reduce任务会对中间结果进行汇总得到最后结果。
注意:

  1. Map任务彼此之间不会进行通信,Reduce任务也是。所有数据交换都是由MapReduce框架去实现的。
  2. Map的输入文件和Reduce的输出文件都是保存在HDFS的,Map的输出的中间结果保存在本地文件系统(磁盘)。
  3. 只有Map需要考虑“计算向数据靠拢”,Reduce不需要考虑。

MapReduce各个执行阶段

image.png

  1. MapReduce框架使用InputFormat模块做Map前的预处理,比如验证输入的格式是否合法。然后将输入文件切分为逻辑上的多个InputSplit——分片,只是一个逻辑概念,只记录了要处理的数据的位置和长度。
  2. 因为InputSplit只是逻辑切分,还需要通过RecordReader(RR)根据InputSplit中的信息来处理InputSplit中的具体记录,即加载数据并转换为合适的键值对,输入给Map任务。
  3. Map任务会根据用户自定义的映射规则,输出一些列作为中间结果
  4. 为了Reduce可以并行处理Map的结果,需要对Map的输出进行一系列分区(Portition)、合并(combine)、排序(Sort)、归并(Merge)操作,得到形式的中间结果,再交给对应的Reduce模块进程处理。上述过程称为Shuffle(洗牌),即将无序的到有序的
  5. Reduce模块以一系列中间结果作为输入,执行用户定义的逻辑,输出结果给OutPutFormat模块。

注意(Map和Reduce任务个数的选择):

  • 一般以一个文件的物理block大小作为逻辑切分大小,Map任务个数与分片大小相同
  • 最优的Reduce任务个数取决于集群中可用的Reduce任务槽(slot)个数,通常设置小于slot个数,预留一些容错个数。

    Shuffle

    image.png

    Map端的Shuffle

    Map的输出结果首先被写入缓存,当缓存满时,就启动溢写操作,将缓存中的数据写入磁盘文件,并清空缓存。写入磁盘文件前,还需要进行一系列操作,首先对缓存中的数据进行分区,然后对每个分区中的数据进行排序合并,之后再写入磁盘文件。每次溢写操作会生成一个新的磁盘文件,当Map任务全部结束之前,这些溢写文件会被归并为一个大文件,这个大文件也是分区的,然后通知相约Reduce任务前来取走对应的数据。
  1. 写入缓存:目的是减少频繁写入磁盘的寻址开销
  2. 溢写:每个Map任务的缓存默认是100MB,为了不影响缓存的写入,当达到一定的溢写比例时(默认为80%),即启动溢写线程,溢写过程不影响缓存的写入
    1. 分区:在溢写到磁盘之前,缓存中的数据会被分区,默认的分区方式时采用Hash函数对Key进行哈希后再用Reduce任务的数量进行取模。
    2. 排序:根据key对其进行默认的内存排序
    3. 合并:是指将具有相同key的键值对的value相加,从而减少需要溢写到磁盘的数据量。但并非所有场合都可以合并,合并必须不能改变Reduce的计算结果,合并操作时可选的

      Reduce端的Shuffle

      Reduce任务从Map端的不同Map机器领会属于自己的处理的部分数据,然后对数据进行归并后交给Reduce处理

第八章 Hadoop再探讨

Hadoop自身的局限与不足

  • 抽象层次低
  • 表达能力有限
  • 开发者自己管理作业之间的依赖关系
  • 难以看到程序整体逻辑
  • 执行迭代操作效率低
  • 资源浪费
  • 实时性差 | 组件 | Hadoop1.0 | Hadoop2.0 | | —- | —- | —- | | HDFS | 单一名词节点,存在单点失效 | 设计了HDFS HA(High Availability),提供名词节点热备份机制 | | | 单一命名空间,无法实现资源隔离 | 设计了HDFS联邦,管理多个命名空间 | | MapReduce | 资源管理效率低 | 设计了新的资源管理框架YARN,MapReduce专注于计算 |

HDFS2.0新特性

块大小64 -> 128MB

HA(热备份)

解决单点故障问题
HDFS1.0中只存在一个名称节点,一旦发生故障,会导致整个集群不可以,虽然存在第二名称节点,但其主要目的是防止EditLog过大,无法提供“热备份”。
HDFS2.0采用HA(High Availability)架构,设置两个名称节点,由ZooKeeper确保同一时间只有一个节点处于活跃(Active)状态,另一个处于待命(Standby)状态。
名称节点的两个核心数据结构,FsImageEditLog。两个名称节点都需要保持同步。

  • 对于EditLog,HDFS2.0采用共享存储系统(NFS、ZooKeeper),活跃名称节点将更新数据写入其中,待命节点会一直对其监听,当发现有更新时,就从共享存储系统读取这些数据并加载到自己内存。
  • 对于FsImage,要求每一个数据节点必须配置两个名称节点的地址,当数据节点新加入集群中时,必须将自己所包含的数据块列表报告给名称节点,后续也定期发送心跳信号给名称节点。
  • image.png

HDFS联邦(Federation)

HDFS1.0仍有以下缺陷:

  1. 不可水平扩展:同一时间仍然只有一个名称节点发挥作用、
  2. 系统整体性能受限于单个名称节点的吞吐量
  3. 单个名称节点难以提供不同程序之间的隔离性:一个应用程序消耗内存较大,占用了其他程序的资源
  4. image.png

image.png
在HDFS联邦中,所有名称节点相互独立,分别进行各自命名空间和块的管理,不需要彼此协调,且具有良好的向后兼容性。
所有名称节点共享底层的数据节点存储资源。每个数据节点向所有名称节点注册,并周期性发送心跳信号和块信息,并接受来自名称节点的指令。
访问方式:
采用客户端挂载表进行数据共享和访问
注意:联邦并不能解决单点故障问题,仍然需要给每一个名称节点配置HA机制

新一代资源管理调度框架YARN

MapReduce1.0缺陷:

  • 存在单点故障
  • JobTracker任务过重
  • 容易出现内存溢出:TaskTracker端,资源的分配并不考虑CPU、内存等实际情况,只是根据MapReduce任务个数来分配资源
  • 资源划分不合理:TaskTracker 使用“slot”等量划分本节点上的资源量(CPU、内存等),Reduce槽空余时,Map任务并不能使用

    设计思路

    MapReduce2.0变为纯粹的计算框架,其原有的资源管理调度功能被拆分出形成了YARN。
    image.png

    体系结构

    三大核心组件:ResourceManagerApplicationMasterNodeManager

    ResourceManager

    功能:
  1. 处理客户端请求
  2. 为每个用户作业启动一个ApplicationMaster,并监控ApplicationMaster
  3. 监控NodeManager
  4. 资源分配与调度

组件:

  • 调度器Scheduler:资源的管理和分配,接受来自ApplicationMaster的应用程序资源请求,将资源以容器(Container)的形式分配给ApplicationMaster。
  • 应用程序管理器ApplicationManager:负责系统中所有应用程序的管理工作,启动ApplicationMaster。(主要管理ApplicationMaster,再有ApplicationMaster管理其他应用程序)

    ApplicationMaster

  • 为应用程序向ResourceManager协商获取资源

  • 将获取的资源进一步分配给内部的各个任务(Map或Reduce)
  • 任务调度、监控与容错
  • 与NodeManager保持交互
  • 定时向ResourceManager发送心跳信息,汇报情况
  • 作业完成时,向ResourceManager注销容器

    NodeManager

  • 单个节点上的资源管理

  • 处理来自ResourceManager的命令
  • 处理来自ApplicationMaster的命令

image.png

工作流程

image.png
1、Client 向 YARN 提交应用程序,其中包括 ApplicationMaster 程序及启动 ApplicationMaster 命令
2、ResourceManager 为该 ApplicationMaster 分配第一个 Container,并与对应的 NodeManager 通信,要求它在这个 Container 中启动应用程序的 ApplicationMaster
3、ApplicationMaster 向ResourceManager 注册
4、ApplicationMaster 为 Application 的任务申请并领取资源
5、获取到资源后,要求对应的 NodeManager 在 Container 中启动任务
6、NodeManager 收到 ApplicationMaster 的请求后,为任务设置好运行环境(包括环境变量、JAR 包等),将任务启动脚本写到一个脚本中,并通过运行该脚本启动任务
7、各个任务通过 RPC 协议向 ApplicationMaster 汇报自己的状态和进度,以让 ApplicationMaster 随时掌握各个任务的运行状态,从而可以在失败时重启任务
8、应用程序完成后,ApplicationMaster 向 ResourceManager 注销并关闭自己
实际中,集群可能并没有那么多资源来满足 ApplicationMaster 的资源请求,这是 ApplicationMaster 会采用轮循的方式不断申请资源,直到申请到资源或 Application 结束

与MapReduce1.0对比

image.png

image.pngimage.png