3. MapReduce

3.1 MapReduce介绍

MapReduce的核心思想是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。

  • Map负责“分”,即把大规模的问题分割为多个小规模的问题进行计算。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系;
  • Reduce负责“合”,即对map阶段的结果进行全局汇总;

这两个阶段合起来正是MapReduce思想的体现,如下图所示:

3. MapReduce - 图1

还有一个比较形象的例子解释MapReduce,假设需要要数图书馆中的所有书:

  • 同学A数1号书架,同学B数2号书架,这就是“Map”,人越多,数书就更快;
  • 然后把所有同学统计的书的数量加在一起。这就是“Reduce”。

最后要注意的是,MapReduce是运行在YARN集群的,主要和以下两个组件有关:

  1. ResourceManager
  2. NodeManager

具体的内容会在之后YARN部分的学习中介绍。

3.2 MapReduce设计构思

MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上

MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。为程序员提供一个抽象和高层的编程接口和框架。程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:

Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现Map和Reduce,MapReduce处理的数据类型是键值对

  • Map: (k1; v1) → [(k2; v2)]
  • Reduce: (k2; [v2]) → [(k3; v3)]

3. MapReduce - 图2

一个完整的mapreduce程序在分布式运行时有三类实例进程

  1. MRAppMaster 负责整个程序的过程调度及状态协调
  2. MapTask 负责map阶段的整个数据处理流程
  3. ReduceTask 负责reduce阶段的整个数据处理流程

3. MapReduce - 图3

总结一下MapReduce:

  • 它是一种分布式计算模型,用于解决海量数据的计算问题;
  • MapReduce将整个并行计算过程抽象到两个函数
    • Map:对一些独立元素组成的列表的每一个元素进行指定的操作,可以高度并行
    • Reduce:对一个列表的元素进行合并。
  • 一个简单的MapReduce程序只需要指定map(),reduce(),input和output,其余由框架完成。

3.3 MapReduce编程规范

MapReduce 的开发一共有八个步骤, 其中 Map 阶段分为 2 个步骤,Shuffle 阶段分 4 个步骤, Reduce 阶段分2个步骤。

  1. Map阶段的两个步骤:
    • 设置 InputFormat 类, 将数据切分为 Key-Value <K1, V1> 对,输入到第二步;
    • 自定义 Map 逻辑,将第一步的结果转换成另外的 Key-Value <K2, V2>对,输出结果。
  2. Shuffle阶段的四个步骤:
    • 对输出的 Key-Value 对进行分区
    • 对不同分区的数据按照相同的 Key 进行排序
    • (可选) 对分组过的数据初步规约,降低数据的网络拷贝;
    • 对数据进行分组,相同 Key 的 Value 放入一个集合中。
  3. Reduce阶段的两个步骤:
    • 对多个 Map 任务的结果进行排序以及合并,编写 Reduce 函数实现自己的逻辑,对输入的Key-Value 进行处理,转为新的 Key-Value <K3, V3> 输出;
    • 设置 OutputFormat 处理并保存 Reduce 输出的 Key-Value 数据。

【注意】:Shuffle中相同分区的数据会到同一个reduce。

