一、MapReduce

1.定义

MapReduce 是分布式运算程序的编程框架,将用户编写的业务逻辑代码和自带默认组件合并成一个完整的分布式运算程序,进而并发的运行在一个hadoop 集群上。

2. 优缺点

  • 优点
    • 易于编程

用户只需要简单实现一些借口,就可以完成一个分布式程序,并且这个分布式程序可以运行在大量廉价的机器上。这些接口实现的逻辑,跟平时编写串行程序一模一样

  • 良好的扩展性

当计算资源达不到咱们想要的效果时,可以通过简单的增加机器数量来拓展他的计算能力

  • 高容错行

    1. 前面提到它可以运行在大量廉价的机器上,那么必然要求自身拥有很高的容错性。例如其中的一台节点挂了,它可以自动的将该节点的计算任务分配到其他节点,保证整个计算任务正常运行。
  • 缺点

    • 不擅长实时计算

      1. 它并不能向 mysql 一样在秒级或者毫秒级返回结果,往往需要很长的时间才能计算出结果,因此不适合实时计算。
    • 不擅长流式计算

      1. 流式计算的数据是动态的,而 MapReduce 在设计的时候要求数据源必须是静态的,不能动态变化
    • 不擅长 DAG(有向无环图) 的计算

      1. 如果多个计算任务存在依赖关系,及后一个的运行需要使用前一个的运行结果,在这种情况下使用 MapReduce 性能会非常低,只要是因为 MapReduce 每次运行的结果都会书写到磁盘上,从而造成大量的磁盘 IO ,导致最终效率很低

      3. 核心思想

      image.png

      4. MapReduce 进程

      一个完整的 MapReduce 程序在分布式运行时有三类实例进程。

    • MrAppMaster : 负责整个程序过程的调度及状态协调

    • MapTast : 负责Map 阶段的整个数据处理流程
    • ReduceTask : 负责Reduce阶段的整个数据处理流程

      5. 常用的序列化类型

      | Java类型 | Hadoop Writable类型 | | —- | —- | | Boolean | BooleanWritable | | Byte | ByteWritable | | Int | IntWritable | | Float | FloatWritable | | Long | LongWritable | | Double | DoubleWritable | | String | Text | | Map | MapWritable | | Array | ArrayWritable | | Null | NullWritable |

6. MapReduce 编程

用户编写 MapReduce 程序分为三个部分 : mapper、reducer、 driver
mapper:

  • 用户自定义的 mapper 需要继承 自己的父类
  • mapper 数据的数据是 KV 键值对类型的
  • mapper 中业务逻辑书写在 map 方法中,对每个 调用一次
  • mapper 输出的数据类型也是 KV 键值对

reducer:

  • 用户自定义的 reducer 需要继承自己的父类
  • reducer 输入的数据 是 类型的,即是 mapper 输出的数据
  • reducer 的业务逻辑书写在 reducer 中,对于每一组相同的 key 调用一次

driver:
相当于 yarn 客户端,用于提交我们的整个程序到 yarn 集群,提交的是封装了 MapReduce 程序相关运行参数的 job 对象。

7. wordcount 案例

1.代码

  1. 需求 : 统计以下文本中每个单词出现的次数(以空格隔开)

    1. allen hadoop
    2. nihao hadoop
    3. nihao spark
    4. hive hbase
    1. allen 1
    2. hadoop 2
    3. nihao 2
    4. spark 1
    5. hive 1
    6. hbase 1
  2. 按照编写流程 将 MapReduce 分为三个阶段 mapper、 reducer 、 driver

mapper: 采用按行读取数据,将每一行数据的数据以空格进行切分,将切分后的数据保存在map中,其中数据作为key,value为1,表示该单词出现了一次,最后将map进行输出
reducer: 对map 输出数据的 key 进行汇总,将总数进行输出
driver: 获取配置信息,管理mapper 和reducer 等

  1. 环境准备

创建maven 工程 导入如下依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.hadoop</groupId>
  4. <artifactId>hadoop-client</artifactId>
  5. <version>3.1.3</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>junit</groupId>
  9. <artifactId>junit</artifactId>
  10. <version>4.12</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.slf4j</groupId>
  14. <artifactId>slf4j-log4j12</artifactId>
  15. <version>1.7.30</version>
  16. </dependency>
  17. </dependencies>
  1. log4j.rootLogger=INFO, stdout
  2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  4. log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
  5. log4j.appender.logfile=org.apache.log4j.FileAppender
  6. log4j.appender.logfile.File=target/spring.log
  7. log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
  8. log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
  1. 编写程序 ```xml package com.hadoop.hdfs.wordcount;

import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class mapper extends Mapper {

  1. IntWritable v = new IntWritable(1);
  2. Text mapkey = new Text();
  3. @Override
  4. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  5. String[] strings = value.toString().split(" ");
  6. for (String s:strings
  7. ) {
  8. mapkey.set(s);
  9. context.write(mapkey,v);
  10. }
  11. }

}

  1. ```java
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import java.io.IOException;
  6. public class reducer extends Reducer<Text, IntWritable,Text,IntWritable> {
  7. int sum = 0;
  8. IntWritable mapSum = new IntWritable();
  9. @Override
  10. protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  11. sum = 0;
  12. for (IntWritable value:values
  13. ) {
  14. sum = sum+ value.get();
  15. }
  16. mapSum.set(sum);
  17. context.write(key,mapSum);
  18. }
  19. }
  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. import java.io.IOException;
  9. public class driver {
  10. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  11. // 1 获取配置信息以及获取job对象
  12. Configuration conf = new Configuration();
  13. Job job = Job.getInstance(conf);
  14. // 2 关联本Driver程序的jar
  15. job.setJarByClass(driver.class);
  16. // 3 关联Mapper和Reducer的jar
  17. job.setMapperClass(mapper.class);
  18. job.setReducerClass(reducer.class);
  19. // 4 设置Mapper输出的kv类型
  20. job.setMapOutputKeyClass(Text.class);
  21. job.setMapOutputValueClass(IntWritable.class);
  22. // 5 设置最终输出kv类型
  23. job.setOutputKeyClass(Text.class);
  24. job.setOutputValueClass(IntWritable.class);
  25. // 6 设置输入和输出路径
  26. // FileInputFormat.setInputPaths(job, new Path(args[0]));
  27. // FileOutputFormat.setOutputPath(job, new Path(args[1]));
  28. FileInputFormat.setInputPaths(job, new Path("input"));
  29. FileOutputFormat.setOutputPath(job, new Path("output/output1"));
  30. // 7 提交job
  31. boolean result = job.waitForCompletion(true);
  32. System.exit(result ? 0 : 1);
  33. }
  34. }

