1. MapReduce 定义

MapReduce 是一个分布式运算程序的编程框架,它将用户编写的业务逻辑代码和自带的默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

2. MapReduce特点

2.1 优点

MapReduce 具有如下优点

  • 易于编程实现
  • 具有良好的扩展性
  • 高容错性
  • 适合PB级以上海量数据的离线处理
  • 易于编程

开发人员仅需要实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。

  • 良好的扩展性

当计算资源不能得到满足时,可以通过简单的增加机器来扩展它的计算能力。

  • 高容错性

MapReduce 设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。如果其中一台机器宕机,MapReduce 可以把宕机机器上面的任务转移到另一个节点上运行,从而避免整个任务的失败。 整个容错的任务转移是由hadoop内部完成的,无需进行人工干预。

  • 适合海量数据的离线处理

可以支持上千台服务器集群进行分布式计算,当数据量足够大时才能体现MapReduce 的优势。

2.2 缺点

不擅长实时计算: 无法在毫秒级别的延时内返回结果 不擅长流式计算: MapReduce 输入的数据是静态的,不能动态变化 不擅长DAG计算: 多个应用程序存在依赖关系时,MapReduce需要将前一个应用的输出写入磁盘,后一个应用程序需要读取磁盘数据才能继续执行,存在磁盘IO效率问题。

3. MapReduce工作原理

MapReduce 进行分布式运算时,往往需要分成至少2个阶段: Map阶段 和 Reduce 阶段。分队对应 MapTask 和 ReduceTask。

其中第一阶段的MapTask各个实例进行互不干扰的并发运行,然后在第二阶段的ReduceTask中各个计算实例的ReduceTask 还是在并发的执行,但是ReduceTask需要依赖上一个阶段MapTask的并发计算输出。MapReduce 编程模型中只能包含一个Map阶段和Reduce 阶段,如果业务逻辑非常复杂,则需要编写多个MapReduce 程序,串行运行。

4. MapReduce 程序设计

每个MapReduce 程序设计主要涉及三部分功能

map部分: 根据输入和输出的要求,进行数据格式的转换 reduce部分: 将map结果进行reduce操作得到下一阶段需要的输入 driver部分: 聚合map 和 reduce 操作,设置输入输出的K-V类型及输入输出文件路径


客户端代码需要引入如下依赖

  1. <dependency>
  2. <groupId>org.apache.hadoop</groupId>
  3. <artifactId>hadoop-client</artifactId>
  4. <version>3.1.3</version>
  5. </dependency>

4.1 Mapper 实现

自定义Mapper 实现类需要继承hadoop-client 包提供的 Mapper抽象类,并覆写其中的map方法,若不实现该map方法,默认情况写mapper 与输入保持不变。

Mapper 抽象类对应泛型含义如下:

  1. /**
  2. * KEYIN: 输入 key类型
  3. * VALUEIN: 输入 value 类型
  4. * KEYOUT: 输出 key 类型
  5. * VALUEOUT: 输出 value 类型
  6. */
  7. public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  8. //
  9. public abstract class Context
  10. implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  11. }
  12. // 设置方法,任务开始时只执行一次
  13. protected void setup(Context context
  14. ) throws IOException, InterruptedException {
  15. // NOTHING
  16. }
  17. // map方法 每个k-v对都会执行一次,大多数应用都需要覆盖该方法
  18. // 默认情况下,该方法是一致性的方法
  19. @SuppressWarnings("unchecked")
  20. protected void map(KEYIN key, VALUEIN value,
  21. Context context) throws IOException, InterruptedException {
  22. context.write((KEYOUT) key, (VALUEOUT) value);
  23. }
  24. // 清理方法在任务执行后执行一次
  25. protected void cleanup(Context context
  26. ) throws IOException, InterruptedException {
  27. // NOTHING
  28. }
  29. // 用户能在需要对Mapper进行更加完整的控制时 覆写该方法
  30. public void run(Context context) throws IOException, InterruptedException {
  31. setup(context);
  32. try {
  33. while (context.nextKeyValue()) {
  34. map(context.getCurrentKey(), context.getCurrentValue(), context);
  35. }
  36. } finally {
  37. cleanup(context);
  38. }
  39. }
  40. }

4.2 Reduce 实现

Reduce 任务的定义类似 Map任务,需要继承 Reduce类,并覆写reduce 方法实现Reduce任务的逻辑。

public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  public abstract class Context 
    implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }

  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  // reduce方法对每组key 进行一次调用
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      cleanup(context);
    }
  }
}

4.3 Driver 实现

Mapper 和 Reduce 任务定义好之后,需要在Driver程序中关联起来执行,并提供输入,输出的 k-v 类型设置及输入输出文件路径设置。 Driver 中主要包含的如下部分的内容

  • 创建执行的job
  • 设置当前jar包路径
  • 关联mapper 和 reducer
  • 设置mapper 输出的k-v类型
  • 设置最终输出的k-v类型
  • 设置输入路径和输出路径
  • 提交job

4.4 WordCount实例

// wordcount 输入输出文件路径从命令行参数获取
public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 创建执行的job 
        Configuration entries = new Configuration();
        Job job = Job.getInstance(entries);
        // 设置当前jar包路径
        job.setJarByClass(WordCountDriver.class);
        // 设置当前jar包路径
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 设置mapper 输出的k-v类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 设置最终输出的k-v类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 设置输入路径和输出路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 提交job 
        boolean result = job.waitForCompletion(Boolean.TRUE);
        System.exit(result? 0: 1);
    }
}


public class WordCountMapper extends Mapper<LongWritable,Text, Text, IntWritable> {
    // map 方法对每对 k-v 执行一次, 定义为全局属性可以减少内存消耗
    private Text outKey = new Text();
    private IntWritable outValue = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split("\\W");

        for(String word : words) {
            if(StringUtils.isBlank(word)) continue;
            outKey.set(word);
            context.write(outKey, outValue);
        }
    }
}


public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // 由于reduce 方法多次执行,定义全局属性可以减少内存消耗
    private IntWritable outValue = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for(IntWritable cnt : values){
            sum += cnt.get();
        }
        outValue.set(sum);
        context.write(key,outValue);
    }
}