3.3.1 WordCount案例

  • 准备数据
    1. 创建一个新的文件
      1. cd /export/servers
      2. vim wordcount.txt
  1. 向其中放入以下内容并保存
    hello,world,hadoop
    hive,sqoop,flume,hello
    kitty,tom,jerry,world
    hadoop
    
  1. 上传到 HDFS
    hdfs dfs -mkdir /wordcount/
    hdfs dfs -put wordcount.txt /wordcount/
    
  • 编写程序
    • Mapper ```java import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// 这里注意导包不要导错了,一定要导hadoop.io这个包 public class WordCountMapper extends Mapper { // Map方法就是将转化成 / 参数: key:K1 行偏移量 value:V1 每一行的文本数据 context:上下文对象 / / 如何将转换成 <行偏移量, 文本字符串> <单词, 数量> / @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1.将一行的文本进行拆分 String[] split = value.toString().split(“,”); Text text = new Text(); LongWritable longWritable = new LongWritable(); // 2.遍历数组,构建 for (String word : split) { // 3.将写入上下文 text.set(word); longWritable.set(1); context.write(text, longWritable); } } }



   -  Reducer 
```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    // reduce方法作用:将新的<K2,V2>转为<K3, V3>,将K3和V3写入上下文
    /*

      参数:
      key:新K2
      values:集合 新V2
      context:上下文对象

      ----------------------------

      如何将新的K2,V2转化成K3,V3?
      <单词, <1, 1, ...>>
      <单词, 数字>

    */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        // 1.遍历集合,将集合中的数字相加,得到V3
        for (LongWritable value : values) {
            count += value.get();
        }
        // 2.将K3和V3写入上下文中
        context.write(key, new LongWritable(count));
    }
}
  • 定义主类 ```java import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class JobMain extends Configured implements Tool { // 该方法用于指定一个job任务 @Override public int run(String[] strings) throws Exception { // 1.创建一个job任务对象 注意这里configuration必须是和main一样的,main的configuration保存在了extends的父类里 Job job = Job.getInstance(super.getConf(), “wordcount”); // jar包运行出错时,也可以在主程序中修改 job.setJarByClass(JobMain.class);

    // 2.配置job对象(八个步骤)
    // 第一步,指定读取文件的类(读取方式)
    job.setInputFormatClass(TextInputFormat.class);
    // 指定路径
    TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/wordcount"));

    // 第二步,指定Map阶段的处理方式
    job.setMapperClass(WordCountMapper.class);
    // 设置Map阶段K2的类型
    job.setMapOutputKeyClass(Text.class);
    // 设置Map阶段V2的类型
    job.setMapOutputValueClass(LongWritable.class);

    // 第三、四、五、六步,shuffle阶段,采用默认方式

    // 第七步,指定Reduce阶段的处理方式
    job.setReducerClass(WordCountReducer.class);
    // 设置K3,V3的类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);

    // 第八步,设置输出类型
    job.setOutputFormatClass(TextOutputFormat.class);
    // 设置输出的路径
    // 不考虑路径存在问题的解法
    // TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.212.100:8020/wordcount_out"));

    // 考虑路径存在问题的解法
    Path path = new Path("hdfs://192.168.212.100:8020/wordcount_out");
    TextOutputFormat.setOutputPath(job, path);
    // 判断路径是否存在
    // 获取FileSystem
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.212.100:8020"), new Configuration());
    // 判断目录是否存在
    boolean bl2 = fileSystem.exists(path);
    if(bl2){
        // 删除目标目录
        fileSystem.delete(path,true);
    }

    // 等待任务结束
    boolean bl = job.waitForCompletion(true);

    return bl? 0: 1;
}

public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    // jar包运行不了也可以用这种方法
    // configuration.set("mapreduce.job.jar", "wordcount.jar");
    // Job job = new Job(configuration, JobMain.class.getSimpleName());

    JobMain jobMain = new JobMain();
    // 启动job任务
    int run = ToolRunner.run(configuration, jobMain, args);
    System.exit(run);
}

}



<a name="40772aa9"></a>
## 3.4 MapReduce运行模式

<a name="0e1472d5"></a>
### 3.4.1 本地运行模式

- MapReduce 程序是被提交给 LocalJobRunner 在本地以单进程的形式运行;
- 处理的数据及输出结果可以在本地文件系统,也可以在hdfs上;
- 实现本地运行的步骤: 
   - 写一个程序,不要带集群的配置文件;
   - 本质是程序的 conf 中是否有 mapreduce.framework.name=local 以及 yarn.resourcemanager.hostname=local 参数;
- 本地模式非常便于进行业务逻辑的 Debug , 只要在IDEA中打断点即可;

<a name="e433feb1"></a>
### 3.4.2 集群运行模式

1.  将 MapReduce 程序提交给 Yarn 集群,分发到很多的节点上并发执行; 
2.  处理的数据和输出结果应该位于 HDFS 文件系统; 
3.  提交集群的实现步骤:将程序打成jar包,然后在集群的任意一个节点上用hadoop命令启动,格式如下:

hadoop jar 包名 主类路径



<a name="d5cd40ce"></a>
## 3.5 MapReduce分区

前面提到,**同一个分区的数据发送到同一个 Reduce 当中进行处理**。因此我们可以指定分区,将想要一起处理的数据放入一个分区。例如:为了数据的统计,可以**把一批类似的数据发送到同一个 Reduce 当中**,在同一个 Reduce 当中统计相同类型的数据,就可以实现类似的数据分区和统计等。其实就是**把相同类型的、有共性的数据送到一起去处理**。

Reduce中**默认的分区只有一个**:

![](MapReduce.assets/image-20210403101142499.png#crop=0&crop=0&crop=1&crop=1&id=RR8Zc&originalType=binary&ratio=1&rotation=0&showTitle=false&status=done&style=none&title=)

编写程序的步骤如下:

1.  **定义Mapper**:这个 Mapper 程序不做任何逻辑,也不对 Key-Value 做任何改变,只是接收数据,然后往下发送 
```java
package partitioner;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
    K1:行偏移量 LongWritable
    V1:行文本数据 Text

    K2:行文本数据 Text
    V2:占位符 NullWritable
*/

public class PartitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    // map 方法将K1、V1转为K2、V2
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 这里K2就是V1,V2是占位符,需要通过类型的get方法得到
        context.write(value, NullWritable.get());
    }
}
  1. 自定义Partitioner: 主要的逻辑就在这里,通过 Partitioner 将数据分发给不同的 Reducer ```java package partitioner;

import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner;

public class Mypartitioner extends Partitioner { /*

    1. 定义分区规则
    2. 返回对应的分区编号
*/
@Override
public int getPartition(Text text, NullWritable nullWritable, int i) {
    // 1. 拆分文本数据(K2),获取中奖字段编号
    String[] split = text.toString().split("\t");
    String numStr = split[5];
    // 2. 判断中奖字段与15的关系,返回对应的分区编号
    if (Integer.parseInt(numStr) > 15){
        return 1;
    }
    else {
        return 0;
    }
}

}



3.  **定义Reducer逻辑**:这个 Reducer 也不做任何处理,将数据原封不动的输出即可 
```java
package partitioner;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
    K2: 一行的文本数据 Text
    V2: 占位符 NullWritable

    K3: 一行的文本数据 Text
    V3: 占位符 NullWritable
*/

