05 分布式计算框架MapReduce
MapReduce是一种并行编程模型,用于大规模数据集(大于1TB)的并行运算。
Map(映射)和Reduce(归约)是他的主要思想,是从函数式编程语言里借来的,MapReduce还有从矢量编程语言里借来的特性。
极大的方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。
当前的软件实现是指定一个Map(映射)函数,实现任务的分配,指定并发的Reduce(归约)函数,用来任务的汇总。
一、定义
MapReduce是面向大数据并行处理的计算模型、框架和平台,它隐含了以下三层含义:
MapReduce是一个基于集群的高性能并行计算平台(Cluster Infrastructure)。它允许用市场上普通的商用服务器构成一个包含数十、数百至数千个节点的分布和并行计算集群。
MapReduce是一个并行计算与运行软件框架(Software Framework)。它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理,大大减少了软件开发人员的负担。
MapReduce是一个并行程序设计模型与方法(Programming Model & Methodology)。它借助于函数式程序设计语言Lisp的设计思想,提供了一种简便的并行程序设计方法,用Map和Reduce两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理 。
二、MapReduce做什么
MapReduce擅长处理大数据,它为什么具有这种能力呢?这可由MapReduce的设计思想发觉。MapReduce的思想就是“分而治之”,“先分后和”。
Mapper负责“_分_”,即把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:
- 一是数据或计算的规模相对原任务要大大缩小;
- 二是就近计算原则,即任务会分配到存放着所需数据的节点上进行计算;
- 三是这些小任务可以并行计算,彼此间几乎没有依赖关系。
- Reducer负责对map阶段的结果进行汇总。至于需要多少个Reducer,用户可以根据具体问题,通过在mapred-site.xml配置文件里设置参数mapred.reduce.tasks的值,缺省值为1。
一个比较形象的语言解释MapReduce:
我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。
现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。
三、MapReduce的主要功能
数据划分和计算任务调度
系统自动将一个作业(Job)待处理的大数据划分为很多个数据块,每个数据块对应于一个计算任务(Task),并自动 调度计算节点来处理相应的数据块。作业和任务调度功能主要负责分配和调度计算节点(Map节点或Reduce节点),同时负责监控这些节点的执行状态,并 负责Map节点执行的同步控制。
数据/代码互定位
为了减少数据通信,一个基本原则是本地化数据处理,即一个计算节点尽可能处理其本地磁盘上所分布存储的数据,这实现了代码向 数据的迁移;当无法进行这种本地化数据处理时,再寻找其他可用节点并将数据从网络上传送给该节点(数据向代码迁移),但将尽可能从数据所在的本地机架上寻 找可用节点以减少通信延迟。
系统优化
为了减少数据通信开销,中间结果数据进入Reduce节点前会进行一定的合并处理;一个Reduce节点所处理的数据可能会来自多个 Map节点,为了避免Reduce计算阶段发生数据相关性,Map节点输出的中间结果需使用一定的策略进行适当的划分处理,保证相关性数据发送到同一个 Reduce节点;此外,系统还进行一些计算性能优化处理,如对最慢的计算任务采用多备份执行、选最快完成者作为结果。
出错检测和恢复
以低端商用服务器构成的大规模MapReduce计算集群中,节点硬件(主机、磁盘、内存等)出错和软件出错是常态,因此 MapReduce需要能检测并隔离出错节点,并调度分配新的节点接管出错节点的计算任务。同时,系统还将维护数据存储的可靠性,用多备份冗余存储机制提 高数据存储的可靠性,并能及时检测和恢复出错的数据。
四、MapReduce如何工作
统计词频
如果想统计下过去10年计算机论文出现最多的几个单词,看看大家都在研究些什么,那收集好论文后,该怎么办呢?
方法一:
我可以写一个小程序,把所有论文按顺序遍历一遍,统计每一个遇到的单词的出现次数,最后就可以知道哪几个单词最热门了。
这种方法在数据集比较耗时,是非常有效的,而且实现最简单,用来解决这个问题很合适。
方法二:
写一个多线程程序,并发遍历论文。
这个问题理论上是可以高度并发的,因为统计一个文件时不会影响统计另一个文件。当我们的机器是多核或者多处理器,方法二肯定比方法一高效。但是写一个多线程程序要比方法一困难多了,我们必须自己同步共享数据,比如要防止两个线程重复统计文件。
方法三:
把作业交给多个计算机去完成。
我们可以使用方法一的程序,部署到N台机器上去,然后把论文集分成N份,一台机器跑一个作业。这个方法跑得足够快,但是部署起来很麻烦,我们要人工把程序copy到别的机器,要人工把论文集分开,最痛苦的是还要把N个运行结果进行整合(当然我们也可以再写一个程序)。
方法四:
让MapReduce来帮帮我们吧!
MapReduce本质上就是方法三,但是如何拆分文件集,如何copy程序,如何整合结果这些都是框架定义好的。我们只要定义好这个任务(用户程序),其它都交给MapReduce。
MapReduce的一个经典实例是Hadoop。用于处理大型分布式数据库。
让我们用一个例子来理解这一点
假设有以下的输入数据到 MapReduce 程序,统计以下数据中的单词数量:
Welcome to Hadoop ClassHadoop is goodHadoop is bad