2. 本地测试

  • 需要首先配置好HADOOP_HOME变量以及Windows运行依赖
  • 在IDEA/Eclipse上 点击driver main 运行程序

    3. 集群测试

  • 添加 jar 包依赖

    <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    </build>
    
  • 修改不带依赖的jar包名称为wc.jar,并拷贝该jar包到Hadoop集群的/opt/module/hadoop-3.1.3/demo路径。

  • 启动hadoop 集群
    • sbin/start-dfs.sh sbin/start-yarn.sh
    • 或者 myhadoop start
  • 上传文件
    • 提前在hadoop 上创建输入的文件夹 并上传文件
  • 运行

hadoop jar wc.jar
com.atguigu.mapreduce.wordcount.WordCountDriver /user/allen/input /user/allen/output

二、hadoop 序列化

1. 序列化概述

  • 什么是序列化

序列化就是将内存中的对象转化为字节序列存储到磁盘上,或者是转化为其他数据传输协议,便于在网络中传输
反序列化,是将磁盘中的文件或者收到的字节序列,在或是其他数据协议转化为内存中的对象。

  • 为什么要序列化

存储在内存中的对象,如果关机断电,对象就没有了,可以通过序列化将对象存储在磁盘中,保障对象不丢失。另外保存在内存中的对象不能发送到另一台计算机中,可以通过序列化,将对象传输到另一台计算机上,再通过反序列化加载到内存中,实现对象的传输

  • 为什么不用java的序列化

java的序列化是一个重量级的序列化框架(Serializable),当一个对象被序列化的时候,会附加好多其他信息,比如各种校验信息、header、继承体等,不便于在网络中传输

  • hadoop 序列化 特点

紧凑: 高效的使用存储空间
快速;数据读写额外开销少
互操作:支持多种语言的交互

2. 如何自定义对象序列化接口

  1. 实现 Writable 接口
  2. 必须有空参构造函数,因为反序列化的时候,需要反射调用空参构造
  3. 重写序列化方法
  4. 重写反序列化方法 (顺序和序列化保持一直)
  5. 如果想要把结果显示在文件中,需要重写tostring 方法
  6. 如果需要将自定义的bean 放在 key 中传输,还需要实现 comparable 接口,用于 shuffle 排序

    3. 案例

  7. 需求分析

统计文件中每一个手机号的 总上行流量、总下行流量、总流量

1     13736230513    192.196.100.1     www.atguigu.com     2481     24681     200
2     13846544121    192.196.100.2            264     0     200
3     13956435636    192.196.100.3            132     1512     200
4     13966251146    192.168.100.1            240     0    404
5     18271575951    192.168.100.2    www.atguigu.com    1527    2106    200
6     84188413    192.168.100.3    www.atguigu.com    4116    1432    200
7     13590439668    192.168.100.4            1116    954    200
8     15910133277    192.168.100.5    www.hao123.com    3156    2936    200
9     13729199489    192.168.100.6            240    0    200
10     13630577991    192.168.100.7    www.shouhu.com    6960    690    200
11     15043685818    192.168.100.8    www.baidu.com    3659    3538    200
12     15959002129    192.168.100.9    www.atguigu.com    1938    180    500
13     13560439638    192.168.100.10            918    4938    200
14     13470253144    192.168.100.11            180    180    200
15     13682846555    192.168.100.12    www.qq.com    1938    2910    200
16     13992314666    192.168.100.13    www.gaga.com    3008    3720    200
17     13509468723    192.168.100.14    www.qinghua.com    7335    110349    404
18     18390173782    192.168.100.15    www.sogou.com    9531    2412    200
19     13975057813    192.168.100.16    www.baidu.com    11058    48243    200
20     13768778790    192.168.100.17            120    120    200
21  13568436656    192.168.100.18    www.alibaba.com    2481    24681    200
22     13568436656    192.168.100.19            1116    954    200
# 输入格式
7     13560436666    120.196.100.99        1116         954            200
id    手机号码        网络ip            上行流量  下行流量     网络状态码