public class PartitionReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
  1. 主类中设置分区类和ReduceTask个数 ```java package partitioner;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {

@Override
public int run(String[] strings) throws Exception {
    // 1.创建job任务对象
    Job job = Job.getInstance(super.getConf(), "partition_mapreduce");
    // 2.对job任务进行配置(八个步骤)
    // 设置输入的路径
    job.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(job, new Path("hdfs://192.168.212.100:8020/input"));
    // 设置mapper类,和对应的K2 V2类型
    job.setMapperClass(PartitionMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);
    // shuffle阶段(分区自定义,其他默认)
    job.setPartitionerClass(Mypartitioner.class);
    // 设置Reducer类和数据类型
    job.setReducerClass(PartitionReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    // 设置Reduce Task的个数
    job.setNumReduceTasks(2);
    // 指定输出类和输出路径
    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.212.100:8020/output/partition_out"));
    // 3.等待任务结束
    boolean bl = job.waitForCompletion(true);
    return bl? 0: 1;
}

public static void main(String[] args) throws Exception {
    // 启动一个Job任务
    int run = ToolRunner.run(new Configuration(), new JobMain(), args);
    System.exit(run);
}

}



<a name="466a6157"></a>
## 3.6 MapReduce中的计数器

- 计数器是**收集作业统计信息的有效手段之一**,用于质量控制或应用级统计;
- 计数器还可**辅助诊断系统故障**。如果需要将日志信息传输到 map 或 reduce 任务, 更好的方法通常是看能否用一个计数器值来记录某一特定事件的发生;
- 对于大型分布式作业而言,使用计数器更为方便。除了因为**获取计数器值比输出日志更方便**,因为根据计数器值统计特定事件的发生次数要比分析一堆日志文件容易得多。

hadoop内置计数器由如下几类

| MapReduce任务计数器 | org.apache.hadoop.mapreduce.TaskCounter |
| --- | --- |
| 文件系统计数器 | org.apache.hadoop.mapreduce.FileSystemCounter |
| FileInputFormat计数器 | org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter |
| FileOutputFormat计数器 | org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter |
| 作业计数器 | org.apache.hadoop.mapreduce.JobCounter |


每次mapreduce执行完成之后,我们都会看到一些日志记录出来,其中最重要的一些日志记录如下图所示:

![](MapReduce.assets/image-20210408151621035.png#crop=0&crop=0&crop=1&crop=1&id=Xg6qA&originalType=binary&ratio=1&rotation=0&showTitle=false&status=done&style=none&title=)

MapReduce当中有计数器的功能可以应对大部分场景,但是当遇到无法应对的场景时,就需要我们实现自己的计数器了,下面来看一个例子:

> 需求:以以上分区代码为案例,统计 map 接收到的数据记录条数


<a name="31a4965e"></a>
### 3.6.1 第一种方式

第一种方式定义计数器,通过context上下文对象可以获取我们的计数器,进行记录;通过context上下文对象,在map端使用计数器进行统计。

```java
public class PartitionMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
    //map方法将K1和V1转为K2和V2
    @Override
    protected void map(LongWritable key, Text value, Context context) throws Exception{
        Counter counter = context.getCounter("MR_COUNT","MyRecordCounter");
        counter.increment(1L);
        context.write(value,NullWritable.get());
    }
}

运行程序之后就可以看到我们自定义的计数器在map阶段读取了七条数据

3. MapReduce - 图4

3.6.2 第二种方式

通过enum枚举类型来定义计数器,统计reduce端数据的输入的key有多少个。

public class PartitionerReducer extends Reducer<Text,NullWritable,Text,NullWritable> {
    public static enum Counter{
        MY_REDUCE_INPUT_RECORDS,MY_REDUCE_INPUT_BYTES
    }
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.getCounter(Counter.MY_REDUCE_INPUT_RECORDS).increment(1L);
        context.write(key, NullWritable.get());
    }
}

3. MapReduce - 图5

3.7 MapReduce排序和序列化

  • 序列化 (Serialization) 是指把结构化对象转化为字节流。反序列化 (Deserialization) 是序列化的逆过程,把字节流转为结构化对象
    • 当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流;
    • 当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化。
  • 一个对象被序列化后,会附带很多额外的信息 (各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable),精简高效。不用像 Java 对象类一样传输多层的父子关系,需要哪个属性就传输哪个属性值,大大的减少网络传输的开销。
  • Writable 是 Hadoop 的序列化格式,Hadoop 定义了这样一个 Writable 接口,一个类要支持可序列化只需实现这个接口即可
  • 另外 Writable 有一个子接口是 WritableComparable,WritableComparable 是既可实现序列化,也可以对key进行比较,我们这里可以通过自定义 Key 实现 WritableComparable 来实现我们的排序功能

数据格式如下:

3. MapReduce - 图6

要求:

  • 第一列按照字典顺序进行排列;
  • 第一列相同的时候,第二列按照升序进行排列。

解决思路:

  • 将 Map 端输出的 中的 key 和 value 组合成一个新的 key (newKey),value值不变;
  • 这里就变成 <(key, value), value>,在针对 newKey 排序的时候,如果 key 相同,就再对value进行排序

