1、MapReduce的设计构思
MapReduce思想:

简要的说:ResourceManager就是进行资源分配的
NodeManage就是进行具体的计算任务
2、MapReduce的编程规范
Map阶段的两个步骤:
第一步中,如果选择的InputFormat类是TextInputFormat,则
k1:相对文本的行偏移量 v1:偏移量对应的数据
eg:有一个文本内容如下:
hello,world
hello,hadoop
则
0 hello,world
11 hello,hadoop
Reduce阶段的两个步骤:
第一步中,如果选择的OutputFormat类是TextOutputFormat,则输出的就是普通文本
其实执行过程可以简易的用以下图来表示:

3、MapReduce的执行流程

本图只是一个示范,在后期实际操作中可能会有不一样,比如最后写入普通文本文件,以后真正操作可能写入压缩文件等。
本图中的分区是按长度小于5和大于等于5的区
排序是对每个区中单词的字母个数,从小到大排序
分组就是相同单词的为一个组
只是一个示例
4、MapReduce的案例-WordCount
WordCount案例分析图:
4.1、问题补充
问题一:
集群运行模式下,要先将程序打包,然后再放在集群上运行,但是有些hadoop版本直接将程序打包,有时候会出问题,这是需要对job进行额外的配置
如果打包运行出错,则需要添加该配置:
job.setJarByClass(JobMain.class);
问题二:
关于输出目录,输出目录不能存在,如果输出目录已经存在,则程序运行会报错,如果每次都去检查输出目录是否存在,会很麻烦,所以再输出之前加一段程序,进行目录是否存在的判断,如果存在,就将目录删除,添加如下判断目录是否存在的程序:
5、MapReduce分区

就是将相同类型的数据,有共性的数据放到一起处理
注意:使用分区之后要在JobMain中指定分区的处理方式:
eg:job.setPartitionerClass(FlowPartition.class);
还有设置分区个数:
eg:job.setNumReduceTasks(4);
6、MapReduce中的计数器
计数器是收集作业统计信息的有效手段之一,用于质量控制或应用级统计。计数器还可辅助诊断系统故障,如果需要将日志信息传输到map或reduce任务,更好的方法通常是看能否用一个计数器值来记录某一特定事件的发生。对于大型分布式作业而言,使用计数器更为方便,还有根据计数器值统计特定事件的发生次数要比分析一堆日志文件容易的多。
hadoop内置计数器列表:
也可以自定义计数器:有两种方式:
方式一:
Counter counter = context.getCounter(“MR_COUNT”,”MyRecordCounter”);
其中MR_COUNT参数一:是该计数器的类型
MyRecordCounter参数二:是该计数器的变量
自己起名
counter.increment(1L);
就是每访问一次就加1,increment就相当于加法
其实自己定义的这个计数器就是可以用来统计文本中共有多少行数据,因为每调用一次方法就加1,而这又是map方法,map方法就是将
方式二:通过enum枚举类型来定义计数器,统计reduce端数据的输入的key有多少个
枚举本质上也是一样的
7、MapReduce的排序和序列化
序列化:
比如:当我们需要将一个对象保存到磁盘中或者进行网络传输,这时就必须要进行序列化的操作;
当我们需要从磁盘中读取一个对象,或者从网络中获取一个对象,此时就需要进行反序列化操作。
序列化的步骤在文档中有提到
排序:通过自定义 Key 实现 WritableComparable 来实现我们的排序功能
若要通过某个字段进行排序,则该字段必须在key里,而不是value中
排序规则写在public int compareTo(FlowBean o)方法中,只用些排序规则(什么时候返回正 数,什么时候返回复数(该方法的返回值为int型))即可,如何排序,该方法内部会自动完成,并且不用在JobMain中设置他
注意:在mapreduce中只要是自定义了一个javabean就必须要实现序列化和反序列化,因为可能使用网络传输,实现序列化和反序列化就必须实现一个接口,若还需要排序就实现WritableComparable接口,若不需要排序,只需要序列化和反序列化,则实现writable接口即可。
8、MapReduce规约
规约其实就是将mapTask得到的结果进行合并,为了减少网络传输时的数据,可以提高网络
传输效率,如下图,如果不进行规约,则要传输9条数据,规约之后,只用传输三条数据
规约其实做的和reduce的工作一样,都是合并,只不过规约是在传输前对每一个MapTask的结果进行合并,Reduce是将所有的MapTask进行合并。
概念
每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次
合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce
的一种优化手段之一
特点:
combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件
combiner 组件的父类就是 Reducer
combiner 和 reducer 的区别在于运行的位置
Combiner 是在每一个 maptask 所在的节点运行
Reducer 是接收全局所有 Mapper 的输出结果
combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量
实现步骤
1. 自定义一个 combiner 继承 Reducer,重写 reduce 方法
实现方法和reducer的实现方法一样:
public class MyCombiner extends Reducer
@Override
protected void reduce(Text key, Iterable
//分析发现新K2和V2与K3和V3之间,K2和K3的值一样,只需将V2的值加起来=V3
long count = 0;
//对V2进行遍历
for (LongWritable value : values) {
//让遍历的值进行相加
count += value.get();
}
context.write(key,new LongWritable(count));
}
}
- 在 job 中设置 job.setCombinerClass(CustomCombiner.class)
combiner 能够应用的前提是不能影响最终的业务逻辑,而且,combiner 的输出 kv 应该跟
reducer 的输入 kv 类型要对应起来
shuffle的执行过程:
因为频繁的磁盘I/O操作会严重的降低效率,因此“中间结果”不会⽴马写⼊磁盘,⽽是优先存储到map节点的“环形内存缓冲区”,在写
⼊的过程中进⾏分区(partition),也就是对于每个键值对来说,都增加了⼀个partition属性值,然后连同键值对⼀起序列化成字节数组写
⼊到缓冲区(缓冲区采⽤的就是字节数组,默认⼤⼩为100M)。当写⼊的数据量达到预先设置的阙值后
(mapreduce.map.io.sort.spill.percent,默认0.80,或者80%)便会启动溢写出线程将缓冲区中的那部分数据溢出写(spill)到磁盘的临
时⽂件中,并在写⼊前根据key进⾏排序(sort)和合并(combine,可选操作)。溢出写过程按轮询⽅式将缓冲区中的内容写到
mapreduce.cluster.local.dir属性指定的⽬录中。当整个map任务完成溢出写后,会对磁盘中这个map任务产⽣的所有临时⽂件(spill⽂
件)进⾏归并(merge)操作⽣成最终的正式输出⽂件,此时的归并是将所有spill⽂件中的相同partition合并到⼀起,并对各个partition
中的数据再进⾏⼀次排序(sort),⽣成key和对应的value-list,⽂件归并时,如果溢写⽂件数量超过参数min.num.spills.for.combine的
值(默认为3)时,可以再次进⾏合并。⾄此,map端shuffle过程结束,接下来等待reduce task来拉取数据。对于reduce端的shuffle过
程来说,reduce task在执⾏之前的⼯作就是不断地拉取当前job⾥每个map task的最终结果,然后对从不同地⽅拉取过来的数据不断地做
merge最后合并成⼀个分区相同的⼤⽂件,然后对这个⽂件中的键值对按照key进⾏sort排序,排好序之后紧接着进⾏分组,分组完成后才
将整个⽂件交给reduce task处理。
