MapReduce是什么

  • MapReduce 是一种编程模型,是面向大数据并行处理的计算模型、框架和平台。
  • MapReduce 是一个基于集群的高性能并行计算平台。可以使用普通服务器构成一个包含数十、 数百、甚至数千个节点的分布式和并行计算集群。
  • MapReduce 是一个并行计算与运行的软件框架。它提供了一个庞大但设计精良的并行计算软 件框架,能自动划分计算数据和计算任务,自动完成计算任务的并行化处理,实现在集群节 点上自动分配和执行任务并收集计算结果,将数据分布存储、数据通信、容错处理等并行计 算涉及到的很多系统底层的复杂实现细节交由系统负责处理,大大减少了软件开发人员的负 担。
  • MapReduce 是一个并行程序设计模型与方法。它提供了一种简便的并行程序设计方法,用 Map 和 Reduce 两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以 简单方便地完成大规模数据的编程和计算处理。

MapReduce运行流程

MapReduce主要功能

  1. - 数据划分和计算任务调度
  2. - 数据/代码互相定位
  3. - 系统优化
  4. - 出错检测和恢复

运行流程
image.png
由上图可以看到 MapReduce 执行下来主要包含这样几个步骤:
1) 首先正式提交作业代码,并对输入数据源进行切片
2) master 调度 worker 执行 map 任务
3) worker 读取输入源切片
4) worker 执行 map 任务,将任务输出保存在本地
5) master 调度 worker 执行 reduce 任务,reduce worker 读取 map 任务的输出文件
6) 执行 reduce 任务,将任务输出保存到 HDFS

运行流程详解

以 WordCount 为例
给定任意的 HDFS 的输入目录,其内部数据为“f a c d e……”等用空格字符分隔的字符串,
通过使用 MapReduce 计算框架来统计以空格分隔的每个单词出现的频率,输出结果如
,,形式的结果到 HDFS 目录中。
MapReduce 将作业的整个运行过程分为两个阶段:Map 阶段 Reduce 阶段。
Map 阶段由一定数量的 Map Task 组成,流程如下:

  • 输入数据格式解析:InputFormat
  • 输入数据处理:Mapper
  • 数据分组:Partitioner
  • 数据按照 key 排序
  • 本地规约:Combiner(相当于 local reducer,可选)
  • 将任务输出保存在本地

Reduce 阶段由一定数量的 Reduce Task 组成,流程如下:

  • 数据远程拷贝
  • 数据按照 key 排序
  • 数据处理:Reducer
  • 数据输出格式:OutputFormat

通常我们把从 Mapper 输出数据到 Reduce 读取数据之间的过程称之为 shuffle

MapReduce Java API 的应用

MapReduce开发流程

  • 搭建开发环境,参考 HDFS 环境搭建,基本一致
  • 基于 MapReduce 框架编写代码,Map、Reduce、Driver 三部分组成。
  • 编译打包,将源代码和依赖 jar 包打成一个包
  • 上传至运行环境
  • 运行 hadoop jar 命令,现已由 yarn jar 替代,建议使用新命令提交执行

    1. 具体提交命令为:
    2. yarn jar testhdfs-jar-with-dependencies.jar com.tianliangedu.driver.WordCount
    3. /tmp/tianliangedu/input /tmp/tianliangedu/output3
  • 通过 yarn web ui 查看执行过程

  • 查看执行结果

WordCount代码实现