代码实现

  • 自定义类型和比较器 ```java package sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;

public class SortBean implements WritableComparable { private int num; private String word;

public int getNum() {
    return num;
}

public void setNum(int num) {
    this.num = num;
}

public String getWord() {
    return word;
}

public void setWord(String word) {
    this.word = word;
}

// 重写toString方法
@Override
public String toString() {
    return  word + "\t" + num;
}

// 核心,实现比较器,自定义比较规则
// 先按字典比较第一个字母,要是相同的话,则比较数字
@Override
public int compareTo(SortBean sortBean) {
    int res = this.word.compareTo(sortBean.word);
    if(res == 0){
        return this.num - sortBean.num;
    }
    return res;
}

// 实现序列化
@Override
public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeUTF(word);
    dataOutput.writeInt(num);
}

// 实现反序列化
@Override
public void readFields(DataInput dataInput) throws IOException {
    this.word = dataInput.readUTF();
    this.num = dataInput.readInt();
}

}



-  Mapper 
```java
package sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, SortBean, NullWritable> {
    /*
    Mapper将K1, V1转换为K2, V2
    其中,
    K1  LongWritable
    V1  Text
    K2  SortBean
    V2  NullWritable
    */

    // 重写map方法
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1.将行文本数据(V1)拆分,并将数据封装到SortBean对象,就可以得到K2
        String[] split = value.toString().split("\t");
        SortBean sortBean = new SortBean();
        sortBean.setWord(split[0]);
        sortBean.setNum(Integer.parseInt(split[1]));

        // 2.将K2和V2写入上下文中
        // 这里V由于包含在Key里,所以不用传了,直接传一个NullWriteable即可
        context.write(sortBean, NullWritable.get());
    }
  • Reducer ```java package sort;

import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer { / Reducer将新的K2,V2转换成K3,V3 /

@Override
protected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
    context.write(key, NullWritable.get());
}

}



-  Main 入口 
```java
package sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        // 1.创建job对象
        Job job = Job.getInstance(super.getConf(), "mapreduce_sort");

        // 2.配置job任务(八个步骤)
        // 第一步:设置输入类和输入路径
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.212.100:8020/input/sort_input"));;
        // 第二步: 设置Mapper类和数据类型
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(SortBean.class);
        job.setOutputValueClass(NullWritable.class);
        // 第三、四、五、六步,注意这里排序在SortBean中完成,不用设置

        // 第七步:设置Reducer类和类型
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(SortBean.class);
        job.setOutputValueClass(NullWritable.class);
        // 第八步:设置输出类和输出的路径
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///D:\\output\\sort_output"));

        // 3.等待任务结束
        boolean bl = job.waitForCompletion(true);

        return bl? 0: 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        // 启动job任务
        int run = ToolRunner.run(configuration, new JobMain(), args);

        System.exit(run);
    }
}

3.8 规约(Combiner)

3.8.1 概述

每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次合并(先局部汇总,也可以理解为局部reduce),以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO性能,是 MapReduce 的一种优化手段之一;

  • combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件;
  • combiner 组件的父类就是 Reducer;
  • combiner 和 reducer 的区别在于运行的位置
    • Combiner 是在每一个 maptask 所在的节点运行,相当于对每个mapTask上的节点做一次预聚合;
    • Reducer 是接收全局所有 Mapper 的输出结果;
  • combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量

3.8.2 实现步骤

  1. 自定义一个 combiner 继承 Reducer,重写 reduce 方法
  2. 在 job 中设置 job.setCombinerClass(CustomCombiner.class)

使用 combiner 的前提是不能影响最终的业务逻辑,而且,combiner的输出k、v应该跟
reducer的输入k、v类型要对应起来

3.9 MapReduce案例——流量统计

3.9.1 需求1——统计求和

统计每个手机号的上行数据包总和,下行数据包总和,上行总流量之和,下行总流量之和 分析:以手机号码作为key值,上行流量,下行流量,上行总流量,下行总流量四个字段作为value值,然后以这个key,和value作为map阶段的输出,reduce阶段的输入

  • Step 1: 自定义map的输出value对象FlowBean ```java package flowcount;

import org.apache.hadoop.io.Writable;

import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;

public class FlowBean implements Writable {

private Integer upFlow; // 上行数据包数
private Integer downFlow; // 下行数据包数
private Integer upCountFlow; // 上行流量总和
private Integer downCountFlow; // 下行流量总和

public Integer getUpFlow() {
    return upFlow;
}

public void setUpFlow(Integer upFlow) {
    this.upFlow = upFlow;
}

public Integer getDownFlow() {
    return downFlow;
}

public void setDownFlow(Integer downFlow) {
    this.downFlow = downFlow;
}

public Integer getUpCountFlow() {
    return upCountFlow;
}

public void setUpCountFlow(Integer upCountFlow) {
    this.upCountFlow = upCountFlow;
}

public Integer getDownCountFlow() {
    return downCountFlow;
}

public void setDownCountFlow(Integer downCountFlow) {
    this.downCountFlow = downCountFlow;
}

@Override
public String toString() {
    return  upFlow + "\t" + downFlow + "\t" + upCountFlow + "\t" + downCountFlow;
}


// 序列化
@Override
public void write(DataOutput dataOutput) throws IOException {

    dataOutput.writeInt(upFlow);
    dataOutput.writeInt(downFlow);
    dataOutput.writeInt(upCountFlow);
    dataOutput.writeInt(downCountFlow);

}

// 反序列化
@Override
public void readFields(DataInput dataInput) throws IOException {
    this.upFlow = dataInput.readInt();
    this.downFlow = dataInput.readInt();
    this.upCountFlow = dataInput.readInt();
    this.downCountFlow = dataInput.readInt();
}

}



-  Step 2: **定义FlowMapper类** 
```java
package flowcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    /*
    将K1、V1转换为K2、V2
    其中:
    K1  文本偏移量 LongWritable
    V1  文本  Text
    K2  手机号 Text
    V2  四个字段    FlowBean
    */

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1.拆分行文本数据,得到手机号--->K2
        String[] split = value.toString().split("\t");
        String phoneNum = split[1];

        // 2.创建FlowBean对象,并从行文本数据拆分出流量的四个四段,并将四个流量字段的值赋给FlowBean对象
        FlowBean flowBean = new FlowBean();
        flowBean.setUpFlow(Integer.parseInt(split[6]));
        flowBean.setDownFlow(Integer.parseInt(split[7]));
        flowBean.setUpCountFlow(Integer.parseInt(split[8]));
        flowBean.setDownCountFlow(Integer.parseInt(split[9]));

        // 3.将K2和V2写入上下文中
        context.write(new Text(phoneNum), flowBean);
    }
}
  • Step 3: 定义FlowReducer类 ```java package flowcount;

import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowCountReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // 1.遍历集合,并将集合中的对应的四个字段累计 Integer upFlow = 0; // 上行流量包数 Integer downFlow = 0; // 下行流量包数 Integer upCountFlow = 0; // 上行流量总和 Integer downCountFlow = 0; // 下行流量总和

    for (FlowBean value : values) {
        upFlow += value.getUpFlow();
        downFlow += value.getDownFlow();
        upCountFlow += value.getUpCountFlow();
        downCountFlow += value.getDownCountFlow();
    }

    // 2.创建FlowBean对象,并给对象赋值 V3
    FlowBean flowBean = new FlowBean();
    flowBean.setUpFlow(upFlow);
    flowBean.setDownFlow(downFlow);
    flowBean.setUpCountFlow(upCountFlow);
    flowBean.setDownCountFlow(downCountFlow);

    // 3.将K3和V3写入上下文中
    context.write(key, flowBean);   // 这里key还是原手机号,不变
}

}



-  Step 4: 程序main函数入口FlowMain 
```java
package flowcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        // 1.创建一个job任务
        Job job = Job.getInstance(super.getConf(), "flowcount_mapreduce");
        // 打包运行的话要加这一行
        job.setJarByClass(JobMain.class);

        // 2.配置job任务对象
        // 第一步,指定文件的读取方式与路径
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\flowcount"));
        // 第二步,指定Map阶段的处理方式和数据类型
        job.setMapperClass(FlowCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 第三、四、五、六步
        // 第七步,指定Reduce阶段的处理方式和数据类型
        job.setReducerClass(FlowCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 第八步,设置输出类型与路径
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///D:\\output\\flowcount_out"));
        // 3.等待任务结束
        boolean bl = job.waitForCompletion(true);
        return bl? 0: 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        // 启动job任务
        int run = ToolRunner.run(configuration, new JobMain(), args);

        System.exit(run);
    }
}

