image.png

增加Reduce(分区数)提高效率

在实际工作中,如果我们想提高MapReduce的执行效率,最直接的方法是什么呢?
我们知道MapReduce是分为Map阶段和Reduce阶段,其实提高执行效率就是提高这两个阶段的执行效率
默认情况下Map阶段中Map任务的个数是和数据的InputSplit相关的,InputSplit的个数一般是和Block块 是有关联的,所以可以认为Map任务的个数和数据的block块个数有关系,针对Map任务的个数我们一般是不需要干预的,除非是前面我们说的海量小文件,那个时候可以考虑把小文件合并成大文件。其他情况是不需要调整的。

默认情况下reduce的个数是1个,如果说数据量比较大的时候,一个reduce任务处理起来肯定是比较慢的,所以我们可以考虑增加reduce任务的个数。
对数据进行分区可以增加Reduce的个数,分区之后,每一个分区的数据会被一个reduce任务处理。

那如何增加分区呢?
可以通过 job.setPartitionerClass 来设置分区类,不过目前我们是没有设置的,那框架中是不 是有默认值啊,是有的,我们可以通过 job.getPartitionerClass 方法看到默认情况下会使用 HashPartitioner 这个分区类,那我们来看一下HashPartitioner的实现是什么样子的

  1. public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
  2. public void configure(JobConf job) {}
  3. /** Use {@link Object#hashCode()} to partition. */
  4. public int getPartition(K2 key, V2 value,
  5. int numReduceTasks) {
  6. return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  7. }
  8. }

map里面每一条数据都会进入这个方法来获取他们所在的分区信息,这里的key就是k2,value就是v2
其实起决定性的因素就是 numReduceTasks 的值,这个值默认是1,通过 job.getNumReduceTasks() 可知。
所以最终任何值%1 都返回0,那也就意味着他们都在0号分区,也就只有这一个分区。

如果想要多个分区,很简单,只需要把 numReduceTasks 的数目调大即可,这个其实就是reduce任务的数 量,那也就意味着,只要redcue任务数量变大了,对应的分区数也就变多了,有多少个分区就会有多少个reduce任务,那我们就不需要单独增加分区的数量了,只需要控制好Redcue任务的数量即可。

数据倾斜

增加redcue任务个数在一定场景下是可以提高效率的,但是在一些特殊场景下单纯增加reduce任务个数是无法达到质的提升的。

假设我们有一个文件,有1000W条数据,这里面的值主要都是数字,1,2,3,4,5,6,7,8,9,10,我们希望统计出来每个数字出现的次数其实在私底下我们是知道这份数据的大致情况的,这里面这1000w条数据,值为5的数据有910w条左 右,剩下的9个数字一共只有90w条,那也就意味着,这份数据中,值为5的数据比较集中,或者说值为5的数据属于倾斜的数据,在这一整份数据中,它占得比重比其他的数据多得多。

下面我们画图来具体分析一下:
假设这1000W条数据的文件有3个block,会产生3个InputSplt,最终会产生3个Map任务,默认情况下只 有一个reduce任务,所以所有的数据都会让这一个reduce任务处理,这样这个Reduce压力肯定很大,大量的时间都消耗在了这里
image.png
那根据我们前面的分析,我们可以增加reduce任务的数量,看下面这张图,我们把reduce任务的数量调 整到10个,这个时候就会把1000w条数据让这10 个reduce任务并行处理了,这个时候效率肯定会有一定的提升,但是最后我们会发现,性能提升是有限的,并没有达到质的提升,那这是为什么呢?
image.png
我们来分析一下,刚才我们说了我们这份数据中,值为5的数据有910w条,这就占了整份数据的90%
了,那这90%的数据会被一个reduce任务处理,在这里假设是让reduce5处理了,reduce5这个任务执行 的是比较慢的,其他reduce任务都执行结束很长时间了,它还没执行结束,因为reduce5中处理的数据 量和其他reduce中处理的数据量规模相差太大了,所以最终reduce5拖了后腿。咱们mapreduce任务执行消耗的时间是一直统计到最后一个执行结束的reduce任务,所以就算其他reduce任务早都执行结束了也没有用,整个mapreduce任务是没有执行结束的。