Map类编写

  • Mapper:是 MapReduce 计算框架中 Map 过程的封装
  • Text:Hadoop 对 Java String 类的封装,适用于 Hadoop 对文本字符串的处理
  • IntWritable:Hadoop 对 Java Integer 类的封装,适用于 Hadoop 整型的处理
  • Context:Hadoop 环境基于上下文的操作对象,如 Map 中 key/value 的输出、分布式缓存数 据、分布式参数传递等
  • StringTokenizer:对 String 对象字符串的操作类,做基于空白字符的切分操作工具类
  • 源码编写实现:
    1. package com.tianliangedu.mapper;
    2. import java.io.IOException;
    3. import java.util.StringTokenizer;
    4. import org.apache.hadoop.io.IntWritable;
    5. import org.apache.hadoop.io.Text;
    6. import org.apache.hadoop.mapreduce.Mapper;
    7. public class MyTokenizerMapper extends
    8. Mapper<Object, Text, Text, IntWritable> {
    9. // 暂存每个传过来的词频计数,均为 1,省掉重复申请空间
    10. private final static IntWritable one = new IntWritable(1);
    11. // 暂存每个传过来的词的值,省掉重复申请空间
    12. private Text word = new Text();
    13. // 核心 map 方法的具体实现,逐个<key,value>对去处理
    14. public void map(Object key, Text value, Context context)
    15. throws IOException, InterruptedException {
    16. // 用每行的字符串值初始化 StringTokenizer
    17. StringTokenizer itr = new StringTokenizer(value.toString());
    18. // 循环取得每个空白符分隔出来的每个元素
    19. while (itr.hasMoreTokens()) {
    20. // 将取得出的每个元素放到 word Text 对象中
    21. word.set(itr.nextToken());
    22. // 通过 context 对象,将 map 的输出逐个输出
    23. context.write(word, one);
    24. }
    25. } }

Reduce类编写

  • Reducer:是 MapReduce 计算框架中 Reduce 过程的封装
  • 源码编写实现:
    1. package com.tianliangedu.reducer;
    2. import java.io.IOException;
    3. import org.apache.hadoop.io.IntWritable;
    4. import org.apache.hadoop.io.Text;
    5. import org.apache.hadoop.mapreduce.Reducer;
    6. //reduce 类,实现 reduce 函数
    7. public class IntSumReducer extends
    8. Reducer<Text, IntWritable, Text, IntWritable> {
    9. private IntWritable result = new IntWritable();
    10. //核心 reduce 方法的具体实现,逐个<key,List(v1,v2)>去处理
    11. public void reduce(Text key, Iterable<IntWritable> values,
    12. Context context) throws IOException, InterruptedException {
    13. //暂存每个 key 组中计算总和
    14. int sum = 0;
    15. //加强型 for,依次获取迭代器中的每个元素值,即为一个一个的词频数值
    16. for (IntWritable val : values) {
    17. //将 key 组中的每个词频数值 sum 到一起
    18. sum += val.get();
    19. }
    20. //将该 key 组 sum 完成的值放到 result IntWritable 中,使可以序列化输出
    21. result.set(sum);
    22. //将计算结果逐条输出
    23. context.write(key, result);
    24. }
    25. }

Driver类编写
Configuration:与 HDFS 中的 Configuration 一致,负责参数的加载和传递
Job:作业,是对一轮 MapReduce 任务的抽象,即一个 MapReduce 的执行全过程的管理类
FileInputFormat:指定输入数据的工具类,用于指定任务的输入数据路径
FileOutputFormat:指定输出数据的工具类,用于指定任务的输出数据路径
源码编写实现:

  1. package com.tianliangedu.driver;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. import com.tianliangedu.mapper.MyTokenizerMapper;
  10. import com.tianliangedu.reducer.IntSumReducer;
  11. public class WordCountDriver {
  12. // 启动 mr 的 driver 方法
  13. public static void main(String[] args) throws Exception {
  14. // 得到集群配置参数
  15. Configuration conf = new Configuration();
  16. // 设置到本次的 job 实例中
  17. Job job = Job.getInstance(conf, "天亮 WordCount");
  18. // 指定本次执行的主类是 WordCount
  19. job.setJarByClass(WordCountDriver.class);
  20. // 指定 map 类
  21. job.setMapperClass(MyTokenizerMapper.class);
  22. // 指定 combiner 类,要么不指定,如果指定,一般与 reducer 类相同
  23. job.setCombinerClass(IntSumReducer.class);
  24. // 指定 reducer 类
  25. job.setReducerClass(IntSumReducer.class);
  26. // 指定 job 输出的 key 和 value 的类型,如果 map 和 reduce 输出类型不完全相同,需要重
  27. 新设置 map output key value class 类型
  28. job.setOutputKeyClass(Text.class);
  29. job.setOutputValueClass(IntWritable.class);
  30. // 指定输入数据的路径
  31. FileInputFormat.addInputPath(job, new Path(args[0]));
  32. // 指定输出路径,并要求该输出路径一定是不存在的
  33. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  34. // 指定 job 执行模式,等待任务执行完成后,提交任务的客户端才会退出!
  35. System.exit(job.waitForCompletion(true) ? 0 : 1);
  36. } }

