reducetask的并行度

一个yarnchild 对应一个maptask

1个reucetask任务对应的就是MyReducer的计算量, 运行reduce任务的并行度

默认情况下 reducetask的个数只有1个, 一个reducetask只能运行在一个节点上

通过wordcount的结果我们可以看出也是只有一个reducetask。

  • 当数据量很大时候 只有一个reducetask不合理的

    1. 这个reducetask的压力很大
    2. 负载不均衡

设置reducetask个数

reducetask也应该根据我们的实际的数据 设置多个

如何设置:

代码设置

  1. //设置reducetask的个数
  2. job.setNumReduceTasks(tasks);
  • 参数代表就是reducetask的个数 需要几个reducetask的时候 设置为几就可以。

  • 这个参数值的默认为,默认情况只运行一个reducetask。

  • job.setNumReduceTask(3);发现输出的结果3个文件 1个标志文件

    • part-r-00000

    • part-r-00001

    • part-r-00002

    • 结论:一个reducetask最终输出一个对应的结果文件

reduce中的数据划分

默认情况下:hash分割数据

  • 一个reducetask的数据对应的是一个分区的数据

  • 分区:对map输出的数据进行一个按照一定的规则划分,每一部分称为一个分区

  1. key.hash%reducetask的个数
  • partition,这里的分割规则叫分区算法
    推断:默认的分区算法:hash算法 1)散列 2)唯一

分区的底层实现

默认的抽象类 Partitioner 抽象类 定义分区算法/分区规则

  1. public abstract class Partitioner<KEY, VALUE> {
  2. //返回值:int 这里的返回值代表的是分区编号,每一个分区的唯一标志,默认从0开始,顺序递增的
  3. //参数:参数1-map输出的key 参数2-map输出的value 参数3-分区个数(job.setNumReduceTask())
  4. public abstract int getPartition
  5. (KEY key, VALUE value, int numPartitions);
  6. }

默认调用的实现类:HashPartitioner

默认的分区算法 取map的key的hash值%reducetask的个数 获取返回值就是分区编号

  1. //泛型指的是map输出的k v的类型
  2. public class HashPartitioner<K, V> extends Partitioner<K, V> {
  3. //默认的参数3 numReduceTasks
  4. /** Use {@link Object#hashCode()} to partition. */
  5. public int getPartition(K key, V value,
  6. int numReduceTasks) {
  7. return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  8. }
  9. }
  • key.hashCode() & Integer.MAX_VALUE 目的:防止溢出 范围控制integer_max ```shell 1011001100111010101010101
    &
    111111111111111111111111

011001100111010101010101

  1. <a name="5db9fd7c"></a>
  2. ### 小结
  3. > 默认情况下
  4. - 一个分区----一个reducetask----一个输出结果文件
  5. - part-r-00001 00001---代表分区编号
  6. - 分区算法采用hash分区<br />
  7. `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;`
  8. - 分区算法 和 reducetask的个数一一对应的
  9. - reducetask的并行度,取决于r**educetask的个数** ,job.setNumReduceTasks
  10. - reducetask的数据如何分配,取决于分区算法的,默认的hash分区
  11. > 自定义分区:自己定义的
  12. - 分区算法决定map输出的数据如何分配给reduce
  13. <a name="7bbb4913"></a>
  14. ## 自定义分区
  15. > 默认的分区算法 并不能满足所有的需求,不能自定义数据的去向。
  16. ```shell
  17. 假设:有一堆数据,地区有江苏,浙江,上海,内蒙等地区
  18. 如果我们想按照地域进行划分reducetask的数据,
  19. 默认的就不可以,我们需要自定义分区

自定义分区说明

  1. 写法:
  2. 1)继承Partitioner
  3. 2)重写getPartition 方法,这个方法的返回值,代表就是分区编号

例子