# 期待输出
13560436666         1116              954             2070
手机号码            上行流量        下行流量        总流量
  1. 代码 ```xml

import org.apache.hadoop.io.Writable;

import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;

public class PhoneBean implements Writable { private Long up; private Long down; private Long sum;

//  提供无参构造
public PhoneBean() {
}

@Override
public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeLong(up);
    dataOutput.writeLong(down);
    dataOutput.writeLong(sum);
}
// 实现序列化和反序列化方法,注意顺序一定要保持一致
@Override
public void readFields(DataInput dataInput) throws IOException {
    this.up = dataInput.readLong();
    this.down = dataInput.readLong();
    this.sum = dataInput.readLong();
}
// 提供三个参数的getter和setter方法
public Long getUp() {
    return up;
}

public void setUp(Long up) {
    this.up = up;
}

public Long getDown() {
    return down;
}

public void setDown(Long down) {
    this.down = down;
}

public Long getSum() {
    return sum;
}

public void setSum(Long sum) {
    this.sum = sum;
}
// 重写ToString
@Override
public String toString() {
    return up + "\t" + down + "\t" + sum;
}

}

```xml

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PhoneMapper extends Mapper<LongWritable, Text,Text,PhoneBean> {
    private PhoneBean phoneData = new PhoneBean();
    private Text mapkey = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] strings = value.toString().split("\t");

        String phone = strings[1];
        String up = strings[strings.length - 3];
        String down = strings[strings.length - 2];

        phoneData.setUp(Long.parseLong(up));
        phoneData.setDown(Long.parseLong(down));
        phoneData.setSum(0l);

        mapkey.set(phone);
        context.write(mapkey,phoneData);
    }
}
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PhoneReducer extends Reducer<Text,PhoneBean,Text,PhoneBean> {

    private PhoneBean phoneData = new PhoneBean();
    @Override
    protected void reduce(Text key, Iterable<PhoneBean> values, Context context) throws IOException, InterruptedException {
        Long upSum = 0l;
        Long downSum = 0l;
        for (PhoneBean value:values
             ) {
            upSum += value.getUp();
            downSum += value.getDown();
        }
        phoneData.setUp(upSum);
        phoneData.setDown(downSum);
        phoneData.setSum(upSum + downSum);
        context.write(key,phoneData);
    }
}

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PhoneDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1 获取配置信息以及获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 关联本Driver程序的jar
        job.setJarByClass(PhoneDriver.class);

        // 3 关联Mapper和Reducer的jar
        job.setMapperClass(PhoneMapper.class);
        job.setReducerClass(PhoneReducer.class);

        // 4 设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PhoneBean.class);

        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PhoneBean.class);

        // 6 设置输入和输出路径
//        FileInputFormat.setInputPaths(job, new Path(args[0]));
//        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileInputFormat.setInputPaths(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path("output/output2"));


        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

三、MapReduce 框架原理

image.png

1. InputForMat 数据输入

1. 切片与MapTask 并行决定机制

  • MapReduce 的并行度 决定了 Map 阶段的任务处理并发度,进而影响到整个 job 的处理速度。
  • MapTask 并行度决定机制
    • 数据块: Block 是 HDFS 物理上把数据分为一块一块的。数据块是 HDFS存储数据的单位
    • 数据切片: 数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成块。数据切片是 MapReduce程序计算输入数据的单位,一个切片对应启动一个 MapTask。

image.png

2. Job 提交源码

waitForCompletion()

submit();

// 1建立连接
    connect();    
        // 1)创建提交Job的代理
        new Cluster(getConfiguration());
            // (1)判断是本地运行环境还是yarn集群运行环境
            initialize(jobTrackAddr, conf); 

// 2 提交job
submitter.submitJobInternal(Job.this, cluster)

    // 1)创建给集群提交数据的Stag路径
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

    // 2)获取jobid ,并创建Job路径
    JobID jobId = submitClient.getNewJobID();

    // 3)拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);    
    rUploader.uploadFiles(job, jobSubmitDir);

    // 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
        maps = writeNewSplits(job, jobSubmitDir);
        input.getSplits(job);

    // 5)向Stag路径写XML配置文件
writeConf(conf, submitJobFile);
    conf.writeXml(out);

    // 6)提交Job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

image.png

3. FileInputFormat 切片源码解析