3.9.2 需求2——上行流量倒序排序(递减排序)

以需求一的输出数据作为排序的输入数据,自定义FlowBean,以FlowBean为map输出的key,以手机号作为Map输出的value,因为MapReduce程序会对Map阶段输出的key进行排序

  • Step 1: 定义FlowBean实现WritableComparable实现比较排序
    • Java 的 compareTo 方法说明:compareTo 方法用于将当前对象与方法的参数进行比较。
      • 如果指定的数与参数相等返回 0。
      • 如果指定的数小于参数返回 -1。
      • 如果指定的数大于参数返回 1。 ```java import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;

public class FlowBean implements WritableComparable {

private Integer upFlow; // 上行数据包数
private Integer downFlow; // 下行数据包数
private Integer upCountFlow; // 上行流量总和
private Integer downCountFlow; // 下行流量总和

public Integer getUpFlow() {
    return upFlow;
}

public void setUpFlow(Integer upFlow) {
    this.upFlow = upFlow;
}

public Integer getDownFlow() {
    return downFlow;
}

public void setDownFlow(Integer downFlow) {
    this.downFlow = downFlow;
}

public Integer getUpCountFlow() {
    return upCountFlow;
}

public void setUpCountFlow(Integer upCountFlow) {
    this.upCountFlow = upCountFlow;
}

public Integer getDownCountFlow() {
    return downCountFlow;
}

public void setDownCountFlow(Integer downCountFlow) {
    this.downCountFlow = downCountFlow;
}

@Override
public String toString() {
    return  upFlow + "\t" + downFlow + "\t" + upCountFlow + "\t" + downCountFlow;
}

// 序列化
@Override
public void write(DataOutput dataOutput) throws IOException {

    dataOutput.writeInt(upFlow);
    dataOutput.writeInt(downFlow);
    dataOutput.writeInt(upCountFlow);
    dataOutput.writeInt(downCountFlow);

}

// 反序列化
@Override
public void readFields(DataInput dataInput) throws IOException {
    this.upFlow = dataInput.readInt();
    this.downFlow = dataInput.readInt();
    this.upCountFlow = dataInput.readInt();
    this.downCountFlow = dataInput.readInt();
}

// 重写排序方法,指定逆序排序规则
@Override
public int compareTo(FlowBean flowBean) {
    // return -1 * (this.upFlow.compareTo(flowBean.compareTo)
    return -1 * (this.upFlow - flowBean.upFlow);
}

}


-  Step 2: 定义FlowMapper 
```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    /*
    将K1、V1转换为K2、V2
    其中:
    K1  文本偏移量 LongWritable
    V1  文本  Text
    K2  四个字段    FlowBean
    V2  手机号  Text
    */

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1.拆分行文本数据
        String[] split = value.toString().split("\t");

        // 2.创建FlowBean对象,并从行文本数据拆分出流量的四个四段,并将四个流量字段的值赋给FlowBean对象 -> K2
        FlowBean flowBean = new FlowBean();
        flowBean.setUpFlow(Integer.parseInt(split[1]));
        flowBean.setDownFlow(Integer.parseInt(split[2]));
        flowBean.setUpCountFlow(Integer.parseInt(split[3]));
        flowBean.setDownCountFlow(Integer.parseInt(split[4]));
        // 得到手机号 -> V2
        String phoneNum = split[0];

        // 3.将K2和V2写入上下文中
        context.write(flowBean, new Text(phoneNum));
    }
}
  • Step 3: 定义FlowReducer ```java import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowCountReducer extends Reducer { / 将新K2、V2转换为K3、V3 其中, K2 四个字段 FlowBean V2 手机号 Text K3 手机号 Text V3 四个字段 FlowBean /

@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    // 遍历集合,取出K3,将K3、V3写入上下文
    for (Text value : values) {
        context.write(value, key);
    }
}

}



-  Step 4: 程序main函数入口 
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        // 1.创建一个job任务
        Job job = Job.getInstance(super.getConf(), "flowcount_mapreduce");
        // 打包运行的话要加这一行
        job.setJarByClass(JobMain.class);

        // 2.配置job任务对象
        // 第一步,指定文件的读取方式与路径
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///D:\\output\\flowcount_out"));
        // 第二步,指定Map阶段的处理方式和数据类型
        job.setMapperClass(FlowCountMapper.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        // 第三、四、五、六步
        // 第七步,指定Reduce阶段的处理方式和数据类型
        job.setReducerClass(FlowCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 第八步,设置输出类型与路径
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///D:\\output\\flowsortcount_out"));

        // 3.等待任务结束
        boolean bl = job.waitForCompletion(true);
        return bl? 0: 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        // 启动job任务
        int run = ToolRunner.run(configuration, new JobMain(), args);

        System.exit(run);
    }
}

3.9.3 需求3——手机号码分区

在需求一的基础上,继续完善,将不同的手机号分到不同的数据文件的当中去,需要自定义
分区来实现,这里我们自定义来模拟分区,将以下数字开头的手机号进行分开

135 开头数据到一个分区文件
136 开头数据到一个分区文件
137 开头数据到一个分区文件
其他分区

3.9.3.1 自定义分区

package flowcount;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowCountPartition extends Partitioner<Text, FlowBean> {
    /*
    该方法用于定义一个完整的分区规则:
        135 开头数据到一个分区文件
        136 开头数据到一个分区文件
        137 开头数据到一个分区文件
            其他分区

    参数:
        text : K2 手机号
        flowBean: V2
        i : ReduceTask的个数
    */

    @Override
    public int getPartition(Text text, FlowBean flowBean, int i) {
        // 1.获取手机号
        String phoneNum = text.toString();
        // 2.判断手机号以什么开头,返回对应的分区编号(0-3)
        if(phoneNum.startsWith("135")){
            return 0;
        }
        else if(phoneNum.startsWith("136")){
            return 1;
        }
        else if(phoneNum.startsWith("137")){
            return 2;
        }
        else{
            return 3;
        }
    }
}

然后在需求1的JobMain中增加下两行代码

job.setPartitionerClass(FlowPartition.class);
job.setNumReduceTasks(4);

3.10 MapReduce的运行机制(面试重点考点)

MapReduce整体的工作流程如下图所示:

3. MapReduce - 图7

其主要分为MapTask、Shuffle和ReduceTask三个阶段,下面来分别看看每个阶段的工作机制

3.10.1 MapTask 的工作机制

整个Map阶段的工作流程如下图所示:

3. MapReduce - 图8

具体的执行流程如下:

  • 读取数据组件 InputFormat (默认 TextInputFormat) 会通过 getSplits 方法对输入目录中文件进行逻辑切片规划得到 block有多少个 block 就对应启动多少个 MapTask
  • 输入文件切分为 block 之后,由 RecordReader 对象 (默认是LineRecordReader) 进行读取,以 \n 作为分隔符,读取一行数据,返回 。Key 表示每行首字符偏移值,Value 表示这一行文本内容
  • 然后,将返回的 送入用户自己定义的 Mapper 类中,执行用户重写的 map 函数,RecordReader 读取一行这里调用一次

3.10.2 Shuffle 的工作机制

首先来看看Map的Suffle阶段:

  • 分区阶段
    Mapper 逻辑结束之后,将 Mapper 的每条结果通过 context.write 进行数据收集。在数据收集中,会先对其进行分区处理,默认使用 HashPartitioner
    • MapReduce 提供 Partitioner 接口,它的作用就是根据 Key 或 Value 及 Reducer 的数量来决定当前的这对输出数据最终应该交由哪个 Reducetask 处理,默认对 Key Hash 后再以 Reducer 数量取模默认的取模方式只是为了平均 Reducer 的处理能力,如果用户自己对 Partitioner 有需求,可以订制并设置到 Job 上。
  • 数据收集阶段与溢写阶段:
    接下来会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集Mapper 结果,减少磁盘 IO 的影响。我们的 对以及 Partition 的结果都会被写入缓冲区。当然,写入之前,Key 与 Value 值都会被序列化成字节数组
    • 环形缓冲区中的环形其实是一个抽象的结构,它本质是一个数组,存放着的序列化数据和元数据信息,包括Partition,Key的起始位置,Value的起始位置和Value的长度;
    • 缓冲区有大小限制,默认为100M。当Mapper 的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘中写数据的过程称为溢写,它有以下特点:
      • 由单独线程来完成,不影响往缓冲区写 Mapper 结果的线程;
      • 溢写线程启动时不应该阻止 Mapper 的结果输出,所以整个缓冲区有个溢写的比例,默认为0.8,也就是当缓冲区的数据已经达到阈值的80%时,溢写程序启动,锁定这些内存,并执行溢写过程。Mapper的输出结果则写入剩下的20%内存中,互不影响;
  • 排序和组合阶段:
    (是否排序是MR的Shuffle阶段和Spark的Shuffle阶段最大的区别)
    当溢写线程启动后,需要对这80%的空间内的 Key 做排序 (用的是快速排序)。排序是 MapReduce 模型默认的行为,这里的排序也是对序列化的字节做的排序;
    • 如果 Job 设置过 Combiner,就可以在这里使用。将有相同 Key 的 对的 Value 加起来,减少溢写到磁盘的数据量。Combiner 会优化 MapReduce 的中间结果,所以它在整个模型中会多次使用;
    • 但是,并不是所有的场景都可以使用Combiner。Combiner 的输出是 Reducer 的输入,Combiner 绝不能改变最终的计算结果。Combiner 只适合用于那种 Reduce 的输入 与输出 类型完全一致,且不影响最终结果的场景,比如累加,、最大值等。Combiner 的使用一定得慎重,如果用好,它对 Job 执行效率有帮助,反之会影响 Reducer 的最终结果。
  • 合并溢写文件阶段
    每次溢写会在磁盘上生成一个临时文件 (写之前判断是否有 Combiner),如果 Mapper 的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。

从上面可以看出,Shuffle阶段主要对Map阶段传来数据进行分区,排序,规约和合并,具体如下图所示:

3. MapReduce - 图9

与Shuffle阶段有关的配置信息如下图所示:

3. MapReduce - 图10

3. MapReduce - 图11

然后是Reduce阶段的Shuffle,每个Reduce拉取Map端对应分区的数据。拉取数据后先存储到内存中,内存不够了,再存储到磁盘。拉取完所有数据后,采用归并排序将内存和磁盘中的数据都进行排序。在进入Reduce方法前,可以对数据进行分组操作。

3.10.3 ReduceTask 的工作机制

ReduceTask的工作机制如下图所示:

3. MapReduce - 图12

其大致分为 copy、merge、reduce 三个阶段,重点在前两个阶段:

  • Copy阶段:Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件;
  • Merge阶段:这里的merge和map端的shuffle阶段的merge动作类似,只是数组中存放的是不同map端copy来的数值。copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map端类似,这也是溢写的过程,这个过程中如果设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件;
  • 合并排序:把分散的数据合并成一个大的数据后,还会再对合并后的数据排序,这里用的是归并排序。
  • 最后,对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,然后把这些输出的键值对写入到HDFS文件中就完成了整个M-R阶段。

3.11 Join(面试考点)

3.11.1 在Reduce端进行Join

加入有两个数据量巨大的表的数据是以文件的形式存在HDFS中的,我们现在需要MapReduce程序实现以下 SQL计算:

select a.id,a.date,b.name,b.category_id,b.price from t_order a left
join t_product b on a.pid = b.id

两个表的描述如下:

  • 商品表 3. MapReduce - 图13
  • 订单数据表 3. MapReduce - 图14

我们需要通过将关联的条件作为map输出的key将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联。具体步骤如下:

  • 定义Mapper ```java package cn.itcast.reduce_join;