分析

  1. 例子:将流量汇总统计结果按照手机归属地不同省份输出到不同文件中
  2. 1. 手机号归属地--假设手机号的前三位
  3. 2. 输出到不同的文件-----使用不同的reducetask输出到不同分区
  4. 3. 分区算法:按照手机号的归属地进行划分的
  5. 自定义分区:
  6. 1. 分区字段:mapkey的位置的数据
  7. 2. 按照手机号分区
  8. shuffle过程
  9. map端:
  10. key:手机号
  11. value:剩下的
  12. reduce端:
  13. 输出
  1. //指定自定义分区
  2. job.setPartitionerClass(MyPartitioner.class);
  3. //指定reducetask的个数
  4. job.setNumReduceTasks(3);
  1. import org.apache.hadoop.io.Text;
  2. import org.apache.hadoop.mapreduce.Partitioner;
  3. /**
  4. * 泛型 map端输出的<key,value>
  5. * @author Administrator
  6. */
  7. public class MyPartitioner extends Partitioner<Text, Text>{
  8. /**
  9. * 参数1---map大输出key 参数2:map大输出的value 参数3:分区个数
  10. */
  11. @Override
  12. public int getPartition(Text key, Text value, int numPartitions) {
  13. String mk = key.toString();
  14. if(mk.startsWith("134")||mk.startsWith("135")||mk.startsWith("136")){
  15. return 0;//part-r-00000
  16. }else if(mk.startsWith("137")||mk.startsWith("138")||mk.startsWith("139")){
  17. return 1;//part-r-00001
  18. }else{
  19. return 2;//part-r-00002
  20. }
  21. }
  22. }

报错

Illegal partition for 15013685858 (3)

分区个数说明:

  • reducetask的个数为1的时候,所有的分区的数据默认全部到一个reducetask中
  • reducetask的个数大于1, 按照分区编号对应的分区到对应的reducetask中
  • 设置的时候reducetask的个数=分区个数
  1. 分区编号值设置为3----说明reducetask-3
  2. 1reducetask0=分区0
  3. 2reducetask1=分区1
  4. 3reducetask2=分区2
  5. 报错:
  6. Illegal partition for 15013685858 (3)
  7. 原因:分区和reducetask的个数不匹配
  8. 注意:设定分区编号的时候,最好顺序递增的,不要跳数(从0开始)
  9. 分区个数3个(getPartition方法返回的个数):reducetask的个数(job.setNumReduceTasks可以设置:
  10. 1) 1 === 可以:对应一个输出结果
  11. 2) 2 === 不可以:
  12. 3) 3 可以
  13. 4) >3 可以

数据倾斜:分区不合理

多个reducetask并行计算的时候,某一个reducetask分配数据不均匀,这个时候产生现象数据倾斜。
后果:reducetask的整体运行的性能低

  • 数据倾斜一定需要尽量避免的
  • 原因:分区算法没有合理的分配均匀数据
  1. 假设有100reducetask
  2. 其中99reducetask--1T 需要10min
  3. 剩下1reducetask---100T1000min
  4. 那么进度:
  5. map 100% reduce 99%
  6. map 100% reduce 99%
  7. map 100% reduce 99%
  8. map 100% reduce 99%
  9. map 100% reduce 99%
  10. map 100% reduce 99%
  11. map 100% reduce 99%
  12. map 100% reduce 99%
  13. ...............
  14. 大数据中不怕数据量大 怕数据倾斜

解决

合理设计分区算法,根据实际的数据抽样做测试,进行合理设计分区,业务,数据。

比如购物的的时候,西藏等地方的数据量就会少于北上广等地的数据。

reducetask数据倾斜的原因

  • mapreduce阶段:

    • maptask:1个切片默认128M—-1maptask

      1. maptask的并行度比较高,切片个数—-数据块的个数(最后一个切片有可能跨块的)
      2. maptask数据一个切片为128M,对应的数据量比较小。
    • reducetask:容易产生数据倾斜

      1. 并行度不高,job.setNumReduceTasks()

        经验值:**reducetask最大值=datanode0.95 即100个节点设置reducetask的个数为95。

  1. 2.

reducetask的数据分配取决于分区算法

  1. 3.

reducetask的对应的数据量本身比较大