切片不考虑数据整体,逐一对每一个文件简单的依靠数据内容长度进行切分。具体切分过程如下

  1. 程序找到数据文件目录
  1. 开始遍历处理 目录下的每一个文件
  1. 遍历第一个文件 aa.txt
     1. 获取文件大小
     1. 计算切片大小(默认是块大小 128M)

               computeSplitSize(Math.max(minSize,Math.max(maxSize,blocksize)) 

     3. 开始切片 

         第一个切片  0-128M<br /> 第二个切片  128-256M<br /> ·······<br />每次切片时。判断剩下的切片大小是否 大于等于切块大小的 1.1倍,不大于1.1倍 就一起切了

     4. 将切片信息保存在一个切片规划文件中。记录的信息是 切片起始位置、切片长度 、所在节点信息等。
  4. 提交切片信息规划文件 到 yarn 中,yarn 的 MrAppMaster 根据切片规划文件计算开启的 MapTesk 个数
  1. 案例

image.png

  1. 切分大小的参数

    1. 源码中

    Math.max(minSize, Math.min(maxSize, blockSize));
    minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    getFormatMinSplitSize() return 1;
    getMinSplitSize(job) return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
    maxSize = getMaxSplitSize(job);return context.getConfiguration().getLong(SPLIT_MAXSIZE,
    Long.MAX_VALUE);
    blockSize = file.getBlockSize();return this.blocksize;

    1. 切片大小设置

maxSize :如果比 blockSize 小,这会让切片数比 blockSize少,
minSize : 如果比 blockSize 大,可以让切片数比 blockSize 大

4. TextInputFormat

FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等,不算自定义的共有6个。
TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。
如:

Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise
(0,Rich learning form)
(20,Intelligent learning engine)
(49,Learning more convenient)
(74,From the real demand for more close to the enterprise)

5. CombineTextInputFormat切片机制

  1. 问题引入

    框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
    CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理

  2. 设置虚拟切偏最大值

    CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
    注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。

  3. 切片机制

    生成切片的过程包含两个:虚拟存储过程和切分过程
    image.png

  • 虚拟存储过程

获取数据数据目录下面的所有文件,依次与设置的 setMaxInputSplitSize 值比较,如果小于 设置的最大值,设置为一个逻辑分片;如果大于设置的最大值且小于最大值的二倍,将文件均匀划分为两个虚拟存储块;如果大于最大值的2倍,按照最大值的二倍进行切割,切割后再次进行比较。
例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。

  • 切分过程

判断虚拟存储文件的大小是否大于 setMaxInputSplitSize ,如果大于则单独形成一个切片;如果不大于则和下一个虚拟存储文件合并,共同形成一个切片
测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:
1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)
最终会形成3个切片,大小分别为:
(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

  1. 案例实现
    • 在驱动类中添加
      ```java // 如果不设置InputFormat,它默认用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class);

//虚拟存储切片最大值设置4m CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

<a name="FkwZq"></a>
### 2. MapReduce 工作流程
![image.png](https://cdn.nlark.com/yuque/0/2022/png/23124036/1647614094638-496c74b6-ed3f-44b7-a3d0-f10ea2bcf0df.png#clientId=ud28f1f69-07ad-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=763&id=u6cd91697&margin=%5Bobject%20Object%5D&name=image.png&originHeight=954&originWidth=1916&originalType=binary&ratio=1&rotation=0&showTitle=false&size=243716&status=done&style=none&taskId=ue922f2aa-09f2-463f-b7f2-b408f77d35e&title=&width=1532.8)<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/23124036/1647614188054-b38b5853-6438-4174-8df7-ca695692a20a.png#clientId=ud28f1f69-07ad-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=751&id=u526b5d73&margin=%5Bobject%20Object%5D&name=image.png&originHeight=939&originWidth=1897&originalType=binary&ratio=1&rotation=0&showTitle=false&size=177288&status=done&style=none&taskId=u05758a10-1192-45eb-ada8-a6c760cc993&title=&width=1517.6)<br />上面两个图是 MapReduce 最全的工作流程图,但是 shuffle 是从 7-16步,过程如下:

1. MapTask 收集 map 方法的 kv 键值对输出,放到内存缓冲区中
1. 当缓冲区使用到80%的时候将缓冲区内容书写到磁盘上,同时 缓冲区开始反向写。此过程可能产生多次,想磁盘中溢写多个文件
1. 在溢出书写到磁盘文件之前需要排序,使用索引排序,排完序后书写到磁盘中,排序使用的是快排
1. 当溢写完成后,使用归并排序 将多次溢写的文件合并到一起。
1. ReduceTask 会根据自己的分区号去 各个 MapTask 机器上取相应的结果分区数据
1. 取完后,对来自不同的 MapTask 的数据再次进行归并排序
1. 合并成大文件后 Shuffle的过程也就结束了。后面进入 ReduceTask 的逻辑运算过程,大致是从文件中取出一个个键值对 Group,调用用户自己的 Reduce 方法

注意: Shuffle 缓冲区的大小 会影响 MapReduce 的执行效率。理论上来讲,Shuffle 越大,IO次数越少,执行速度越快。缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb默认100M。
<a name="IsOR7"></a>
### 3. Shuffle 机制
Map 方法之后 Reduce 方法之前的数据处理过程 统称为 Shuffle
<a name="GebEF"></a>
### ![image.png](https://cdn.nlark.com/yuque/0/2022/png/23124036/1647615737153-876b0be3-ea4c-44eb-97f6-a75a95c31606.png#clientId=ud28f1f69-07ad-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=769&id=u73b304d6&margin=%5Bobject%20Object%5D&name=image.png&originHeight=961&originWidth=1903&originalType=binary&ratio=1&rotation=0&showTitle=false&size=229247&status=done&style=none&taskId=u50b803a9-2f17-4eb3-8091-b40973da3eb&title=&width=1522.4)
<a name="M1FL5"></a>
#### 1. 分区
默认的分区是根据 key 的 hashcode 对 ReduceTasks 的个数取模得到的,用户没有办法控制 key 存储到哪个分区
<a name="vTZuc"></a>
#### 2. 自定义分区

- 步骤 
   - 自定义类继承 partitioner 重写 gitPartition 方法
   - 在Job驱动中 设置自定义分区。

              job.setPartitionerClass(xxxPartitioner.class)

   - 根据自定义 partitioner 的逻辑设置相应数量的 ReduceTask

             job.setNumReduceTasks(5)
<a name="HLpDN"></a>
#### 4. 分区总结

- 如果输入的 ReduceTask数量 > 实际的逻辑分区数,会产生几个空输出文件 part-r-000x
- 如果 1 < ReduceTask数量 < 实际的逻辑分区数,会导致部分数据无法存放,报错
- 如果 设计的 ReduceTask数量  =1 ,则无论 MapTask 输出多少个分区文件,最后结果都交给一个 ReduceTask ,产生一个结果文件
- 分区号 必须从0开始 累加
- 说明 (假设逻辑分区数是5)
   - job.setNumReduceTasks(1)  正常运行 但是只会产生一个输出文件
   - job.setNumReduceTasks(3) 报错
   - job.setNumReduceTasks(6)  程序正常运行 不过会产生一个空文件。
<a name="CFXiV"></a>
#### 5.案例

- 输入数据

[phone_data.txt](https://www.yuque.com/attachments/yuque/0/2022/txt/23124036/1647660896565-bebb6780-2e12-4b85-812d-8a7c2b4b79a1.txt?_lake_card=%7B%22src%22%3A%22https%3A%2F%2Fwww.yuque.com%2Fattachments%2Fyuque%2F0%2F2022%2Ftxt%2F23124036%2F1647660896565-bebb6780-2e12-4b85-812d-8a7c2b4b79a1.txt%22%2C%22name%22%3A%22phone_data.txt%22%2C%22size%22%3A1178%2C%22type%22%3A%22text%2Fplain%22%2C%22ext%22%3A%22txt%22%2C%22status%22%3A%22done%22%2C%22taskId%22%3A%22ud171e1cd-503e-495d-947c-0e4c872ffc9%22%2C%22taskType%22%3A%22upload%22%2C%22id%22%3A%22u33e6b2c2%22%2C%22card%22%3A%22file%22%7D)

- 期望输出

手机号136 137 138 139 开头的放在一个独立的文件中,其他的放在另一个文件中

- 代码
```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class selfPartition extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        String s = text.toString();
        String title = s.substring(0,3);
        //定义一个分区号变量partition,根据prePhone设置分区号
        int partition;

        switch (title){
            case "136": partition = 0;break;
            case "137": partition = 1;break;
            case "138": partition = 2;break;
            case "139": partition = 3;break;
            default:partition = 4;
        }
        return partition;
    }
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class partitionMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    private Text mapkey = new Text();
    private IntWritable mapValue = new IntWritable();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] strings = value.toString().split("\t");

        String phone = strings[1];
        String up = strings[strings.length - 3];
        String down = strings[strings.length - 2];

        Integer values  = Integer.parseInt(up) + Integer.parseInt(down);

        mapkey.set(phone);
        mapValue.set(values);
        context.write(mapkey,mapValue);
    }
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class partitonReducer extends Reducer<Text, IntWritable,Text,IntWritable> {

    IntWritable mapValue = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        Integer downSum = 0;
        for (IntWritable value:values
        ) {
            downSum = downSum + value.get();
        }
        mapValue.set(downSum);
        context.write(key,mapValue);
    }
}

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class partitionDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1 获取配置信息以及获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 关联本Driver程序的jar
        job.setJarByClass(partitionDriver.class);

        // 3 关联Mapper和Reducer的jar
        job.setMapperClass(partitionMapper.class);
        job.setReducerClass(partitonReducer.class);

        // 4 设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);


        //8 指定自定义分区器
        job.setPartitionerClass(selfPartition.class);

        //9 同时指定相应数量的ReduceTask
        job.setNumReduceTasks(5);


        // 6 设置输入和输出路径