生成1000w条测试数据:1~10,值为5的数据有910w条,剩下的9个数字一共只有90w条

  1. public static void main(String[] args) throws IOException {
  2. FileWriter myWriter = new FileWriter("D:\\test\\hello_10000000.dat");
  3. for (int i = 0; i < 100000; i++) {
  4. myWriter.write(1 + "\n");
  5. myWriter.write(2 + "\n");
  6. myWriter.write(3 + "\n");
  7. myWriter.write(4 + "\n");
  8. myWriter.write(6 + "\n");
  9. myWriter.write(7 + "\n");
  10. myWriter.write(8 + "\n");
  11. myWriter.write(9 + "\n");
  12. myWriter.write(10 + "\n");
  13. }
  14. for (int i = 0; i < 9100000; i++) {
  15. myWriter.write(5 + "\n");
  16. }
  17. myWriter.close();
  18. System.out.println("成功写入文件");
  19. }

解决办法

这个时候单纯的增加reduce任务的个数已经不起多大作用了,如果启动太多可能还会适得其反。
最好的办法是把这个值为5的数据尽量打散,把这个倾斜的数据分配到其他reduce任务中去计算,这样才能从根本上解决问题。

MapReduce程序执行时,Reduce节点大部分执行完毕,但是有一个或者几个Reduce节点运行很慢,导致整个程序处理时间变得很长具体表现为:Ruduce阶段一直卡着不动根据刚才的分析,有两种方案:

增加reduce任务个数

这个属于治标不治本,针对倾斜不是太严重的数据是可以解决问题的,针对倾斜严重的数据,这样是解决不了根本问题的

把倾斜的数据打散

这种可以根治倾斜严重的数据。

测试

1个Reduce

  1. /**
  2. * 数据倾斜-增加Reduce任务个数
  3. */
  4. public class WordCountJobSkew {
  5. /**
  6. * Map阶段
  7. */
  8. public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
  9. @Override
  10. protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
  11. //对获取到的每一行数据进行切割,把单词切割出来
  12. String[] words = v1.toString().split(" ");
  13. //把迭代出来的单词封装成<k2,v2>的形式
  14. Text k2 = new Text(words[0]);//只取前面的数字,后面的字符串不取
  15. LongWritable v2 = new LongWritable(1L);
  16. //把<k2,v2>写出去
  17. context.write(k2, v2);
  18. }
  19. }
  20. /**
  21. * Reduce阶段
  22. */
  23. public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  24. @Override
  25. protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
  26. //创建一个sum变量,保存v2s的和
  27. long sum = 0L;
  28. //对v2s中的数据进行累加求和
  29. for (LongWritable v2 : v2s) {
  30. sum += v2.get();
  31. //模拟Reduce的复杂计算消耗的时间
  32. if (sum % 200 == 0) {
  33. Thread.sleep(1);
  34. }
  35. }
  36. //组装k3,v3
  37. Text k3 = k2;
  38. LongWritable v3 = new LongWritable(sum);
  39. // 把结果写出去
  40. context.write(k3, v3);
  41. }
  42. }
  43. /**
  44. * 组装Job=Map+Reduce
  45. * 需要3个参数:输入路径、输出路径、Reduce任务个数
  46. */
  47. public static void main(String[] args) {
  48. try {
  49. if (args.length != 3) {
  50. //如果传递的参数不够,程序直接退出
  51. System.exit(100);
  52. }
  53. //指定Job需要的配置参数
  54. Configuration conf = new Configuration();
  55. //创建一个Job
  56. Job job = Job.getInstance(conf);
  57. //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
  58. job.setJarByClass(WordCountJobSkew.class);
  59. //指定输入路径(可以是文件,也可以是目录)
  60. FileInputFormat.setInputPaths(job, new Path(args[0]));
  61. //指定输出路径(只能指定一个不存在的目录)
  62. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  63. //指定map相关的代码
  64. job.setMapperClass(MyMapper.class);
  65. //指定k2的类型
  66. job.setMapOutputKeyClass(Text.class);
  67. //指定v2的类型
  68. job.setMapOutputValueClass(LongWritable.class);
  69. //指定reduce相关的代码
  70. job.setReducerClass(MyReducer.class);
  71. //指定k3的类型
  72. job.setOutputKeyClass(Text.class);
  73. //指定v3的类型
  74. job.setOutputValueClass(LongWritable.class);
  75. //设置Reduce任务个数
  76. job.setNumReduceTasks(Integer.parseInt(args[2]));
  77. //提交job
  78. job.waitForCompletion(true);
  79. } catch (Exception e) {
  80. e.printStackTrace();
  81. }
  82. }
  83. }
  1. hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJobSkew /hello_10000000.dat /out10000000 1