import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ReduceJoinMapper extends Mapper { / 将K1、V1转换为K2、V2,其中 K1 文本偏移量 LongWritable V1 字段 Text K2 商品订单号 Text V2 字段 Text /

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // 1.判断数据来自哪个文件
    FileSplit fileSplit = (FileSplit)context.getInputSplit();
    String fileName = fileSplit.getPath().getName();
    if(fileName.equals("product.txt")){
        // 数据来自商品表
        // 2.将K1和V1转为K2和V2,写入上下文中
        String[] split = value.toString().split(",");
        String productId = split[0];
        context.write(new Text(productId), value);
    }
    else{
        // 商品来自订单表
        // 2.将K1和V1转为K2和V2,写入上下文中
        String[] split = value.toString().split(",");
        String productId = split[2];
        context.write(new Text(productId), value);
    }
}

}



-  定义Reducer 
```java
package cn.itcast.reduce_join;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ReduceJoinReducer extends Reducer<Text, Text, Text, Text> {
    /*
        将新K2、V2转为K3、V3
        其中,
        K2 商品编号 Text
        V2  字段  Text
        K3 商品编号 Text
        V3 字段  Text
    */

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // 1.遍历集合,获取V3(first + second)
        String first = "";
        String second = "";
        for (Text value : values) {
            if(value.toString().startsWith("p")){
                first = value.toString();
            }
            else{
                if(second == ""){
                    second += value.toString();
                }
                else{
                    second += ("; " + value.toString());
                }
            }
        }
        // 2.将K3,V3写入上下文
        context.write(key, new Text(first + "\t" + second));
    }
}
  • 定义主类 ```java package cn.itcast.reduce_join;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool { @Override public int run(String[] strings) throws Exception { // 1.获取job对象 Job job = Job.getInstance(super.getConf(), “reduce_join”); // 2.设置job任务 // 第一步,设置输入类和输入路径 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job, new Path(“file:///D:\input\reduce_join_input”)); // 第二步,设置Mapper类和输出类型 job.setMapperClass(ReduceJoinMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 第三、四、五、六步 shuffle 默认 // 第七步,设置Reducer类和输出类型 job.setReducerClass(ReduceJoinReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 第八步,设置输出类与输出路径 job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path(“file:///D:\output\reduce_join_out”)); // 3.等待任务结束 boolean bl = job.waitForCompletion(true); return bl? 0: 1; } public static void main(String[] args) throws Exception { Configuration configuration = new Configuration();

    // 启动一个job任务
    int run = ToolRunner.run(configuration, new JobMain(), args);

    System.exit(run);
}

}



