1. MapTask 运行机制详解
- MapTask 流程
详细步骤
- 首先,读取数据组件 InputFormat(默认 TextInputFormat)会通过 getSplits 方法对输入目录中文件进行 逻辑切片(区别与 HDFS 的物理分块),规划得到 splits,有多少个split 就对应启动多少MapTask。
- split 与 block 的对应关系默认是一对一(split 和 block 的默认大小都是 128M)
- 首先,读取数据组件 InputFormat(默认 TextInputFormat)会通过 getSplits 方法对输入目录中文件进行 逻辑切片(区别与 HDFS 的物理分块),规划得到 splits,有多少个split 就对应启动多少MapTask。
- 将输入文件切分为 splits 之后,由 RecordReader 对象(默认 LineRecordReader)进行读取,以 \n 作为分隔符,读取一行行数据,返回
。 - Key 表示每行首字符偏移值,value 表示这一行文本内容
- 将输入文件切分为 splits 之后,由 RecordReader 对象(默认 LineRecordReader)进行读取,以 \n 作为分隔符,读取一行行数据,返回
- 读取 split 返回
,进入用户自己继承的 Mapper 类中,执行用户重写的 map 函数。RecordReader 读取一行这里 map 调用一次。
- 读取 split 返回
- map 逻辑完成之后,将 map 的每条结果通过 context.write 进行 collect 数据收集。在 collect 中,会先对其进行 分区处理,默认使用 HashPartitioner
- MapReduce 提供 Partitioner 接口, 它的作用就是根据 key 或 value 及 reduce 的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对 key hash 后再以reduce task数量量取模(分区策略是k的hashCode值%reduceTask的数量)。默认的取模方式只是为了平均 reduce 的处理能力,如果用户自己对 Partitioner 有需求,可以订制并设置到 job 上。
- map 逻辑完成之后,将 map 的每条结果通过 context.write 进行 collect 数据收集。在 collect 中,会先对其进行 分区处理,默认使用 HashPartitioner
- 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘 IO 的影响。我们的 key/value对 以及 Partition 的结果都会被写入缓冲区。当然在写入之前,key 与 value 值都会被序列化成字节数组
- 进入缓冲区时 要对 key 进行排序(快速排序,QuickSort)
- 环形缓冲区其实是一个数组(环形数组),数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。
- 缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中⽂文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例 spillpercent。这个比例默认是80%,也就是当缓冲区的数据已经达到阈值(buffer size spillpercent = 100MB 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Maptask的输出结果还可以往剩下的20MB内存中写,互不影响(溢写:目的是让缓冲区中的读和写互不干扰)
- 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘 IO 的影响。我们的 key/value对 以及 Partition 的结果都会被写入缓冲区。当然在写入之前,key 与 value 值都会被序列化成字节数组
- 当溢写线程启动后,需要对这80MB空间内的key做 **排序**(Sort)。这里的排序用的是**归并排序(MergeSort),排序是MapReduce模型默认的行为**
- 如果 job 设置过 Combiner(预聚合),那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner 会优化 MapReduce 的中间结果,所以它在整个模型中会多次使用
- 哪些场景才能使用 Combiner 呢?从这里分析,Combiner 的输出是 Reducer 的输入,Combiner 绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value 类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重, 如果用好,它对 job 执行效率有帮助,反之会影响 reduce 的最终结果
- 当溢写线程启动后,需要对这80MB空间内的key做 **排序**(Sort)。这里的排序用的是**归并排序(MergeSort),排序是MapReduce模型默认的行为**
- 合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有 combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行 merge 合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量
至此,map 整个阶段结束
MapTask 的一些配置
即 maptask 的数量,等于 切片split 的数量,也等于 hdfs 分块的数量
- MapTask并行度思考
- MapTask 的并行度决定 Map 阶段的任务处理并发度,从而影响到整个 Job 的处理速度。
- 思考:MapTask并行任务是否越多越好呢?哪些因素影响了了MapTask并行度?
- MapTask 并行度决定机制
- 数据块:Block 是 HDFS 物理上把数据分成一块一块
- 切片:数据切片split 只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储
- 切片split 的计算方式
- 思想:移动计算比移动数据成本更低
- 移动计算,即 将计算包发送至目的地,比发送数据更划算
切片机制 源码阅读
- 经过源码分析,得知 切片的默认大小是 128M
- 问题:切片(多线程数量)是不是越多越好呢?
- 答:不是,线程开得越多,消耗的资源越多,因此线程的数量要有个度。比如一个129M的文件,会切成一片,而不是两片(如果切成两片,第二片是1M,开的线程消耗资源更多,得不偿失)
- 当一个文件占据了多个block块,并且这个文件的总大小 < blocksize * 1.1 时,会进入一个分片。
- 思考题:对于129M的文件,如果切片为 1,那么分块 block 为多少?
- 2个
3. ReduceTask 工作机制
Reduce 大致分为 copy、sort、reduce 三个阶段,重点在前两个阶段。copy 阶段包含一个 eventFetcher 来获取已完成的 map 列表,由 Fetcher 线程去 copy 数据,在此过程中会启动两个 merge 线程,分别为 inMemoryMerger 和 onDiskMerger,分别 将内存中的数据 merge 到磁盘(inMemoryMerger) 和 将磁盘中的数据进行 merge(onDiskMerger)。待数据 copy 完成之后,copy 阶段就完成了,开始进行 sort 阶段,sort 阶段主要是执行 finalMerge 操作,纯粹的 sort 阶段,完成之后就是 reduce 阶段,调用户定义的 reduce 函数进行处理
详细步骤
Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过 HTTP 方式请求 maptask 获取属于自己的文件。
- Merge阶段。这里的 merge 如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge 有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用(内存到内存)。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有 Combiner,也是会启用的,然后在磁盘中生成了了众多的溢写文件。第二种merge方式一直在运行,直到没有 map 端的数据时才结束,然后启动第三种磁盘到磁盘的merge 方式生成最终的文件。
- 合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中
4. ReduceTask 并行度
ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置
注意事项
map 阶段处理的数据如何传递给 reduce 阶段,是 MapReduce 框架中最关键的一个流程,这个流程就叫 shuffle
shuffle:洗牌、发牌 —>(核心机制:数据分区,排序,分组,combine,合并 等过程)
MapReduce分区与reduceTask数量
在 MapReduce 中,通过我们指定分区,会将同一个分区的数据发送到同一个 reduce 当中进行处理(默认是key相同去往同个分区),例如我们为了数据的统计,我们可以把一批类似的数据发送到同一个 reduce 当中去,在同一个 reduce 当中统计相同类型的数据
- 如何才能保证相同 key 的数据去往同一个 reduce 呢?
- 只需要保证相同 key 的数据分发到同个分区即可。结合以上原理分析我们知道 MR 程序 shuffle 机制默认 就是这种规则!!!
分区源码
翻阅源码验证以上规则,MR程序默认使用的 HashPartitioner,保证了相同的 key 去往同个分区
实际生产中需求变化多端,默认分区规则往往不能满足需求,需要结合业务逻辑来灵活控制分区规则以及分区数量
- 具体步骤
- 自定义类继承 Partitioner,重写 getPartition() 方法
- 在 Driver 驱动中,指定使用自定义 Partitioner
- 在 Driver 驱动中,要根据自定义 Partitioner 的逻辑设置相应数量的 ReduceTask 数量
需求:按照不同的 appkey 把记录输出到不同的分区中
- 原始日志格式
- 输出结果
- 根据 appkey 把不同厂商的日志数据分别输出到不同的文件中
需求分析
- 面对业务需求,结合 mr 的特点,来设计 map 输出的 kv,以及 reduce 输出的 kv 数据
- 一个 ReduceTask 对应 一个输出文件,因为在 shuffle 机制中每个 reduceTask 拉取的都是某一个分区的数据,**一个分区对应一个输出文件**
- 结合 appkey 的前缀相同的特点,同时不使用默认分区规则,而是使用自定义分区器,只要 appkey 前缀相同则数据进入同一个分区
整体思路
- Mapper
- 读取一行文本,按照制表符
\t
切分
- 读取一行文本,按照制表符
- 解析出 appkey 字段,其余数据封装为 PartitionBean 对象(实现序列化 Writable 接口)
- 设计 map( ) 输出的 kv,
key-->appkey
(依靠该字段完成分区),PartitionBean 对象作为 Value 输出
- 设计 map( ) 输出的 kv,
- Partition
- 自定义分区器,实现按照 appkey 字段的前缀来区分所属分区
- Reducer
- reduce() 正常输出即可,无需进行聚合操作
- Driver
- 在原先设置 job 属性的同时增加设置:使用自定义分区器
- 2. 注意设置 ReduceTask 的数量(与分区数量保持一致)
Mapper
package com.lagou.partition;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PartitionMapper extends Mapper<LongWritable, Text, Text, PartitionBean> {
final PartitionBean bean = new PartitionBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] strings = value.toString().split("\t");
String appKey = strings[2];
bean.setId(strings[0]);
bean.setDeviceId(strings[1]);
bean.setAppKey(strings[2]);
bean.setIp(strings[3]);
bean.setSelfDuration(Long.parseLong(strings[4]));
bean.setThirdPartDuration(Long.parseLong(strings[5]));
bean.setStatus(strings[6]);
context.write(new Text(appKey), bean);
}
}
PartitionBean
package com.lagou.partition;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class PartitionBean implements Writable {
private String id;
private String deviceId;
private String appKey;
private String ip;
private Long selfDuration;
private Long thirdPartDuration;
private String status;
public PartitionBean() {
}
public PartitionBean(String id, String deviceId, String appKey, String ip, Long selfDuration, Long thirdPartDuration, String status) {
this.id = id;
this.deviceId = deviceId;
this.appKey = appKey;
this.ip = ip;
this.selfDuration = selfDuration;
this.thirdPartDuration = thirdPartDuration;
this.status = status;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(id);
out.writeUTF(deviceId);
out.writeUTF(appKey);
out.writeUTF(ip);
out.writeLong(selfDuration);
out.writeLong(thirdPartDuration);
out.writeUTF(status);
}
@Override
public void readFields(DataInput in) throws IOException {
this.id = in.readUTF();
this.deviceId = in.readUTF();
this.appKey = in.readUTF();
this.ip = in.readUTF();
this.selfDuration = in.readLong();
this.thirdPartDuration = in.readLong();
this.status = in.readUTF();
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getAppKey() {
return appKey;
}
public void setAppKey(String appKey) {
this.appKey = appKey;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public Long getSelfDuration() {
return selfDuration;
}
public void setSelfDuration(Long selfDuration) {
this.selfDuration = selfDuration;
}
public Long getThirdPartDuration() {
return thirdPartDuration;
}
public void setThirdPartDuration(Long thirdPartDuration) {
this.thirdPartDuration = thirdPartDuration;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
@Override
public String toString() {
return id +
"\t" + deviceId +
"\t" + appKey +
"\t" + ip +
"\t" + selfDuration +
"\t" + thirdPartDuration +
"\t" + status;
}
}
CustomPartitioner
package com.lagou.partition;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class CustomPartition extends Partitioner<Text, PartitionBean> {
@Override
public int getPartition(Text text, PartitionBean partitionBean, int numPartitions) {
int partition = 0;
if (text.toString().equals("kar")) {
partition = 0;
} else if (text.toString().equals("pandora")) {
partition = 1;
} else {
partition = 2;
}
return partition;
}
}
PartitionReducer
package com.lagou.partition;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PartitionReducer extends Reducer<Text, PartitionBean, Text, PartitionBean> {
@Override
protected void reduce(Text key, Iterable<PartitionBean> values, Context context) throws IOException, InterruptedException {
for (PartitionBean bean : values) {
context.write(key, bean);
}
}
}
PartitionDriver
package com.lagou.partition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class PartitionDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance();
job.setJarByClass(PartitionDriver.class);
job.setMapperClass(PartitionMapper.class);
job.setReducerClass(PartitionReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(PartitionBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(PartitionBean.class);
job.setPartitionerClass(CustomPartition.class);
job.setNumReduceTasks(3);
FileInputFormat.setInputPaths(job, new Path("D:\\partition\\input"));
FileOutputFormat.setOutputPath(job, new Path("D:\\partition\\output"));
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}
总结
- 自定义分区器时 最好**保证 分区数量 与 reduceTask 数量保持一致!!!**
- 如果分区数量不止 1 个,但是 reduceTask 数量 1 个,此时只会输出一个文件
- 如果 reduceTask 数量 大于分区数量,但是输出多个空文件
- 如果 reduceTask 数量 小于分区数量,有可能会报错
- combiner 运行机制
- Combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件
- Combiner 组件的父类就是 Reducer
- Combiner 和 reducer 的区别在于运行的位置(两者运行机制一样,程序可以互换)
- Combiner 是在每一个 maptask 所在的节点运行
- Combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量
Combiner 能够应用的前提是不能影响最终的业务逻辑,此外,Combiner的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来
1.改造WordCount程序
package com.lagou.wc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int num = 0;
for (IntWritable value : values) {
num += value.get();
}
context.write(key, new IntWritable(num));
}
}
2.在驱动(Driver)设置使用Combiner
job.setCombinerClass(WordcountCombiner.class);
- 思考题:如果直接使用 WordCountReducer 作为 Combiner 使用是否可以?
- 答:**可以**!因为实现的逻辑是一模一样的