combiner组件

  • 这个组件不适用所有的场景的

  • 优化组件:提升性能的作用 ,这个组件默认不加的

  • 作用:对到reducetask之前的数据做预处理:

    • 帮助reducetask做预处理,减少reducetask的数据量,提升性能

组件怎么加?

  • 这个组件就是提前帮助reducetask做一些自己的工作,减轻redcetask的压力.
  • combiner和reducetask的业务逻辑一样
  • 作用的时间点在maptask之后,reducetask之前

实现:这个组件不会影响业务逻辑和reducetask的实现是一样

  1. 继承Reducer类
  2. 重写reduce方法
  3. job中设置添加combiner组件
  1. //实现和reducetask的实现一样
  2. //实际的开发过程直接使用reducer的类作为combiner的类
  3. job.setCombinerClass(MyCombiner.class);

场景

可以适用

  1. 求和
  2. 求最大值
  3. 求最小值

不适用:

  1. 求平均值(这是是相当于平均值再取平均值)

例子

流量汇总统计结果按照手机归属地不同省份输出到不同文件中

  1. import java.io.IOException;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.LongWritable;
  5. import org.apache.hadoop.io.NullWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.Mapper;
  9. import org.apache.hadoop.mapreduce.Reducer;
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  12. public class FlowPartition {
  13. //key--手机号 value---其他
  14. static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
  15. Text mk=new Text();
  16. Text mv=new Text();
  17. @Override
  18. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
  19. throws IOException, InterruptedException {
  20. String[] datas = value.toString().split("\t");
  21. mk.set(datas[0]);
  22. mv.set(datas[1]+"\t"+datas[2]+"\t"+datas[3]);
  23. context.write(mk, mv);
  24. }
  25. }
  26. static class MyReducer extends Reducer<Text, Text, Text, Text>{
  27. //分组 按照手机号分组的
  28. @Override
  29. protected void reduce(Text key, Iterable<Text> values,
  30. Reducer<Text, Text, Text, Text>.Context context)
  31. throws IOException, InterruptedException {
  32. for(Text v:values){
  33. context.write(key, v);
  34. }
  35. }
  36. }
  37. public static void main(String[] args) {
  38. System.setProperty("HADOOP_USER_NAME", "hadoop");
  39. //将mapper reducer类进行一个封装 封装为一个任务----job(作业)
  40. //加载配置文件
  41. Configuration conf=new Configuration();
  42. //启动一个Job 创建一个job对象
  43. try {
  44. Job job=Job.getInstance(conf);
  45. //设置这个job
  46. //设置整个job的主函数入口
  47. job.setJarByClass(FlowPartition.class);
  48. //设置job的mappper的类
  49. job.setMapperClass(MyMapper.class);
  50. //设置job的reducer的类
  51. job.setReducerClass(MyReducer.class);
  52. //如果map输出的key value的类型 和reduce输出的key value的类型相同 这个时候只设置最终输出
  53. //设置reduce的输出的k v类型 以下方法设置的是mr的最终输出
  54. //这个设置实际上 map和reduce的输出
  55. //如果输出的类型不一致 一定分开设置
  56. job.setOutputKeyClass(Text.class);
  57. job.setOutputValueClass(Text.class);
  58. //指定自定义分区
  59. job.setPartitionerClass(MyPartitioner.class);
  60. //指定reducetask的个数
  61. job.setNumReduceTasks(2);
  62. //指定需要统计的文件的输入路径 FileInputFormat 文件输入类
  63. Path inpath=new Path("hdfs://hadoop01:9000/flow_out01");
  64. FileInputFormat.addInputPath(job, inpath);
  65. //指定输出目录 输出路径不能存在的 否则会报错 默认输出是覆盖式的输出 如果输出目录存在 有可能造成原始数据的丢失
  66. Path outpath=new Path("hdfs://hadoop01:9000/flow_partition_out05");
  67. FileOutputFormat.setOutputPath(job, outpath);
  68. //提交job 执行这一句的时候 job才会提交 上面做的一系列的工作 都是设置job
  69. //job.submit();
  70. job.waitForCompletion(true);
  71. } catch (Exception e) {
  72. e.printStackTrace();
  73. }
  74. }
  75. }