<a name="17671e36"></a>
### 3.11.2 在Map端进行Join

由于Map到Reduce端回经过网络传输,而且**Map以后的数据在Reduce端可能产生倾斜**(即一台主机处理过多的数据),所以不如直接在Map端进行Join。具体的代码如下:

-  自定义Mapper 
```java
package org.example.join.map_join;

/*
    将 K1、V1 转为 K2、V2
    其中,
    K1: 文本偏移量 LongWritable
    V1: 字段  Text
    K2: 商品编号    Text
    V2: 字段  Text

 */

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    // 将分布式缓存的小表读取到本地Map集合

    // 定义一个用于存储小表数据的map
    HashMap<String, String> map = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 1.读取分布式缓存文件列表
        URI[] cacheFiles = context.getCacheFiles();

        // 2.获取指定的分布式文件系统
        FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());

        // 3.获取文件的输入流
        FSDataInputStream inputStream = fileSystem.open(new Path(cacheFiles[0]));

        // 4.读取文件内容
        // 4.1 将字节输入流转为字符缓冲流,即FSDataInputStream -> BufferedReader
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader((inputStream)));
        // 4.2 读取小表文件内容,以行为单位,并将读取的数据存入map集合
        String line = null;
        while((line= bufferedReader.readLine()) != null) {
            String[] split = line.split(",");
            map.put(split[0], line);
        }

        // 关闭流
        bufferedReader.close();
        fileSystem.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. 从行文本数据中获取商品的id:p0001,p0002,得到K2
        String[] split = value.toString().split(",");
        String productId = split[2];

        // 2. 在map集合中,将商品的id作为键,获取值(商品的行文本数据),将VALUE和值拼接得到V2
        String productLine = map.get(productId);
        String valueLine = productLine + ";" + value.toString();

        // 3. K2、V2写入上下文
        context.write(new Text(productId), new Text(valueLine));
    }
}
  • 主类: ```java package cn.itcast.map_join;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class JobMain extends Configured implements Tool { @Override public int run(String[] strings) throws Exception { // 1.获取job对象 Job job = Job.getInstance(super.getConf(), “map_join”);

    // 2.设置job对象(小表放入分布式缓存中)
    // 先将小表放入分布式缓存中,然后开始那八步(这步重点)
    job.addCacheFile(new URI("hdfs://192.168.212.100:8020/cache_file/product.txt"));
    // 第一步,设置输入类和输入的路径(这里设置的式大表)
    job.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\map_join_input"));
    // 第二步,设置Mapper类和数据类型
    job.setMapperClass(MapJoinMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    // 第三、四、五、六步;第七步
    // 第八步,设置输出类和输出路径
    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path("file:///D:\\output\\map_join_out"));

    // 3. 等待任务结束
    boolean bl = job.waitForCompletion(true);
    return bl? 0: 1;
}

public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    // 启动job任务
    int run = ToolRunner.run(configuration, new JobMain(), args);
    System.exit(run);
}

}



