reducetask的并行度
一个yarnchild 对应一个maptask
1个reucetask任务对应的就是MyReducer的计算量, 运行reduce任务的并行度
默认情况下 reducetask的个数只有1个, 一个reducetask只能运行在一个节点上。
通过wordcount的结果我们可以看出也是只有一个reducetask。
当数据量很大时候 只有一个reducetask不合理的
- 这个reducetask的压力很大
- 负载不均衡
设置reducetask个数
reducetask也应该根据我们的实际的数据 设置多个
如何设置:
代码设置
//设置reducetask的个数job.setNumReduceTasks(tasks);
参数代表就是reducetask的个数 需要几个reducetask的时候 设置为几就可以。
这个参数值的默认为,默认情况只运行一个reducetask。
job.setNumReduceTask(3);发现输出的结果3个文件 1个标志文件part-r-00000
part-r-00001
part-r-00002
结论:一个reducetask最终输出一个对应的结果文件
reduce中的数据划分
默认情况下:hash分割数据
一个reducetask的数据对应的是一个分区的数据
分区:对map输出的数据进行一个按照一定的规则划分,每一部分称为一个分区
key.hash%reducetask的个数
- partition,这里的分割规则叫分区算法
推断:默认的分区算法:hash算法 1)散列 2)唯一
分区的底层实现
默认的抽象类 Partitioner 抽象类 定义分区算法/分区规则
public abstract class Partitioner<KEY, VALUE> {//返回值:int 这里的返回值代表的是分区编号,每一个分区的唯一标志,默认从0开始,顺序递增的//参数:参数1-map输出的key 参数2-map输出的value 参数3-分区个数(job.setNumReduceTask())public abstract int getPartition(KEY key, VALUE value, int numPartitions);}
默认调用的实现类:HashPartitioner
默认的分区算法 取map的key的hash值%reducetask的个数 获取返回值就是分区编号
//泛型指的是map输出的k v的类型public class HashPartitioner<K, V> extends Partitioner<K, V> {//默认的参数3 numReduceTasks/** Use {@link Object#hashCode()} to partition. */public int getPartition(K key, V value,int numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}}
key.hashCode() & Integer.MAX_VALUE目的:防止溢出 范围控制integer_max ```shell 1011001100111010101010101
&
111111111111111111111111
011001100111010101010101
<a name="5db9fd7c"></a>### 小结> 默认情况下- 一个分区----一个reducetask----一个输出结果文件- part-r-00001 00001---代表分区编号- 分区算法采用hash分区<br />`(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;`- 分区算法 和 reducetask的个数一一对应的- reducetask的并行度,取决于r**educetask的个数** ,job.setNumReduceTasks- reducetask的数据如何分配,取决于分区算法的,默认的hash分区> 自定义分区:自己定义的- 分区算法决定map输出的数据如何分配给reduce<a name="7bbb4913"></a>## 自定义分区> 默认的分区算法 并不能满足所有的需求,不能自定义数据的去向。```shell假设:有一堆数据,地区有江苏,浙江,上海,内蒙等地区如果我们想按照地域进行划分reducetask的数据,默认的就不可以,我们需要自定义分区
自定义分区说明
写法:1)继承Partitioner类2)重写getPartition 方法,这个方法的返回值,代表就是分区编号
例子
分析
例子:将流量汇总统计结果按照手机归属地不同省份输出到不同文件中1. 手机号归属地--假设手机号的前三位2. 输出到不同的文件-----使用不同的reducetask输出到不同分区3. 分区算法:按照手机号的归属地进行划分的自定义分区:1. 分区字段:map的key的位置的数据2. 按照手机号分区shuffle过程map端:key:手机号value:剩下的reduce端:输出
//指定自定义分区job.setPartitionerClass(MyPartitioner.class);//指定reducetask的个数job.setNumReduceTasks(3);
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;/*** 泛型 map端输出的<key,value>* @author Administrator*/public class MyPartitioner extends Partitioner<Text, Text>{/*** 参数1---map大输出key 参数2:map大输出的value 参数3:分区个数*/@Overridepublic int getPartition(Text key, Text value, int numPartitions) {String mk = key.toString();if(mk.startsWith("134")||mk.startsWith("135")||mk.startsWith("136")){return 0;//part-r-00000}else if(mk.startsWith("137")||mk.startsWith("138")||mk.startsWith("139")){return 1;//part-r-00001}else{return 2;//part-r-00002}}}
报错
Illegal partition for 15013685858 (3)
分区个数说明:
- reducetask的个数为1的时候,所有的分区的数据默认全部到一个reducetask中
- reducetask的个数大于1, 按照分区编号对应的分区到对应的reducetask中
- 设置的时候
reducetask的个数=分区个数
分区编号值设置为3----说明reducetask-3个1)reducetask0=分区02)reducetask1=分区13)reducetask2=分区2报错:Illegal partition for 15013685858 (3)原因:分区和reducetask的个数不匹配注意:设定分区编号的时候,最好顺序递增的,不要跳数(从0开始)分区个数3个(getPartition方法返回的个数):reducetask的个数(job.setNumReduceTasks可以设置:1) 1 === 可以:对应一个输出结果2) 2 === 不可以:3) 3 可以4) >3 可以
数据倾斜:分区不合理
多个reducetask并行计算的时候,某一个reducetask分配数据不均匀,这个时候产生现象数据倾斜。
后果:reducetask的整体运行的性能低
- 数据倾斜一定需要尽量避免的
- 原因:分区算法没有合理的分配均匀数据
假设有100个reducetask其中99reducetask--1T 需要10min剩下1个reducetask---100T1000min那么进度:map 100% reduce 99%map 100% reduce 99%map 100% reduce 99%map 100% reduce 99%map 100% reduce 99%map 100% reduce 99%map 100% reduce 99%map 100% reduce 99%...............大数据中不怕数据量大 怕数据倾斜
解决
合理设计分区算法,根据实际的数据抽样做测试,进行合理设计分区,业务,数据。
比如购物的的时候,西藏等地方的数据量就会少于北上广等地的数据。
reducetask数据倾斜的原因
mapreduce阶段:
maptask:1个切片默认128M—-1maptask
- maptask的并行度比较高,切片个数—-数据块的个数(最后一个切片有可能跨块的)
- maptask数据一个切片为128M,对应的数据量比较小。
reducetask:容易产生数据倾斜
- 并行度不高,job.setNumReduceTasks()
经验值:**reducetask最大值=datanode0.95 即100个节点设置reducetask的个数为95。
- 并行度不高,job.setNumReduceTasks()
2.
reducetask的数据分配取决于分区算法
3.
reducetask的对应的数据量本身比较大
combiner组件
这个组件不适用所有的场景的
优化组件:提升性能的作用 ,这个组件默认不加的
作用:对到reducetask之前的数据做预处理:
- 帮助reducetask做预处理,减少reducetask的数据量,提升性能
组件怎么加?
- 这个组件就是提前帮助reducetask做一些自己的工作,减轻redcetask的压力.
- combiner和reducetask的业务逻辑一样
- 作用的时间点在maptask之后,reducetask之前
实现:这个组件不会影响业务逻辑和reducetask的实现是一样
- 继承Reducer类
- 重写reduce方法
- job中设置添加combiner组件
//实现和reducetask的实现一样//实际的开发过程直接使用reducer的类作为combiner的类job.setCombinerClass(MyCombiner.class);
场景
可以适用
- 求和
- 求最大值
- 求最小值
不适用:
- 求平均值(这是是相当于平均值再取平均值)
例子
流量汇总统计结果按照手机归属地不同省份输出到不同文件中
import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowPartition {//key--手机号 value---其他static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{Text mk=new Text();Text mv=new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)throws IOException, InterruptedException {String[] datas = value.toString().split("\t");mk.set(datas[0]);mv.set(datas[1]+"\t"+datas[2]+"\t"+datas[3]);context.write(mk, mv);}}static class MyReducer extends Reducer<Text, Text, Text, Text>{//分组 按照手机号分组的@Overrideprotected void reduce(Text key, Iterable<Text> values,Reducer<Text, Text, Text, Text>.Context context)throws IOException, InterruptedException {for(Text v:values){context.write(key, v);}}}public static void main(String[] args) {System.setProperty("HADOOP_USER_NAME", "hadoop");//将mapper reducer类进行一个封装 封装为一个任务----job(作业)//加载配置文件Configuration conf=new Configuration();//启动一个Job 创建一个job对象try {Job job=Job.getInstance(conf);//设置这个job//设置整个job的主函数入口job.setJarByClass(FlowPartition.class);//设置job的mappper的类job.setMapperClass(MyMapper.class);//设置job的reducer的类job.setReducerClass(MyReducer.class);//如果map输出的key value的类型 和reduce输出的key value的类型相同 这个时候只设置最终输出//设置reduce的输出的k v类型 以下方法设置的是mr的最终输出//这个设置实际上 map和reduce的输出//如果输出的类型不一致 一定分开设置job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//指定自定义分区job.setPartitionerClass(MyPartitioner.class);//指定reducetask的个数job.setNumReduceTasks(2);//指定需要统计的文件的输入路径 FileInputFormat 文件输入类Path inpath=new Path("hdfs://hadoop01:9000/flow_out01");FileInputFormat.addInputPath(job, inpath);//指定输出目录 输出路径不能存在的 否则会报错 默认输出是覆盖式的输出 如果输出目录存在 有可能造成原始数据的丢失Path outpath=new Path("hdfs://hadoop01:9000/flow_partition_out05");FileOutputFormat.setOutputPath(job, outpath);//提交job 执行这一句的时候 job才会提交 上面做的一系列的工作 都是设置job//job.submit();job.waitForCompletion(true);} catch (Exception e) {e.printStackTrace();}}}