//        FileInputFormat.setInputPaths(job, new Path(args[0]));
//        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileInputFormat.setInputPaths(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path("output/output4"));


        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

6. WritableComparable 排序

排序是 MapReduce 最重要的操作之一。无论是 MapTask 阶段还是 ReduceTask 阶段都会按照 Key 进行排序,这是 hadoop 的默认行为,无论逻辑上是否需要,都会进行。默认排序是按照字典顺序进行排序,使用的方法是快排。

  • 对于MapTask ,他会将处理的数据放入环形缓冲区,当环形缓冲区中的数据达到一定的阈值后,会对其中的数据进行一次快速排序,将排序后的数据溢写到磁盘上,当所有的数据处理完毕后,会对磁盘上的所有文件进行一次归并排序。
  • 对于 ReduceTask ,他从每个 MapTask 上远程拷贝相应是数据文件保存在内存中,如果文件大小/S超过一定的阈值,溢写到磁盘上,如果溢写到磁盘上的文件数目超过一定的阈值,进行一次归并排序,合并成一个更大的文件;ReduceTask 统一对内存和磁盘上的所有数据进行一次归并排序。

  • 排序分类

    • 部分排序: MapReduce 对数据的数据进行排序,保证输出数据的每个文件 内部有序
    • 全排序 : 最终输出只有一个文件,且文件内部有序。实现方式是设置 一个 ReduceTask ,但是这方法效率低。
    • 辅助排序 :在 Reduce 端 对 Key 进行排序。用于:接收的 key 是bean 对象时,想让一个或者多个字段相同的key 金屠刀同一个 Reduce 中。
    • 二次排序 : 在自定义排序过程中,如果 compareTo 的判断条件为两个 即为 二次排序
  • 自定义排序实现

    将 bean 作为 key 传输,需要实现 WritableComparable接口重写compareTo方法,就可以实现排序。
    
    //  implements WritableComparable<T>
    @Override
    public int compareTo(FlowBean bean) {
    
      int result;
    
      // 按照总流量大小,倒序排列
      if (this.sumFlow > bean.getSumFlow()) {
          result = -1;
      }else if (this.sumFlow < bean.getSumFlow()) {
          result = 1;
      }else {
          result = 0;
      }
    
      return result;
    }
    

    7. Combiner 合并

  • 介绍

    • Combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件,父类是Reducer。和 Reducer 区别在于 :Combiner 是在每一个 MapTask 节点上运行,接收的是单个节点数据,而 Reducer 是接受全局所有 Mapper 的数据
    • Combiner 是意义是对每一个 MapTask 输出结果进行局部汇总,以减少网络传输量
    • Combiner 应用的前提是不能影响最终的业务逻辑,而且 输出的 kv 应和Reducer 一样。比如下面这个例子(计算均值)就不合适使用 :

image.png

  • 实现步骤

    • 自定义 Combiner 继承 Reducer ,重写 Reduce 方法。

      public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
      
      private IntWritable outV = new IntWritable();
      
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      
         int sum = 0;
         for (IntWritable value : values) {
             sum += value.get();
         }
      
         outV.set(sum);
      
         context.write(key,outV);
      }
      }
      
    • 在 驱动类中 指定 Combiner

job.setCombinerClass(WordCountCombiner.class);

  • 运行结果:

image.png

4. OutputFormat数据输出

1. OutputFormat 接口实现类

OutputForMat 是 MapReduce 输出的基类,目前有这么几种实现类
image.png
默认的输出格式是 TextOutputFormat