<a name="96b8ae1e"></a>
## 3.12 案例——求共同好友

> 需求:以下是某社交软件的好友列表数据,冒号前是一个用户,冒号后是该用户的所有好友(数据中的好友关系是单向的),如下图所示,现在需要求出哪些人**两两之间**有共同好友,且他们的共同好友都有谁?
>

A:B,C,D,F,E,O B:A,C,E,K C:A,B,D,E,I D:A,E,F,L E:B,C,D,M,L F:A,B,C,D,E,O,M G:A,C,D,E,F H:A,C,D,E,O I:A,O J:B,O K:A,C,D L:D,E,F M:E,F,G O:A,H,I,J


首先对需求做一下流程分析,该过程需要**两个MapReduce过程实现**。以寻找A和E的所有共同好友为例,其最终输出为:

A-E C,E


这个结果由结果`A和E的一个共同好友`

A-B C A-B E


合并得到。以这个结构反推,就需要知道哪些人的好友是C或E,这里以C为例,有:

A-B-E-F-G-H-K C


以上过程就是第二个MapReduce阶段的结果的逆推。那么,如何用这个结构反推到原始文件呢?首先可以看到,这个数据的 Key 太复杂了,不妨把它反转下,得到:

C A-B-E-F-G-H-K


现在只需要按 Value 拆分,即求C是谁的好友:

C A C B C E C F C G C H C K ```

这就只需要将原始文件中的朋友信息作为 Key, 当前的用户作为Value拆分就可以得到,这就是第一过程的MapReduce的逆推

下面来看下实现步骤:

  • 第一阶段:
  • 第二阶段:

3.13 案例——自定义InputFormat合并小文件

需求:无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。

目前小文件的优化主要有以下几种:

  1. 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS;
  2. 业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并;
  3. 在mapreduce处理时,可采用combineInputFormat提高效率。

本案例主要是实现上述过程的第二种方法,具体如下:

  • 自定义一个InputFormat;
  • 改写RecordReader,实现一次读取一个完整文件封装为**<K, V>**
  • 在输出时使用SequenceFileOutPutFormat输出合并文件(SequenceFileOutPutForma生成的是一个二进制文件,生成的过程中做了压缩)

如下图所示:

3. MapReduce - 图15

代码的实现如下:

  • Step 1:自定义InputFormat
  • Step 2:自定义RecordReader
  • Step 3:Mapper类
  • Step4:主类

3.14 案例——自定义outputFormat

3.15 案例——自定义分组求TopN