MapReduce 任务的最终输出是:
| bad | 1 |
| Class | 1 |
| good | 1 |
| Hadoop | 3 |
| is | 2 |
| to | 1 |
| Welcome | 1 |
这些数据经过以下几个阶段
**输入拆分:**输入到MapReduce工作被划分成固定大小的块叫做 input splits ,输入折分是由单个映射消费输入块。**映射 - Mapping**这是在 map-reduce 程序执行的第一个阶段。在这个阶段中的每个分割的数据被传递给映射函数来产生输出值。在我们的例子中,映射阶段的任务是计算输入分割出现每个单词的数量(更多详细信息有关输入分割在下面给出)并编制以某一形式列表<单词,出现频率>**重排**这个阶段消耗映射阶段的输出。它的任务是合并映射阶段输出的相关记录。在我们的例子,同样的词汇以及它们各自出现频率。**Reducing**在这一阶段,从重排阶段输出值汇总。这个阶段结合来自重排阶段值,并返回一个输出值。总之,这一阶段汇总了完整的数据集。在我们的例子中,这个阶段汇总来自重排阶段的值,计算每个单词出现次数的总和。
五、MapReduce如何组织工作?
Hadoop 划分工作为任务。有两种类型的任务:
- Map 任务 (分割及映射)
Reduce 任务 (重排,还原)
如上所述
完整的执行流程(执行 Map 和 Reduce 任务)是由两种类型的实体的控制,称为
Jobtracker : 就像一个主(负责提交的作业完全执行)
多任务跟踪器 : 充当角色就像从机,它们每个执行工作
对于每一项工作提交执行在系统中,有一个 JobTracker 驻留在 Namenode 和 Datanode 驻留多个 TaskTracker。
六、MapReduce编程基础
- 内置数据类型
Hadoop提供了如下数据类型,都实现了WritableComparable接口,以便于这些类型定义的数据可以被序列化进行网络传输和文件存储以及进行大小比较。
- BooleanWritable:标准布尔型数值
- ByteWritable:单字节数值
- DoubleWritable:双字节数
- FloatWritable:浮点数
- IntWritable:整型数
- LongWritable:长整型数
- Text:使用UTF8格式存储的文本
- NullWritable:当
中的key或value为空时使用 - ArrayWritable:存储属于Writable类型的值的数组
- 案例 ```java package com.etc;
import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text;
/**
@author root / public class HadoopDataType {
public static void main(String[] args) {
testText();testArrayWritable();testMapWritable();
}
/**
使用Hadoop的Text类型数据 */ public static void testText() { System.out.println(“============Test Text============”); Text text = new Text(“Hello Hadoop!!!”); //获取文字的长度 System.out.println(“输入文字的长度是:”+text.getLength()); //获取制定文字的下标位置,从0开始 System.out.println(“字母a出现的下标是:”+text.find(“a”)); //变成字符串输出 System.out.println(“输入的字符串是:”+text.toString()); }
/**
使用ArrayWritable / public static void testArrayWritable() { System.out.println(“============Test ArrayWritable============”); //创建ArrayWritable对象,存入IntWritable类型的值 ArrayWritable arr = new ArrayWritable(IntWritable.class); IntWritable year = new IntWritable(2021); IntWritable month = new IntWritable(01); IntWritable day = new IntWritable(24); //把年月日三个数据加入到数组中 arr.set(new IntWritable[] {year,month,day}); //格式化输出,%d为占位符,通过数组的下标获得 /
- System.out.println(String.format(“year=%d,month=%d,day=%d”,
- ((IntWritable)(arr.get()[0])).get(), ((IntWritable)(arr.get()[1])).get(),
- ((IntWritable)(arr.get()[2])).get() )); */ System.out.print(“year=”+((IntWritable)(arr.get()[0])).get()); System.out.print(“,month=”+((IntWritable)(arr.get()[1])).get()); System.out.println(“,day=”+((IntWritable)(arr.get()[2])).get()); }
/**
- 使用MapWritable */ public static void testMapWritable() { System.out.println(“============Test MapWritable============”); MapWritable map = new MapWritable(); Text k1 = new Text(“name”); Text v1 = new Text(“tonny”); Text k2 = new Text(“password”); //存入MapWritable,key/value为name/tonny map.put(k1,v1); //存入MapWritable,key/value为password/null map.put(k2, NullWritable.get()); //通过key获取value值 System.out.println(“name:”+map.get(k1).toString()); System.out.println(“password:”+map.get(k2).toString()); } } ```