2. 自定义 OutputForMat

  • 步骤:
    • 自定义一个类继承 FileOutputForMat
    • 改写 RecordWriter
  • 案例

    • 需求 : 过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。
    • 输入数据
      http://www.baidu.com
      http://www.google.com
      http://cn.bing.com
      http://www.atguigu.com
      http://www.sohu.com
      http://www.sina.com
      http://www.sin2a.com
      http://www.sin2desa.com
      http://www.sindsafa.com
      
  • 期望输出 ```java // atguigu.log http://www.atguigu.com

// other.txt http://cn.bing.com http://www.baidu.com http://www.google.com http://www.sin2a.com http://www.sin2desa.com http://www.sina.com http://www.sindsafa.com http://www.sohu.com


- 分析

![image.png](https://cdn.nlark.com/yuque/0/2022/png/23124036/1647673906122-1fba998e-f42b-4a71-a68f-99ac553b894f.png#clientId=uda7d26c7-d4dd-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=717&id=u2e4b22da&margin=%5Bobject%20Object%5D&name=image.png&originHeight=896&originWidth=1822&originalType=binary&ratio=1&rotation=0&showTitle=false&size=177114&status=done&style=none&taskId=u46a64b64-fef6-484c-90fb-34708236838&title=&width=1457.6)
<a name="hm6Zv"></a>
### 5. MapReduce 内核源码解析
<a name="S5E2O"></a>
#### 1. 工作机制
![image.png](https://cdn.nlark.com/yuque/0/2022/png/23124036/1647681646287-4a8691a2-8c00-4e79-8cb3-732c52021bf9.png#clientId=u8e2ad613-67b7-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=788&id=u3c8a232e&margin=%5Bobject%20Object%5D&name=image.png&originHeight=985&originWidth=1925&originalType=binary&ratio=1&rotation=0&showTitle=false&size=233533&status=done&style=none&taskId=u0b2e7e65-c7d0-4a38-a7f9-f12dbad559d&title=&width=1540)

- Read阶段:MapTask通过 InputFormat 获得的 RecordReader,从输入InputSplit中解析出一个个key/value。
- Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
- Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
- Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
   - 溢写阶段详情
      - 利用快速排序算法对缓冲区内的数据进行排序,排序方式是,先按照分区编号 Partition 进行排序,然后按照 key 进行排序。这样经过排序后,数据以分区的单位聚集在一起,且同一分区内所有数据按照 key 有序
      - 按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
      - 将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
- Merge阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。

在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方        式。    每轮合并mapreduce.task.io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,<br />对 文件排序后,重复以上过程,直到最终得到一个大文件。

让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/23124036/1647683444794-67528089-68f3-4a52-a172-f632f7ca970a.png#clientId=u8e2ad613-67b7-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=730&id=ufa059bba&margin=%5Bobject%20Object%5D&name=image.png&originHeight=913&originWidth=1901&originalType=binary&ratio=1&rotation=0&showTitle=false&size=148853&status=done&style=none&taskId=ub7baf32c-9e22-4abe-b5df-b87c7bebd72&title=&width=1520.8)

- Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
- Sort阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可
- Reduce阶段:reduce()函数将计算结果写到HDFS上。
<a name="SoOf2"></a>
#### 2. ReduceTask并行度决定机制
MapTask 并行度由切片个数决定,切片个数由输入文件和切片规则决定
<a name="VVpVR"></a>
##### 1. 设置 ReduceTask 并行度(个数)
ReduceTask 的并行度同样影响整个Job的执行并发度和执行效率,但是与 MapTask 的并发数有切片数决定不同,ReduceTask 的数量是可以直接手动设置的。<br />// 默认值是1,手动设置为4<br />job.setNumReduceTasks(4);
<a name="ROZtC"></a>
##### 2. 测试 多少个 ReduceTask 合适?

1. 实验环境:1个Master节点,16个Slave节点:CPU:8GHZ,内存: 2G
1. 结论:
| MapTask =16 |  |  |  |  |  |  |  |  |  |  |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| ReduceTask | 1 | 5 | 10 | 15 | 16 | 20 | 25 | 30 | 45 | 60 |
| 总时间 | 892 | 146 | 110 | 92 | 88 | 100 | 128 | 101 | 145 | 104 |

<a name="fjvPw"></a>
##### 3. 注意事项

- ReduceTask = 0 表示没有 Reduce 阶段,输出文件的个数与Map 个数一致
- 因为 ReduceTask 默认数组是1,所以输出文件个数为1
- 如果数据分布不均匀,有可能导致Reduce阶段产生数据倾斜
- 如果分区数不是1,但是ReduceTask 为1,不会执行分区过程。
<a name="XMTML"></a>
### 6. Join 应用
<a name="dJq2Q"></a>
#### 1. Reduce Join
Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。<br />Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。
<a name="ai7IT"></a>
#### 2. 案例

- 需求
   - 订单数据表
| id | pid | amount |
| --- | --- | --- |
| 1001 | 01 | 1 |
| 1002 | 02 | 2 |
| 1003 | 03 | 3 |
| 1004 | 01 | 4 |
| 1005 | 02 | 5 |
| 1006 | 03 | 6 |

   - 商品信息表
| pid | pname |
| --- | --- |
| 01 | 小米 |
| 02 | 华为 |
| 03 | 格力 |

   - 将商品信息表中的数据根据商品 pid 合并到订单数据表中
| id | pname | amount |
| --- | --- | --- |
| 1001 | 小米 | 1 |
| 1004 | 小米 | 4 |
| 1002 | 华为 | 2 |
| 1005 | 华为 | 5 |
| 1003 | 格力 | 3 |
| 1006 | 格力 | 6 |

- 需求分析

![image.png](https://cdn.nlark.com/yuque/0/2022/png/23124036/1647693303813-10f82774-7737-441a-b126-2d4cb4722487.png#clientId=u739a95ae-494b-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=751&id=u72836568&margin=%5Bobject%20Object%5D&name=image.png&originHeight=939&originWidth=1865&originalType=binary&ratio=1&rotation=0&showTitle=false&size=197499&status=done&style=none&taskId=u473561c8-ce6f-4104-ab75-197677e4847&title=&width=1492)    将关联条件作为 Map 输出的 Key ,并在value 中添加数据来源, 在 Reduce端,相同的 key 会添加到同一个 ReduceTask ,这样在 ReduceTask 中进行数据串联。
```java

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableBean implements Writable {

    private String id; //订单id
    private String pid; //产品id
    private int amount; //产品数量
    private String pname; //产品名称
    private String flag; //判断是order表还是pd表的标志字段

    public TableBean() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }
}
mport org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class TableMapper extends Mapper<LongWritable,Text,Text,TableBean> {

    private String filename;
    private Text outK = new Text();
    private TableBean outV = new TableBean();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //获取对应文件名称
        InputSplit split = context.getInputSplit();
        FileSplit fileSplit = (FileSplit) split;
        filename = fileSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //获取一行
        String line = value.toString();

        //判断是哪个文件,然后针对文件进行不同的操作
        if(filename.contains("order")){  //订单表的处理
            String[] split = line.split("\t");
            //封装outK
            outK.set(split[1]);
            //封装outV
            outV.setId(split[0]);
            outV.setPid(split[1]);
            outV.setAmount(Integer.parseInt(split[2]));
            outV.setPname("");
            outV.setFlag("order");
        }else {                             //商品表的处理
            String[] split = line.split("\t");
            //封装outK
            outK.set(split[0]);
            //封装outV
            outV.setId("");
            outV.setPid(split[0]);
            outV.setAmount(0);
            outV.setPname(split[1]);
            outV.setFlag("pd");
        }

        //写出KV
        context.write(outK,outV);
    }
}

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {

        ArrayList<TableBean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();

        for (TableBean value : values) {

            //判断数据来自哪个表
            if("order".equals(value.getFlag())){   //订单表

              //创建一个临时TableBean对象接收value
                TableBean tmpOrderBean = new TableBean();

                try {
                    BeanUtils.copyProperties(tmpOrderBean,value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }

              //将临时TableBean对象添加到集合orderBeans
                orderBeans.add(tmpOrderBean);
            }else {                                    //商品表
                try {
                    BeanUtils.copyProperties(pdBean,value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        //遍历集合orderBeans,替换掉每个orderBean的pid为pname,然后写出
        for (TableBean orderBean : orderBeans) {

            orderBean.setPname(pdBean.getPname());

           //写出修改后的orderBean对象
            context.write(orderBean,NullWritable.get());
        }
    }
}

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
  • 总结:

    这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。如何解决: 在 Map 端实现数据合并

    3. Map join

    适用于 大小表连接

  • 优点 :在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

  • 具体操作 : 1. 在 driver 中加载缓存

//缓存普通文件到Task运行节点。
job.addCacheFile(new URI(“file:///e:/cache/pd.txt”));
//如果是集群运行,需要设置HDFS路径
job.addCacheFile(new URI(“hdfs://hadoop102:8020/cache/pd.txt”));
2. 在 Mapper 的 setup 阶段,将文件读取到缓冲几何中;

4. 案例操作

  • 分析

image.png

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class MapJoinDriver {

    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {

        // 1 获取job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 设置加载jar包路径
        job.setJarByClass(MapJoinDriver.class);
        // 3 关联mapper
        job.setMapperClass(MapJoinMapper.class);
        // 4 设置Map输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5 设置最终输出KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 加载缓存数据
        job.addCacheFile(new URI("file:///D:/input/tablecache/pd.txt"));
        // Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
        job.setNumReduceTasks(0);

        // 6 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\output"));
        // 7 提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Map<String, String> pdMap = new HashMap<>();
    private Text text = new Text();

    //任务开始前将pd数据缓存进pdMap
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

        //通过缓存文件得到小表数据pd.txt
        URI[] cacheFiles = context.getCacheFiles();
        Path path = new Path(cacheFiles[0]);

        //获取文件系统对象,并开流
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(path);

        //通过包装流转换为reader,方便按行读取
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));

        //逐行读取,按行处理
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            //切割一行    
//01    小米
            String[] split = line.split("\t");
            pdMap.put(split[0], split[1]);
        }

        //关流
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //读取大表数据    
//1001    01    1
        String[] fields = value.toString().split("\t");

        //通过大表每行数据的pid,去pdMap里面取出pname
        String pname = pdMap.get(fields[1]);

        //将大表每行数据的pid替换为pname
        text.set(fields[0] + "\t" + pname + "\t" + fields[2]);

        //写出
        context.write(text,NullWritable.get());
    }
}

5. ETL

“ETL,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。

  • 案例

    • 输入数据

      194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
      183.49.46.228 - - [18/Sep/2013:06:49:23 +0000] "-" 400 0 "-" "-"
      163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
      163.177.71.12 - - [18/Sep/2013:06:49:36 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
      101.226.68.137 - - [18/Sep/2013:06:49:42 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
      101.226.68.137 - - [18/Sep/2013:06:49:45 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
      60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
      
    • 要求:每行字段长度都大于11。

    • 分析 : 需要在Map阶段对输入的数据根据规则进行过滤清洗。
    • 代码 ```java import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;

public class WebLogMapper extends Mapper{

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    // 1 获取1行数据
    String line = value.toString();

    // 2 解析日志
    boolean result = parseLog(line,context);

    // 3 日志不合法退出
    if (!result) {
        return;
    }

    // 4 日志合法就直接写出
    context.write(value, NullWritable.get());
}

// 2 封装解析日志的方法
private boolean parseLog(String line, Context context) {

    // 1 截取
    String[] fields = line.split(" ");

    // 2 日志长度大于11的为合法
    if (fields.length > 11) {
        return true;
    }else {
        return false;
    }
}

}

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WebLogDriver {
    public static void main(String[] args) throws Exception {

// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[] { "D:/input/inputlog", "D:/output1" };

        // 1 获取job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 加载jar包
        job.setJarByClass(LogDriver.class);

        // 3 关联map
        job.setMapperClass(WebLogMapper.class);

        // 4 设置最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 设置reducetask个数为0
        job.setNumReduceTasks(0);

        // 5 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6 提交
         boolean b = job.waitForCompletion(true);
         System.exit(b ? 0 : 1);
    }
}

7. MapReduce 开发总结

1. 数据输入接口: InputFormat

  • 使用的默认输入类 TextInputFormat,常用的有两个 它和 CombineTextFormat
  • TextInputFormat : 每次读取一行数据,该行的起始偏移量作为 key ,行内容为 value
  • CombineTextFormat: 可以把多个小文件划分为一个大文件组成一个切片进行处理,提高处理效率。

    2. 逻辑处理接口 Mapper

    根据自己的需要编写 map() setup() cleanup ()

    3. Partitioner 分区

  • 默认实现类 HashPartitioner .实现逻辑是根据key 的哈希值和 num of reduce 取余 作为分区号

    key.hashCode()&Integer.MAXVALUE % numReduces

  • 可以自定义分区

    4. Comparable 排序

  • 注意:当我们用自定义的 bean 作为 key 时,必须要继承 WritableComparable接口实现 compareTo()方法

  • 排序分为三种

    • 部分排序: 对最终输出的每一个文件进行内部排序
    • 全排序: 对所有的数据进行排序,通常只有一个 reduce
    • 二次排序 : 排序的条件有两个

      5. Combiner 合并

      Combiner 合并可以提高并发执行的效率,减少io传输,但是需要考虑使用后是否影响原结果

      6. 逻辑处理接口 Reducer

      根据自己的需要实现其中的三个方法: reduce() setup() cleanup ()

      7. 输出数据接口:OutputFormat

  • 默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对,向目标文本文件输出一行。可以自定义OutputFormat。

    四. 数据压缩

    1. 概述

  • 数据压缩 介意减少磁盘IO,节省磁盘空间、缺点是增加 CPU 的开销。

  • 压缩原则 :

    • 运算密集的 job,尽量不压缩
    • IO密集的ob,尽量压缩

      2. MR 支持的压缩编码

      | 压缩格式 | Hadoop自带? | 算法 | 扩展名 | 是否可切片 | 换成压缩格式后,原来的程序是否需要修改 | | —- | —- | —- | —- | —- | —- | | DEFLATE | 是,直接使用 | DEFLATE | .deflate | 否 | 和文本处理一样,不需要修改 | | Gzip | 是,直接使用 | DEFLATE | .gz | 否 | 和文本处理一样,不需要修改 | | bzip2 | 是,直接使用 | bzip2 | .bz2 | 是 | 和文本处理一样,不需要修改 | | LZO | 否,需要安装 | LZO | .lzo | 是 | 需要建索引,还需要指定输入格式 | | Snappy | 是,直接使用 | Snappy | .snappy | 否 | 和文本处理一样,不需要修改 |
  • 压缩性能比较 | 压缩算法 | 原始文件大小 | 压缩文件大小 | 压缩速度 | 解压速度 | | —- | —- | —- | —- | —- | | gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s | | bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s | | LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.6MB/s |

3. 压缩方式选择

压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片。

  • Gzip 压缩
    • 优点: 压缩率比较高
    • 缺点: 不支持切片,压缩和解压速度一般
  • Bzip2
    • 优点:压缩率高;支持Split;
    • 缺点:压缩/解压速度慢。
  • Lzo 压缩
    • 优点:压缩/解压速度比较快;支持Split;
    • 缺点:压缩率一般;想支持切片需要额外创建索引。
  • Snappy 压缩

    • 优点:压缩和解压缩速度快;
    • 缺点:不支持Split;压缩率一般;

      4. 压缩位置选择

      image.png### 5. 压缩参数配置
  • 编码解码器 | 压缩格式 | 对应的编码/解码器 | | —- | —- | | DEFLATE | org.apache.hadoop.io.compress.DefaultCodec | | gzip | org.apache.hadoop.io.compress.GzipCodec | | bzip2 | org.apache.hadoop.io.compress.BZip2Codec | | LZO | com.hadoop.compression.lzo.LzopCodec | | Snappy | org.apache.hadoop.io.compress.SnappyCodec |

  • 配置 | 参数 | 默认值 | 阶段 | 建议 | | —- | —- | —- | —- | | io.compression.codecs
    (在core-site.xml中配置) | 无,这个需要在命令行输入hadoop checknative查看 | 输入压缩 | Hadoop使用文件扩展名判断是否支持某种编解码器 | | mapreduce.map.output.compress(在mapred-site.xml中配置) | false | mapper输出 | 这个参数设为true启用压缩 | | mapreduce.map.output.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec | mapper输出 | 企业多使用LZO或Snappy编解码器在此阶段压缩数据 | | mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置) | false | reducer输出 | 这个参数设为true启用压缩 | | mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec | reducer输出 | 使用标准工具或者编解码器,如gzip和bzip2 |

6. 使用

  • Map输出端采用压缩

在 driver 中添加 如下,其他不变

        Configuration conf = new Configuration();

        // 开启map端输出压缩
        conf.setBoolean("mapreduce.map.output.compress", true);

        // 设置map端输出压缩方式
        conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class,CompressionCodec.class);
  • Reduce 输出端采用压缩 ```java

      // 设置reduce端输出压缩开启
      FileOutputFormat.setCompressOutput(job, true);
    
      // 设置压缩的方式
      FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); 
    

    // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

<a name="qvUJS"></a>
## 五、常见错误

1. Mapper中第一个输入的参数必须是LongWritable或者NullWritable,不可以是IntWritable.  报的错误是类型转换异常。
1. java.lang.Exception: java.io.IOException: Illegal partition for 13926435656 (4),说明Partition和ReduceTask个数没对上,调整ReduceTask个数。
1. 如果分区数不是1,但是reducetask为1,是否执行分区过程。答案 :不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。
1. 在Windows环境编译的jar包导入到Linux环境中运行,

hadoop jar wc.jar com.allen.mapreduce.wordcount.WordCountDriver /user/allen/ /user/allen/output<br />报如下错误:<br />Exception in thread "main" java.lang.UnsupportedClassVersionError:             com/atguigu/mapreduce/wordcount/WordCountDriver : Unsupported major.minor version 52.0<br />原因是Windows环境用的jdk1.7,Linux环境用的jdk1.8。<br />解决方案:统一jdk版本。

5. 缓存pd.txt小文件案例中,报找不到pd.txt文件

原因:大部分为路径书写错误。还有就是要检查pd.txt.txt的问题。还有个别电脑写相对路径找不到pd.txt,可以修改为绝对路径。

6. 集群中运行wc.jar时出现了无法获得输入文件。

原因:WordCount案例的输入文件不能放用HDFS集群的根目录。

7. 自定义Outputformat时,注意在RecordWirter中的close方法必须关闭流资源。否则输出的文件内容中数据为空。
```java
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        if (atguigufos != null) {
            atguigufos.close();
        }
        if (otherfos != null) {
            otherfos.close();
        }
}