Maven打包
上传到运行环境
运行WordCount程序
查看执行过程
查看执行结果

标准代码实现

将之前的三个类,合并成一个类来处理

  1. import java.io.IOException;
  2. import java.util.StringTokenizer;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.IntWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.Mapper;
  9. import org.apache.hadoop.mapreduce.Reducer;
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  12. //启动 mr 的 driver 类
  13. public class WordCountDriver {
  14. //map 类,实现 map 函数
  15. public static class MyTokenizerMapper extends
  16. Mapper<Object, Text, Text, IntWritable> {
  17. //暂存每个传过来的词频计数,均为 1,省掉重复申请空间
  18. private final static IntWritable one = new IntWritable(1);
  19. //暂存每个传过来的词的值,省掉重复申请空间
  20. private Text word = new Text();
  21. //核心 map 方法的具体实现,逐个<key,value>对去处理
  22. public void map(Object key, Text value, Context context)
  23. throws IOException, InterruptedException {
  24. //用每行的字符串值初始化 StringTokenizer
  25. StringTokenizer itr = new StringTokenizer(value.toString());
  26. //循环取得每个空白符分隔出来的每个元素
  27. while (itr.hasMoreTokens()) {
  28. //将取得出的每个元素放到 word Text 对象中
  29. word.set(itr.nextToken());
  30. //通过 context 对象,将 map 的输出逐个输出
  31. context.write(word, one);
  32. }
  33. }
  34. }
  35. //reduce 类,实现 reduce 函数
  36. public static class IntSumReducer extends
  37. Reducer<Text, IntWritable, Text, IntWritable> {
  38. private IntWritable result = new IntWritable();
  39. //核心 reduce 方法的具体实现,逐个<key,List(v1,v2)>去处理
  40. public void reduce(Text key, Iterable<IntWritable> values,
  41. Context context) throws IOException, InterruptedException {
  42. //暂存每个 key 组中计算总和
  43. int sum = 0;
  44. //加强型 for,依次获取迭代器中的每个元素值,即为一个一个的词频数值
  45. for (IntWritable val : values) {
  46. //将 key 组中的每个词频数值 sum 到一起
  47. sum += val.get();
  48. }
  49. //将该 key 组 sum 完成的值放到 result IntWritable 中,使可以序列化输出
  50. result.set(sum);
  51. //将计算结果逐条输出
  52. context.write(key, result);
  53. }
  54. }
  55. //启动 mr 的 driver 方法
  56. public static void main(String[] args) throws Exception {
  57. //得到集群配置参数
  58. Configuration conf = new Configuration();
  59. //设置到本次的 job 实例中
  60. Job job = Job.getInstance(conf, "天亮 WordCount");
  61. //通过指定相关字节码对象,找到所属的主 jar 包
  62. job.setJarByClass(WordCountDriver.class);
  63. //指定 map 类
  64. job.setMapperClass(MyTokenizerMapper.class);
  65. //指定 combiner 类,要么不指定,如果指定,一般与 reducer 类相同
  66. job.setCombinerClass(IntSumReducer.class);
  67. //指定 reducer 类
  68. job.setReducerClass(IntSumReducer.class);
  69. //指定 job 输出的 key 和 value 的类型,如果 map 和 reduce 输出类型不完全相同,需
  70. 要重新设置 map output key value class 类型
  71. job.setOutputKeyClass(Text.class);
  72. job.setOutputValueClass(IntWritable.class);
  73. //指定输入数据的路径
  74. FileInputFormat.addInputPath(job, new Path(args[0]));
  75. //指定输出路径,并要求该输出路径一定是不存在的
  76. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  77. //指定 job 执行模式,等待任务执行完成后,提交任务的客户端才会退出!
  78. System.exit(job.waitForCompletion(true) ? 0 : 1);
  79. } }

MapReduce Shell应用

  • 查看当前正在执行的job任务
    • mapred job -list查看任务列表
  • 终止一个任务的执行
    • mapred job -kill job-id
  • 查看一个job的日志
    • mapred job -logs job-id

MapReduce技术特征

  • 向”外”横向扩展,而非向”上”纵向发展
  • 失效被认为是常态
  • 移动计算,把处理向数据迁移(数据本地性)
  • 顺序处理计算、避免随机访问数据
  • 推测执行
  • 可扩展性
  • 为应用开发隐藏系统底层细节