image.png
image.png

10个Reduce

hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJobSkew /hello_10000000.dat /out10000000_2 10

image.png
image.png

查看耗时情况:可以看到有一个Reduce(5)耗时很长
image.png

10个Reduce且数据打散

/**
 * 数据倾斜-把倾斜的数据打散
 */
public class WordCountJobSkewRandKey {
    /**
     * Map阶段
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Random random = new Random();

        /**
         * 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
            //对获取到的每一行数据进行切割,把单词切割出来
            String[] words = v1.toString().split(" ");
            //把迭代出来的单词封装成<k2,v2>的形式
            String key = words[0];
            if ("5".equals(key)) {
                //把倾斜的key打散,分成10份
                key = "5" + "_" + random.nextInt(10);
            }
            Text k2 = new Text(key);
            LongWritable v2 = new LongWritable(1L);
            //把<k2,v2>写出去
            context.write(k2, v2);
        }
    }


    /**
     * Reduce阶段
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        /**
         * 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
         */
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
            //创建一个sum变量,保存v2s的和
            long sum = 0L;
            //对v2s中的数据进行累加求和
            for (LongWritable v2 : v2s) {

                sum += v2.get();
                //模拟Reduce的复杂计算消耗的时间
                if (sum % 200 == 0) {
                    Thread.sleep(1);
                }
            }

            //组装k3,v3
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);

            // 把结果写出去
            context.write(k3, v3);
        }
    }

    /**
     * 组装Job=Map+Reduce
     */
    public static void main(String[] args) {
        try {
            if (args.length != 3) {
                //如果传递的参数不够,程序直接退出
                System.exit(100);
            }

            //指定Job需要的配置参数
            Configuration conf = new Configuration();
            //创建一个Job
            Job job = Job.getInstance(conf);

            //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
            job.setJarByClass(WordCountJobSkewRandKey.class);

            //指定输入路径(可以是文件,也可以是目录)
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            //指定输出路径(只能指定一个不存在的目录)
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            //指定map相关的代码
            job.setMapperClass(MyMapper.class);
            //指定k2的类型
            job.setMapOutputKeyClass(Text.class);
            //指定v2的类型
            job.setMapOutputValueClass(LongWritable.class);


            //指定reduce相关的代码
            job.setReducerClass(MyReducer.class);
            //指定k3的类型
            job.setOutputKeyClass(Text.class);
            //指定v3的类型
            job.setOutputValueClass(LongWritable.class);

            //设置Reduce任务个数
            job.setNumReduceTasks(Integer.parseInt(args[2]));

            //提交job
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}
hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJobSkewRandKey /hello_10000000.dat /out10000000_3 10

image.png
image.png

但是这个时候我们获取到的最终结果是一个半成品,还需要进行一次加工 其实我们前面把这个倾斜的数据打散之后相当于做了一个局部聚合,现在还需要再开发一个mapreduce任务再做一次全局聚合, 其实也很简单,获取到上一个map任务的输出,在map端读取到数据之后,对数据先使用空格分割,然后对第一列的数据再使用下划线分割,分割之后总是取第一列,这样就可以把值为5的数据还原出来了 也就是在步做处理 ,输出<5,count>而不是<5,1>

查看耗时情况:可以看到每个Reduce耗时接近
image.png