MapReduce思想

MapReduce的思想核心是分而治之。
image.png
经过查看分析官方WordCount案例源码我们发现一个统计单词数量的MapReduce程序的代码由三个部
分组成,

  • Mapper类
  • Reducer类
  • 运行作业的代码(Driver)

Mapper类继承了org.apache.hadoop.mapreduce.Mapper类重写了其中的map方法,Reducer类继承
了org.apache.hadoop.mapreduce.Reducer类重写了其中的reduce方法。

  • 重写的Map方法作用:map方法其中的逻辑就是用户希望mr程序map阶段如何处理的逻辑;
  • 重写的Reduce方法作用:reduce方法其中的逻辑是用户希望mr程序reduce阶段如何处理的逻辑;

Hadoop序列化

为什么进行序列化?
序列化主要是我们通过网络通信传输数据时或者把对象持久化到文件,需要把对象序列化成二进制的结
构。
观察源码时发现自定义Mapper类与自定义Reducer类都有泛型类型约束,比如自定义Mapper有四个形
参类型,但是形参类型并不是常见的java基本类型。
为什么Hadoop要选择建立自己的序列化格式而不使用java自带serializable?

  • 序列化在分布式程序中非常重要,在Hadoop中,集群中多个节点的进程间的通信是通过RPC(远

程过程调用:Remote Procedure Call)实现;RPC将消息序列化成二进制流发送到远程节点,远
程节点再将接收到的二进制数据反序列化为原始的消息,因此RPC往往追求如下特点:

  • 紧凑:数据更紧凑,能充分利用网络带宽资源
  • 快速:序列化和反序列化的性能开销更低
    • Hadoop使用的是自己的序列化格式Writable,它比java的序列化serialization更紧凑速度更快。一

个对象使用Serializable序列化后,会携带很多额外信息比如校验信息,Header,继承体系等。
Java基本类型与Hadoop常用序列化类型

java基本类型 hadoop writable类型
boolean BooleanWritable
byte ByteWritable
int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
String Text
map MapWritable
array ArrayWritable

MapReduce编程规范及示例编写

Mapper类

  • 用户自定义一个Mapper类继承Hadoop的Mapper类
  • Mapper的输入数据是KV对的形式(类型可以自定义)
  • Map阶段的业务逻辑定义在map()方法中
  • Mapper的输出数据是KV对的形式(类型可以自定义)

注意:map()方法是对输入的一个KV对调用一次!!

Reducer类

  • 用户自定义Reducer类要继承Hadoop的Reducer类
  • Reducer的输入数据类型对应Mapper的输出数据类型(KV对)
  • Reducer的业务逻辑写在reduce()方法中
  • Reduce()方法是对相同K的一组KV对调用执行一次

Driver阶段

创建提交YARN集群运行的Job对象,其中封装了MapReduce程序运行所需要的相关参数入输入数据路
径,输出数据路径等,也相当于是一个YARN集群的客户端,主要作用就是提交我们MapReduce程序运
行。
image.png

WordCount代码实现

需求

在给定的文本文件中统计输出每一个单词出现的总次数
输入数据:wc.txt;
输出:

  1. apache 2
  2. clickhouse 2
  3. hadoop 1
  4. mapreduce 1
  5. spark 2
  6. xiaoming 1

具体步骤

按照MapReduce编程规范,分别编写Mapper,Reducer,Driver。

新建maven工程

导入hadoop依赖

<dependencies> 
   <dependency> 
     <groupId>org.apache.logging.log4j</groupId> 
     <artifactId>log4j-core</artifactId> 
     <version>2.8.2</version> 
   </dependency> 
   <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-common</artifactId> 
      <version>2.9.2</version>
   </dependency> 
   <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-client</artifactId>
      <version>2.9.2</version> 
   </dependency> 
   <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-hdfs</artifactId> 
      <version>2.9.2</version> 
   </dependency> 
</dependencies>

<!--maven打包插件 --> 
<build>
  <plugins>
    <plugin>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>2.3.2</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

注意:以上依赖第一次需要联网下载!!

添加log4j.properties

log4j.rootLogger=INFO, stdout 
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n 
log4j.appender.logfile=org.apache.log4j.FileAppender 
log4j.appender.logfile.File=target/spring.log 
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout 
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

整体思路梳理(仿照源码)

Map阶段:

  1. map()方法中把传入的数据转为String类型
  2. 根据空格切分出单词
  3. 输出<单词,1>

Reduce阶段:

  1. 汇总各个key(单词)的个数,遍历value数据进行累加
  2. 输出key的总数

Driver

  1. 获取配置文件对象,获取job对象实例
  2. 指定程序jar的本地路径
  3. 指定Mapper/Reducer类
  4. 指定Mapper输出的kv数据类型
  5. 指定最终输出的kv数据类型
  6. 指定job处理的原始数据路径
  7. 指定job输出结果路径
  8. 提交作业


1. 编写Mapper类

import java.io.IOException; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ 
    Text k = new Text(); 
    IntWritable v = new IntWritable(1); 
    @Override 
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
        // 1 获取一行 
        String line = value.toString(); 
        // 2 切割 
        String[] words = line.split(" "); 
        // 3 输出 
        for (String word : words) { 
            k.set(word);
            context.write(k, v); 
        } 
    } 
}

继承的Mapper类型选择新版本API:
image.png

2. 编写Reducer类

import java.io.IOException; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ 
    int sum; IntWritable v = new IntWritable();
    @Override 
    protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { 
        // 1 累加求和 
        sum = 0; 
        for (IntWritable count : values) { 
            sum += count.get(); 
        }
        // 2 输出 v.set(sum); context.write(key,v); 
    } 
}

选择继承的Reducer类
image.png

3. 编写Driver驱动类

import java.io.IOException; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class WordcountDriver { 
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1 获取配置信息以及封装任务 
        Configuration configuration = new Configuration(); 
        Job job = Job.getInstance(configuration); 
        // 2 设置jar加载路径 
        job.setJarByClass(WordcountDriver.class); 
        // 3 设置map和reduce类 
        job.setMapperClass(WordcountMapper.class); 
        job.setReducerClass(WordcountReducer.class); 
        // 4 设置map输出 
        job.setMapOutputKeyClass(Text.class); 
        job.setMapOutputValueClass(IntWritable.class); 
        // 5 设置最终输出kv类型 
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(IntWritable.class); 
        // 6 设置输入和输出路径 
        FileInputFormat.setInputPaths(job, new Path(args[0])); 
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7 提交 boolean result = job.waitForCompletion(true); 
        System.exit(result ? 0 : 1); 
    } 
}

运行任务

  1. 本地模式

直接Idea中运行驱动类即可
idea运行需要传入参数:
image.png

选择editconfiguration
image.png

在program arguments设置参数
image.png

运行结束,去到输出结果路径查看结果

image.png

注意本地idea运行mr任务与集群没有任何关系,没有提交任务到yarn集群,是在本地使用多线程
方式模拟的mr的运行。

  1. yarn集群模式
    • 把程序打成jar包,改名为wc.jar;上传到Hadoop集群

选择合适的Jar包
image.png
准备原始数据文件,上传到HDFS的路径,不能是本地路径,因为跨节点运行无法获取数
据!!

  • 启动Hadoop集群(Hdfs,Yarn)
  • 使用Hadoop 命令提交任务运行
    hadoop jar wc.jar com.yue.wordcount.WordcountDriver /user/input /user/output
    
    Yarn集群任务运行成功展示图
    image.png

    序列化Writable接口

基本序列化类型往往不能满足所有需求,比如在Hadoop框架内部传递一个自定义bean对象,那么该对
象就需要实现Writable序列化接口。

1. 实现Writable序列化步骤如下

  1. 必须实现Writable接口
  2. 反序列化时,需要反射调用空参构造函数,所以必须有空参构造

    public CustomBean() { 
    super(); 
    }
    
  3. 重写序列化方法

    @Override 
    public void write(DataOutput out) throws IOException {
     .... 
    }
    
  4. 重写反序列化方法

    @Override 
    public void readFields(DataInput in) throws IOException { 
     ....
    }
    
  5. 反序列化的字段顺序和序列化字段的顺序必须完全一致

  6. 方便展示结果数据,需要重写bean对象的toString()方法,可以自定义分隔符
  7. 如果自定义Bean对象需要放在Mapper输出KV中的K,则该对象还需实现Comparable接口,因为因

为MapReduce框中的Shuffle过程要求对key必须能排序!!

@Override 
public int compareTo(CustomBean o) { 
    // 自定义排序规则 
    return this.num > o.getNum() ? -1 : 1;
}

Writable接口案例

1. 需求

统计每台智能音箱设备内容播放时长
原始日志格式

001 001577c3 kar_890809 120.196.100.99 1116 954 200 日志id 设备id appkey(合作硬件厂商) 网络ip 自有内容时长(秒) 第三方内 容时长(秒) 网络状态码

输出结果

001577c3 11160 9540 20700 设备id 自有内容时长(秒) 第三方内容时长(秒) 总时长

2. 编写MapReduce程序

  1. 创建SpeakBean对象 ```java package com.yue.hdfs;

import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;

// 1 实现writable接口 public class SpeakBean implements Writable { private long selfDuration; private long thirdPartDuration; private long sumDuration; //2 反序列化时,需要反射调用空参构造函数,所以必须有 public SpeakBean() { } public SpeakBean(long selfDuration, long thirdPartDuration) { this.selfDuration = selfDuration; this.thirdPartDuration = thirdPartDuration; this.sumDuration=this.selfDuration+this.thirdPartDuration; } //3 写序列化方法 public void write(DataOutput out) throws IOException { out.writeLong(selfDuration); out.writeLong(thirdPartDuration); out.writeLong(sumDuration); } //4 反序列化方法 //5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致 public void readFields(DataInput in) throws IOException { this.selfDuration = in.readLong(); this.thirdPartDuration = in.readLong(); this.sumDuration = in.readLong(); } // 6 编写toString方法,方便后续打印到文本 @Override public String toString() { return selfDuration + “\t” + thirdPartDuration + “\t” + sumDuration ; }

public long getSelfDuration() { 
    return selfDuration; 
}
public void setSelfDuration(long selfDuration) { 
    this.selfDuration = selfDuration; 
}
public long getThirdPartDuration() { 
    return thirdPartDuration; 
}
public void setThirdPartDuration(long thirdPartDuration) { 
    this.thirdPartDuration = thirdPartDuration; 
}
public long getSumDuration() { 
    return sumDuration; 
}
public void setSumDuration(long sumDuration) { 
    this.sumDuration = sumDuration; 
}
public void set(long selfDuration, long thirdPartDuration) { 
    this.selfDuration = selfDuration; 
    this.thirdPartDuration = thirdPartDuration; 
    this.sumDuration=this.selfDuration+this.thirdPartDuration; 
} 

}


2. 编写Mapper类 
```java
package com.yue.hdfs; 

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 

import java.io.IOException;

public class SpeakDurationMapper extends Mapper<LongWritable, Text, Text, SpeakBean> { 
    SpeakBean v = new SpeakBean(); 
    Text k = new Text(); 

    @Override 
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
        // 1 获取一行 
        String line = value.toString(); 
        // 2 切割字段 
        String[] fields = line.split("\t"); 
        // 3 封装对象 
        // 取出设备id 
        String deviceId = fields[1]; 
        // 取出自有和第三方时长数据 
        long selfDuration = Long.parseLong(fields[fields.length - 3]); 
        long thirdPartDuration = Long.parseLong(fields[fields.length - 2]); 
        k.set(deviceId); 
        v.set(selfDuration, thirdPartDuration); 
        // 4 写出 context.write(k, v); 
    } 
}
  1. 编写Reducer ```java package com.yue.hdfs;

import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SpeakDurationReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException { long self_Duration = 0; long thirdPart_Duration = 0; // 1 遍历所用bean,将其中的自有,第三方时长分别累加 for (SpeakBean sb : values) { self_Duration += sb.getSelfDuration(); thirdPart_Duration += sb.getThirdPartDuration(); } // 2 封装对象 SpeakBean resultBean = new SpeakBean(self_Duration, thirdPart_Duration); // 3 写出 context.write(key, resultBean); } }


4. 编写驱动
```java
package com.yue.hdfs; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

import java.io.IOException; 

public class SpeakerDriver { 
    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 
        args = new String[] { "e:/input/input", "e:/output1" }; 
        // 1 获取配置信息,或者job对象实例 
        Configuration configuration = new Configuration(); 
        Job job = Job.getInstance(configuration); 
        // 6 指定本程序的jar包所在的本地路径 
        job.setJarByClass(SpeakerDriver.class); 
        // 2 指定本业务job要使用的mapper/Reducer业务类 
        job.setMapperClass(SpeakDurationMapper.class); 
        job.setReducerClass(SpeakDurationReducer.class); 
        // 3 指定mapper输出数据的kv类型 
        job.setMapOutputKeyClass(Text.class); 
        job.setMapOutputValueClass(SpeakBean.class);
        // 4 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(SpeakBean.class);
        // 5 指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0])); 
        FileOutputFormat.setOutputPath(job, new Path(args[1])); 
        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行 
        boolean result = job.waitForCompletion(true); 
        System.exit(result ? 0 : 1);
    } 
}

mr编程技巧总结

  • 结合业务设计Map输出的key和v,利用key相同则去往同一个reduce的特点!!
  • map()方法中获取到只是一行文本数据尽量不做聚合运算
  • reduce()方法的参数要清楚含义

MapReduce原理分析

1. MapTask运行机制详解

MapTask流程

image.png
详细步骤:

  1. 首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文

件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关
系默认是一对一。

  1. 将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n

作为分隔符,读取一行数据,返回。Key表示每行首字符偏移值,value表示这一行
文本内容。

  1. 读取split返回,进入用户自己继承的Mapper类中,执行用户重写的map函数。

RecordReader读取一行这里调用一次。

  1. map逻辑完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先

对其进行分区处理,默认使用HashPartitioner。

MapReduce**提供Partitioner接口,它的作用就是根据keyvaluereduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。**

  1. 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结

果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之
前,key与value值都会被序列化成字节数组。

  • 环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信

息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概
念。

  • 缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以

需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘
写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写
map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例
spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size spill
percent = 100MB
0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map
task的输出结果还可以往剩下的20MB内存中写,互不影响。

  1. 当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为!
  • 如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的

value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整
个模型中会多次使用。

  • 那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner

绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value
类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,
如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。

  1. 合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果

map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当
整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入
磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。

至此map整个阶段结束!!

MapTask的一些配置
image.png
官方参考地址

https://hadoop.apache.org/docs/r2.9.2/hadoop-mapreduce-client/hadoop-mapreduce-
client-core/mapred-default.xml

2. MapTask的并行度

  1. MapTask并行度思考

MapTask的并行度决定Map阶段的任务处理并发度,从而影响到整个Job的处理速度。
思考:MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?

  1. MapTask并行度决定机制

数据块:Block是HDFS物理上把数据分成一块一块。
切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。
image.png

切片机制源码阅读

image.png
image.png
默认就是128M;
MapTask并行度是不是越多越好呢?
答案不是,如果一个文件仅仅比128M大一点点也被当成一个split来对待,而不是多个split.

MR框架在并行运算的同时也会消耗更多资源,并行度越高资源消耗也越高,假设129M文件分为两个分
片,一个是128M,一个是1M;
对于1M的切片的Maptask来说,太浪费资源。

129M**的文件在Hdfs存储的时候会不会切成两**块?

3. ReduceTask 工作机制

image.png
Reduce大致分为copy**sort**reduce三个阶段,重点在前两个阶段。copy阶段包含一个
eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线
程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据
进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行
finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。

详细步骤:

  • Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求

maptask获取属于自己的文件。

  • Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数

值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge
有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的
数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过
程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种
merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge
方式生成最终的文件。

  • 合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
  • 对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个

或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

4. ReduceTask并行度

ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定
不同,ReduceTask数量的决定是可以直接手动设置:

// 默认值是1,手动设置为4 
job.setNumReduceTasks(4);

注意事项

  1. ReduceTask=0,表示没有Reduce阶段,输出文件数和MapTask数量保持一致;
  2. ReduceTask数量不设置默认就是一个,输出文件数量为1个;
  3. 如果数据分布不均匀,可能在Reduce阶段产生倾斜;

5. Shuffle机制

map阶段处理的数据如何传递给reduce阶段,是MapReduce框架中最关键的一个流程,这个流程就叫
shuffle。
shuffle: 洗牌、发牌——(核心机制:数据分区,排序,分组,combine,合并等过程)
image.png
image.png

1 MapReduce的分区与reduceTask的数量

在MapReduce中,通过我们指定分区,会将同一个分区的数据发送到同一个reduce当中进行处理(默认是key**相同去往同个分区**),例如我们为了数据的统计,我们可以把一批类似的数据发送到同一个reduce当中去,在同一个reduce当中统计相同类型的数据,
如何才能保证相同key的数据去往同个reduce呢?只需要保证相同key的数据分发到同个分区即可。结合以上原理分析我们知道MR程序shuffle机制默认就是这种规则!!

1. **分区源码**
翻阅源码验证以上规则,MR程序默认使用的HashPartitioner,保证了相同的key去往同个分区!!

image.png
2. **自定义分区**
实际生产中需求变化多端,默认分区规则往往不能满足需求,需要结合业务逻辑来灵活控制分区规则以及分区数量!!

如何制定自己需要的分区规则?

具体步骤

  1. 自定义类继承Partitioner,重写getPartition()方法
  2. 在Driver驱动中,指定使用自定义Partitioner
  3. 在Driver驱动中,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask数量。

需求 按照不同的appkey把记录输出到不同的分区中

原始日志格式

001 001577c3 kar_890809 120.196.100.99 1116 954 200 日志id 设备id appkey(合作硬件厂商) 网络ip 自有内容时长(秒) 第三方内 容时长(秒) 网络状态码

输出结果

根据appkey把不同厂商的日志数据分别输出到不同的文件中

需求分析
面对业务需求,结合mr的特点,来设计map输出的kv,以及reduce输出的kv数据。
一个ReduceTask对应一个输出文件,因为在shuffle机制中每个reduceTask拉取的都是某一个分区的数据,一个分区对应一个输出文件。
结合appkey的前缀相同的特点,同时不能使用默认分区规则,而是使用自定义分区器,只要appkey前缀相同则数据进入同个分区。

整体思路

Mapper

  1. 读取一行文本,按照制表符切分
  2. 解析出appkey字段,其余数据封装为PartitionBean对象(实现序列化Writable接口)
  3. 设计map()输出的kv,key—>appkey(依靠该字段完成分区),PartitionBean对象作为Value输出

Partition
自定义分区器,实现按照appkey字段的前缀来区分所属分区

Reduce

  1. reduce()正常输出即可,无需进行聚合操作

Driver

  1. 在原先设置job属性的同时增加设置使用自定义分区器
  2. 注意设置ReduceTask的数量(与分区数量保持一致)

Mapper

package com.yue.mr.partition; 

import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 

import java.io.IOException; 

public class PartitionMapper extends Mapper<LongWritable, Text, Text, PartitionBean> { 
    final PartitionBean bean = new PartitionBean(); 
    Text text = new Text();

    @Override 
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
        final String[] fields = value.toString().split("\t"); 
        String appkey = fields[2]; 
        bean.setId(fields[0]); 
        bean.setDeviceId(fields[1]);
        bean.setAppkey(appkey); 
        bean.setIp(fields[3]); 
        bean.setSelfDuration(Long.parseLong(fields[4])); 
        bean.setThirdPartDuration(Long.parseLong(fields[5])); 
        bean.setStatus(fields[6]); 
        text.set(appkey); 
        context.write(text, bean); 
    } 
}

PartitionBean

package com.yue.mr.partition; 

import org.apache.hadoop.io.Writable; 

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException;

public class PartitionBean implements Writable { 
    //定义属性 
    private String id; 
    //日志id 
    private String deviceId;
    //设备id 
    private String appkey;
    //appkey合作硬件厂商id 
    private String ip;
    //ip地址 
    private Long selfDuration;
    //自有内容时长 
    private Long thirdPartDuration;
    //第三方内容时长 
    private String status;
    //状态码 
    public PartitionBean() { 
    }
    public PartitionBean(String id, String deviceId, String appkey, String ip, 
                         Long selfDuration, Long thirdPartDuration, String status) { 
        this.id = id; 
        this.deviceId = deviceId; 
        this.appkey = appkey; 
        this.ip = ip;
        this.selfDuration = selfDuration; 
        this.thirdPartDuration = thirdPartDuration; 
        this.status = status; 
    }
    public String getId() { 
        return id; 
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getDeviceId() { 
        return deviceId;
    }
    public void setDeviceId(String deviceId) { 
        this.deviceId = deviceId; 
    }
    public String getAppkey() { 
        return appkey; 
    }
    public void setAppkey(String appkey) { 
        this.appkey = appkey; 
    }
    public String getIp() { 
        return ip; 
    }
    public void setIp(String ip) { 
        this.ip = ip; 
    }
    public Long getSelfDuration() {
        return selfDuration; 
    }
    public void setSelfDuration(Long selfDuration) {
        this.selfDuration = selfDuration; 
    }
    public Long getThirdPartDuration() { 
        return thirdPartDuration;
    }
    public void setThirdPartDuration(Long thirdPartDuration) { 
        this.thirdPartDuration = thirdPartDuration; 
    }
    public String getStatus() { 
        return status; 
    }
    public void setStatus(String status) { 
        this.status = status;
    }
    //序列化 
    @Override 
    public void write(DataOutput out) throws IOException { 
        out.writeUTF(id); 
        out.writeUTF(deviceId); 
        out.writeUTF(appkey); 
        out.writeUTF(ip); 
        out.writeLong(selfDuration); 
        out.writeLong(thirdPartDuration);
        out.writeUTF(status); 
    }
    //反序列化
    @Override 
    public void readFields(DataInput in) throws IOException { 
        this.id = in.readUTF(); 
        this.deviceId=in.readUTF(); 
        this.appkey=in.readUTF(); 
        this.ip=in.readUTF(); 
        this.selfDuration=in.readLong(); 
        this.thirdPartDuration=in.readLong();
        this.status=in.readUTF(); 
    }

    @Override public String toString() { 
        return id + '\t' + deviceId + '\t' + appkey + '\t' + ip + '\t' +
            selfDuration +'\t'+ thirdPartDuration + '\t' + status ; 
    } 
}

CustomPartitioner

package com.yue.mr.partition; 

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Partitioner; 

public class CustomPartitioner extends Partitioner<Text,PartitionBean> { 
    @Override 
    public int getPartition(Text text, PartitionBean partitionBean, int numPartitions) { 
        int partition=0;
        final String appkey = text.toString(); 
        if(appkey.equals("kar")){ 
            partition=1; 
        }else if(appkey.equals("pandora")){
            partition=2; 
        }else{
            partition=0; 
        }
        return partition;
    } 
}

PartitionReducer

package com.yue.mr.partition; 

import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 

import java.io.IOException; 

public class PartitionReducer extends Reducer<Text, PartitionBean,
NullWritable, PartitionBean> { 
    @Override 
    protected void reduce(Text key, Iterable<PartitionBean> values, Context context) 
        throws IOException, InterruptedException { 
        for (PartitionBean bean : values) { 
            context.write(NullWritable.get(), bean); 
        } 
    } 
}

PartitionDriver

package com.yue.mr.partition;

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

import java.io.IOException; 

public class PartitionDriver { 
    public static void main(String[] args) throws IOException, ClassNotFoundException, 
    InterruptedException { 
        final Configuration conf = new Configuration(); 
        final Job job = Job.getInstance(conf); 

        job.setJarByClass(PartitionDriver.class);

        job.setMapperClass(PartitionMapper.class); 
        job.setReducerClass(PartitionReducer.class); 

        job.setMapOutputKeyClass(Text.class); 
        job.setMapOutputValueClass(PartitionBean.class);
        job.setOutputKeyClass(NullWritable.class); 
        job.setOutputValueClass(PartitionBean.class); 

        job.setPartitionerClass(CustomPartitioner.class);
        job.setNumReduceTasks(5); 
        FileInputFormat.setInputPaths(job, new Path("e:/speak.data")); 
        FileOutputFormat.setOutputPath(job, new Path("e:/partition/output")); 

        final boolean flag = job.waitForCompletion(true); 
        System.exit(flag ? 0 : 1); 
    } 
}

总结

  1. 自定义分区器时最好保证分区数量与reduceTask数量保持一致;
  2. 如果分区数量不止1个,但是reduceTask数量1个,此时只会输出一个文件。
  3. 如果reduceTask数量大于分区数量,但是输出多个空文件
  4. 如果reduceTask数量小于分区数量,有可能会报错。

image.png

2 MapReduce中的Combiner

combiner运行机制:
image.png

  1. Combiner是MR程序中Mapper和Reducer之外的一种组件
  2. Combiner组件的父类就是Reducer
  3. Combiner和reducer的区别在于运行的位置
  4. Combiner是在每一个maptask所在的节点运行;
  5. Combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量
  6. Combiner能够应用的前提是不能影响最终的业务逻辑,此外,Combiner的输出kv应该跟reducer

的输入kv类型要对应起来。

举例说明

假设一个计算平均值的MR任务

Map阶段
2个MapTask
MapTask1输出数据:10,5,15 如果使用Combiner:(10+5+15)/3=10
MapTask2输出数据:2,6 如果使用Combiner:(2+6)/2=4
Reduce阶段汇总
(10+4)/2=7
而正确结果应该是
(10+5+15+2+6)/5=7.6

  • 自定义Combiner实现步骤
    • 自定义一个Combiner继承**Reducer**,重写Reduce方法
    • 在驱动(Driver)设置使用Combiner(默认是不适用Combiner组件)

1. **改造WordCount程序**

package com.yue.mr.wc; 

import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.mapreduce.Reducer; 

import javax.xml.soap.Text; 
import java.io.IOException; 

public class WordCountCombiner extends Reducer<Text, IntWritable,Text,IntWritable> { 
    IntWritable total = new IntWritable(); 

    @Override 
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException { 
        //2 遍历key对应的values,然后累加结果 
        int sum = 0; 
        for (IntWritable value : values) { 
            int i = value.get();
            sum += 1; 
        }
        // 3 直接输出当前key对应的sum值,结果就是单词出现的总次数 
        total.set(sum); 
        context.write(key, total);
    }
}

在驱动(Driver)设置使用Combiner

job.setCombinerClass(WordcountCombiner.class);

验证结果
观察程序运行日志
image.png
如果直接使用WordCountReducer作为Combiner使用是否可以?

直接使用Reducer作为Combiner组件来使用是可以的!!

6. MapReduce中的排序

排序是**MapReduce**框架中最重要的操作之一
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序
中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是
快速排序
MapTask
它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲
区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,
溢写完毕后,它会对磁盘上所有文件进行归并排序。
ReduceTask 当所有数据拷贝完毕后,ReduceTask统-对内存和磁盘上的所有数据进行一次归并排
序。

  1. 部分排序.

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序

  1. 全排序

最终输出结果只有一个**文件**,且文件内部有序。实现方式是只设置- -个ReduceTask。但该方法在
处理大型文件时效率极低,因为- -台机器处理所有文件,完全丧失了MapReduce所提供的并行架
构。

  1. 辅助排序: ( GroupingComparator分组)

在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部
字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。

  1. 二次排序.

在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

1 WritableComparable

Bean对象如果作为Map输出的key时,需要实现WritableComparable接口并重写compareTo方法指定
排序规则

1>**全排序

基于统计的播放时长案例的输出结果对总时长进行排序
实现全局排序只能设置一个ReduceTask!!

播放时长案例输出结果

00fdaf3 33180 33420 00fdaf3 66600 
00wersa4 30689 35191 00wersa4 65880 
0a0fe2 43085 44254 0a0fe2 87339 
0ad0s7 31702 29183 0ad0s7 60885 
0sfs01 31883 29101 0sfs01 60984 
a00df6s 33239 36882 a00df6s 70121 
adfd00fd5 30727 31491 adfd00fd5 62218

需求分析
如何设计map()方法输出的key,value
MR框架中shuffle阶段的排序是默认行为,不管你是否需要都会进行排序。
key:把所有字段封装成为一个bean对象,并且指定bean对象作为key输出,如果作为key输出,需要实
现排序接口,指定自己的排序规则;

具体步骤

Mapper

  1. 读取结果文件,按照制表符进行切分
  2. 解析出相应字段封装为SpeakBean
  3. SpeakBean实现WritableComparable接口重写compareTo方法
  4. map()方法输出kv;key—>SpeakBean,value—>NullWritable.get()

Reducer

  1. 循环遍历输出

Mapper代码

package com.yue.mr.WritableComparable;

import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException; 

public class SortMapper extends Mapper<LongWritable, Text, SpeakBeanSort, NullWritable> { 
    @Override 
    protected void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException { 
        final String[] arr = value.toString().split("\t"); 
        final SpeakBeanSort bean = new SpeakBeanSort(Long.parseLong(arr[1]), 
                                                     Long.parseLong(arr[2]), arr[0]);
        context.write(bean, NullWritable.get()); 
    } 
}

Reducer

package com.yue.mr.WritableComparable; 

import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.mapreduce.Reducer; 

import java.io.IOException; 

public class SortReducer extends Reducer<SpeakBeanSort, NullWritable, 
NullWritable, SpeakBeanSort> { 
    @Override 
    protected void reduce(SpeakBeanSort key, Iterable<NullWritable> values, 
                          Context context) throws IOException, InterruptedException { 
        for (NullWritable value : values) { 
            context.write(value, key); 
        } 
    } 
}

Bean对象实现WritableComparable接口

package com.yue.mr.WritableComparable; 

import org.apache.hadoop.io.WritableComparable; 

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

public class SpeakBeanSort implements WritableComparable<SpeakBeanSort> { 
    //定义属性 
    private Long selfDuration;
    //自有内容时长 
    private Long thirdPartDuration;
    //第三方内容时长 
    private String deviceId;
    //设备id 
    private Long sumDuration;
    //总时长 
    //准备一个空参构造 
    public SpeakBeanSort() { 
    }
    //序列化方法:就是把内容输出到网络或者文本中 
    @Override 
    public void write(DataOutput out) throws IOException { 
        out.writeLong(selfDuration); 
        out.writeLong(thirdPartDuration); 
        out.writeUTF(deviceId); 
        out.writeLong(sumDuration);
    }
    //有参构造
    public SpeakBeanSort(Long selfDuration, Long thirdPartDuration, String deviceId) { 
        this.selfDuration = selfDuration; 
        this.thirdPartDuration = thirdPartDuration; 
        this.deviceId = deviceId; 
        this.sumDuration = this.selfDuration + this.thirdPartDuration; 
    }
    //反序列化方法 
    @Override 
    public void readFields(DataInput in) throws IOException { 
        this.selfDuration = in.readLong();
        //自由时长 
        this.thirdPartDuration = in.readLong();
        //第三方时长 
        this.deviceId = in.readUTF();
        //设备id 
        this.sumDuration = in.readLong();
        //总时长 
    }
    public Long getSelfDuration() { 
        return selfDuration; 
    }
    public void setSelfDuration(Long selfDuration) { 
        this.selfDuration = selfDuration; 
    }
    public Long getThirdPartDuration() { 
        return thirdPartDuration; 
    }
    public void setThirdPartDuration(Long thirdPartDuration) { 
        this.thirdPartDuration = thirdPartDuration; 
    }
    public String getDeviceId() { 
        return deviceId; 
    }
    public void setDeviceId(String deviceId) { 
        this.deviceId = deviceId;
    }
    public Long getSumDuration() {
        return sumDuration;
    }
    public void setSumDuration(Long sumDuration) { 
        this.sumDuration = sumDuration;
    }
    //为了方便观察数据,重写toString()方法
    @Override public String toString() { 
        return selfDuration + "\t" + 
            thirdPartDuration + "\t" + deviceId + "\t" + sumDuration; 
    }
    //重写compareTo方法,此处是比较一个字段如果比较2个字段就是二次排序 
    @Override 
    public int compareTo(SpeakBeanSort o) { 
        int result; 
        // 按照总流量大小,倒序排列 
        if (sumDuration > o.getSumDuration()) { 
            result = -1; 
        }else if (sumDuration < o.getSumDuration()) { 
            result = 1; 
        }else {
            result = 0; 
        }return result; 
    } 
}

Driver

package com.yue.mr.WritableComparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException; 

public class SortDriver {
    public static void main(String[] args) 
        throws ClassNotFoundException, IOException, InterruptedException { 
    // 1 获取配置信息,或者job对象实例 
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);
    // 2 指定本程序的jar包所在的本地路径 
    job.setJarByClass(SortDriver.class);
    // 3 指定本业务job要使用的mapper/Reducer业务类 
    job.setMapperClass(SortMapper.class); 
    job.setReducerClass(SortReducer.class);
    // 4 指定mapper输出数据的kv类型 
    job.setMapOutputKeyClass(SpeakBeanSort.class); 
    job.setMapOutputValueClass(NullWritable.class);
    // 5 指定最终输出的数据的kv类型
    job.setOutputKeyClass(NullWritable.class); 
    job.setOutputValueClass(SpeakBeanSort.class);
    // 6 指定job的输入原始文件所在目录 
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
    boolean result = job.waitForCompletion(true); 
    System.exit(result ? 0 : 1); 
    }
}

总结

  1. 自定义对象作为Map的key输出时,需要实现WritableComparable接口,排序:重写

compareTo()方法,序列以及反序列化方法

  1. 再次理解reduce()方法的参数;reduce()方法是map输出的kv中key相同的kv中的v组成一个集合调

用一次reduce()方法,选择遍历values得到所有的key.

  1. 默认reduceTask数量是1个;
  2. 对于全局排序需要保证只有一个reduceTask!!

2 **分区排序(默认的分区规则,区内有序)**

2 GroupingComparator

GroupingComparator是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为
一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻
辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑。

  1. 需求
    原始数据
订单id 商品id 成交金额
Order_0000001 Pdt_01 222.8
Order_0000001 Pdt_05 25.8
Order_0000002 Pdt_03 522.8
Order_0000002 Pdt_04 122.4
Order_0000002 Pdt_05 722.4
Order_0000003 Pdt_01 232.8

需要求出每一个订单中成交金额最大的一笔交易。

  1. 实现思路

Mapper

  • 读取一行文本数据,切分出每个字段;
  • 订单id和金额封装为一个Bean对象,Bean对象的排序规则指定为先按照订单Id排序,订单Id

相等再按照金额降序排;

  • map()方法输出kv;key—>bean对象,value—>NullWritable.get();

Shuffle

  • 指定分区器,保证相同订单id的数据去往同个分区(自定义分区器)
    • 指定GroupingComparator,分组规则指定只要订单Id相等则认为属于同一组;

Reduce

  • 每个reduce()方法写出一组key的第一个

参考代码
OrderBean

  • OrderBean定义两个字段,一个字段是orderId,第二个字段是金额(注意金额一定要使用

Double**或者DoubleWritable类型,否则没法按照金额顺序排序**)

排序规则指定为先按照订单Id排序,订单Id相等再按照金额降序排!!

package com.yue.mr.group; 

import org.apache.hadoop.io.WritableComparable; 

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

public class OrderBean implements WritableComparable<OrderBean> { 
    private String orderId; 
    private Double price; 

    @Override 
    public int compareTo(OrderBean o) { 
        //比较订单id的排序顺序 
        int i = this.orderId.compareTo(o.orderId); 
        if (i == 0) {
            //如果订单id相同,则比较金额,金额大的排在前面
            i = -this.price.compareTo(o.price); 
        }
        return i;
    }

    @Override 
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId); 
        out.writeDouble(price); 
    }

    @Override public void readFields(DataInput in) throws IOException { 
        this.orderId = in.readUTF(); 
        this.price = in.readDouble();
    }
    public OrderBean() { 
    }
    public OrderBean(String orderId, Double price) {
        this.orderId = orderId;
        this.price = price; 
    }
    public String getOrderId() { 
        return orderId; 
    }
    public void setOrderId(String orderId) { 
        this.orderId = orderId;
    }
    public Double getPrice() {
        return price;
    }
    public void setPrice(Double price) { 
        this.price = price; 
    }

    @Override public String toString() {
        return orderId + "\t" + price; 
    } 
}
  • 自定义分区器

保证ID相同的订单去往同个分区最终去往同一个Reduce中

package com.yue.mr.group; 

import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.mapreduce.Partitioner; 

public class CustomPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override 
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int i) {
        //自定义分区,将相同订单id的数据发送到同一个reduce里面去 
        return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % i; 
    } 
}
  • 自定义GroupingComparator

保证id相同的订单进入一个分组中,进入分组的数据已经是按照金额降序排序。reduce()方法取出
第一个即是金额最高的交易

package com.yue.mr.group; 

import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableComparator;

public class CustomGroupingComparator extends WritableComparator {
    //将我们自定义的OrderBean注册到我们自定义的CustomGroupIngCompactor当中来
    //表示我们的分组器在分组的时候,对OrderBean这一种类型的数据进行分组 
    //传入作为key的bean的class类型,以及制定需要让框架做反射获取实例对象 
    public CustomGroupingComparator() { 
        super(OrderBean.class, true); 
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) { 
        OrderBean first = (OrderBean) a; 
        OrderBean second = (OrderBean) b; 
        final int i = first.getOrderId().compareTo(second.getOrderId()); 
        if (i == 0) { 
            System.out.println(first.getOrderId() + "----" + second.getOrderId());
        }
        return i;
    } 
}
  • Mapper ```java package com.yue.mr.group;

import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class GroupMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //读取一行文本,然后切分 final String[] arr = value.toString().split(“\t”); final OrderBean bean = new OrderBean(arr[0], Double.parseDouble(arr[2])); context.write(bean, NullWritable.get()); } }


- Reducer
```java
package com.yue.mr.group;

import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.mapreduce.Reducer; 

import java.io.IOException;

public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> { 
    @Override 
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) 
        throws IOException, InterruptedException { 
        System.out.println(); context.write(key, NullWritable.get());
    } 
}
  • Driver ```java package com.yue.mr.group;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException;

public class GroupDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1 获取配置信息,或者job对象实例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 指定本程序的jar包所在的本地路径 job.setJarByClass(GroupDriver.class); // 3 指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(GroupMapper.class); job.setReducerClass(GroupReducer.class); // 4 指定mapper输出数据的kv类型 job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); // 5 指定最终输出的数据的kv类型 job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); // 6 指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(“E:\teach\hadoop框架 \资料\data\GroupingComparator”)); FileOutputFormat.setOutputPath(job, new Path(“E:\group-out”)); // 7 指定分区器,指定分组比较器,设置reducetask数量 job.setPartitionerClass(CustomPartitioner.class); job.setGroupingComparatorClass(CustomGroupingComparator.class); job.setNumReduceTasks(2); // 8 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }


<a name="7EMkE"></a>
### 7. MapReduce读取和输出数据
<a name="YNT1f"></a>
#### 1 InputFormat
运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那<br />么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?

InputFormat是MapReduce框架用来读取数据的类。

InputFormat常见子类包括: 

- TextInputFormat (普通文本文件,MR框架默认的读取实现类型)
- KeyValueTextInputFormat(读取一行文本数据按照指定分隔符,把数据封装为kv类型)
- NLineInputF ormat(读取数据按照行数进行划分分片)
- CombineTextInputFormat(合并小文件,避免启动过多MapTask任务)
- 自定义InputFormat
1.  CombineTextInputFormat案例

MR框架默认的**TextInputFormat**切片机制按文件划分切片,文件无论多小,都是单独一个切片,<br />然后由一个MapTask处理,如果有大量小文件,就对应的会生成并启动大量的 MapTask,而每个<br />MapTask处理的数据量很小大量时间浪费在初始化资源启动收回等阶段,这种方式导致资源利用<br />率不高。<br />**CombineTextInputForma**t用于小文件过多的场景,它可以将**多个小文件从逻辑上划分成一个切**<br />**片**,这样多个小文件就可以交给一个MapTask处理,提高资源利用率。<br />需求<br />将输入数据中的多个小文件合并为一个切片处理

   - 运行WordCount案例,准备**多个小文件**

具体使用方式
```java
// 如果不设置InputFormat,它默认用的是TextInputFormat.class 
job.setInputFormatClass(CombineTextInputFormat.class); 
//虚拟存储切片最大值设置4m 
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

验证切片数量的变化!!

  • CombineTextInputFormat切片原理

切片生成过程分为两部分:虚拟存储过程和切片过程
假设设置setMaxInputSplitSize值为4M
四个小文件:1.txt —>2M ;2.txt—>7M;3.txt—>0.3M;4.txt—->8.2M

  • 虚拟存储过程:把输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值进行比

较,如果不大于设置的最大值,逻辑上划分一个。如果输入文件大于设置的最大值且大于
两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时
将文件均分成2个虚拟存储块(防止出现太小切片)。
比如如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分出一个4M的
块。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的非常小的虚拟存储文
件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
1.txt—>2M;2M<4M;一个块;
2.txt—>7M;7M>4M,但是不大于两倍,均匀分成两块;两块:每块3.5M;
3.txt—>0.3M;0.3<4M ,0.3M<4M ,一个块
4.txt—>8.2M;大于最大值且大于两倍;一个4M的块,剩余4.2M分成两块,每块2.1M
所有块信息:
2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M 共7个虚拟存储块。

  • 切片过程
    • 判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个

切片。

  - 如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
  - 按照之前输入文件:有4个小文件大小分别为2M、7M、0.3M以及8.2M这四个小文件,

则虚拟存储之后形成7个文件块,大小分别为:
2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M
最终会形成3个切片,大小分别为:
(2+3.5)M,(3.5+0.3+4)M,(2.1+2.1)M

注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。

2 自定义InputFormat

HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,
此时,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并。

需求
将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的
key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为
key,文件内容为value。

结果
得到一个合并了多个小文件的SequenceFile文件

整体思路

  1. 定义一个类继承FileInputFormat
  2. 重写isSplitable()指定为不可切分;重写createRecordReader()方法,创建自己的

RecorderReader对象

  1. 改变默认读取数据方式,实现一次读取一个完整文件作为kv输出;
  2. Driver指定使用的InputFormat类型

image.png

代码参考

自定义InputFormat

package com.yue.mr.inputformat; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 

import java.io.IOException; 

public class CustomFileInputformat extends FileInputFormat<Text, BytesWritable> { 
    //文件不可切分 
    @Override
    protected boolean isSplitable(JobContext context, Path filename) { 
        return false; 
    }
    //获取自定义RecordReader对象用来读取数据 
    @Override 
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, 
                                                                TaskAttemptContext context) 
        throws IOException, InterruptedException { 
        CustomRecordReader recordReader = new CustomRecordReader(); 
        recordReader.initialize(split, context); 
        return recordReader;
    }
}

自定义RecordReader

package com.yue.mr.inputformat; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.io.IOUtils; 
import org.apache.hadoop.io.Text; 

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException; 

public class CustomRecordReader extends RecordReader<Text, BytesWritable> {
    private Configuration configuration; 
    //切片 
    private FileSplit split;
    //是否读取到内容的标识符
    private boolean isProgress = true; 
    //输出的kv
    private BytesWritable value = new BytesWritable();
    private Text k = new Text(); 
    @Override 
    public void initialize(InputSplit split, TaskAttemptContext context) 
        throws IOException, InterruptedException {
        //获取到文件切片以及配置文件对象 
        this.split = (FileSplit) split; 
        configuration = context.getConfiguration(); 
    }

    @Override public boolean nextKeyValue() throws IOException, InterruptedException { 
        if (isProgress) { 
            // 1 定义缓存区
            byte[] contents = new byte[(int) split.getLength()];
            FileSystem fs = null; 
            FSDataInputStream fis = null;
            try {
                // 2 获取文件系统 
                Path path = split.getPath(); 
                fs = path.getFileSystem(configuration);
                // 3 读取数据 
                fis = fs.open(path); 
                // 4 读取文件内容
                IOUtils.readFully(fis, contents, 0, contents.length);
                // 5 输出文件内容 
                value.set(contents, 0, contents.length); 
                // 6 获取文件路径及名称
                String name = split.getPath().toString(); 
                // 7 设置输出的key值 
                k.set(name); 
            } catch (Exception e) {
            } finally { 
                IOUtils.closeStream(fis);
            }
            isProgress = false;
            return true;
        }
        return false;
    }

    @Override 
    public Text getCurrentKey() throws IOException, InterruptedException { 
        //返回key 
        return k; 
    }

    @Override 
    public BytesWritable getCurrentValue() throws IOException, InterruptedException { 
        //返回value return value;
    }

    @Override 
    public float getProgress() throws IOException, InterruptedException { 
        return 0;
    }

    @Override public void close() throws IOException {
    } 
}

Mapper

package com.yue.mr.inputformat; 

import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper; 

import java.io.IOException;

public class SequenceFileMapper extends Mapper<Text, BytesWritable,Text,BytesWritable> {
    @Override
    protected void map(Text key, BytesWritable value, Context context) 
        throws IOException, InterruptedException { 
        //读取内容直接输出 
        context.write(key, value);
    }
}

Reducer

package com.yue.mr.inputformat; 

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 

import java.io.IOException;

public class SequenceFileReducer extends Reducer<Text, BytesWritable,Text,BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) 
        throws IOException, InterruptedException { 
        //输出value值,其中只有一个BytesWritable 所以直接next取出即可 
        context.write(key, values.iterator().next());
    } 
}

Driver

package com.yue.mr.inputformat; 

import com.yue.mr.partition.*; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 

import java.io.IOException; 

public class SequenceFileDriver { 
    public static void main(String[] args)
        throws IOException, ClassNotFoundException, InterruptedException {
        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf);

        job.setJarByClass(SequenceFileDriver.class); 

        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class); 

        job.setMapOutputKeyClass(Text.class); 
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(BytesWritable.class);

        job.setInputFormatClass(CustomFileInputformat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1])); 

        final boolean flag = job.waitForCompletion(true); 
        System.exit(flag ? 0 : 1);
    } 
}

3 OutputFormat

OutputFormat:是MapReduce输出数据的基类,所有MapReduce的数据输出都实现了OutputFormat
抽象类。下面我们介绍几种常见的OutputFormat子类

  • TextOutputFormat

默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,
因为TextOutputFormat调用toString()方 法把它们转换为字符串。

  • SequenceFileOutputFormat

将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这是一种好的输出格式,
因为它的格式紧凑,很容易被压缩。

自定义**OutputFormat

需求分析
要在一个MapReduce程序中根据数据的不同输出两类结果到不同目录,这类输出需求可以通过自定义
OutputFormat来实现。

实现步骤
1. 自定义一个类继承FileOutputFormat。
2. 改写RecordWriter,改写输出数据的方法write()。

需求
网络请求日志数据

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.lagou.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com

输出结果
lagou.log

http://www.lagou.com

other.log

http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com

参考代码
Mapper

package com.yue.mr.outputformat; 

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OutputMapper extends Mapper<LongWritable, Text,Text, NullWritable> { 
    @Override
    protected void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException { 
        context.write(value, NullWritable.get());
    } 
}

Reducer

package com.lagou.mr.outputformat; 

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
public class OutputReducer extends Reducer<Text, NullWritable,Text,NullWritable> { 
    @Override 
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) 
        throws IOException, InterruptedException { 
        context.write(key,NullWritable.get() );
    }
}

OutputFormat

package com.yue.mr.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CustomOutputFormat extends FileOutputFormat<Text, NullWritable> { 
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context)
        throws IOException, InterruptedException {
        //获取文件系统对象 
        final FileSystem fs = FileSystem.get(context.getConfiguration());
        //指定输出数据的文件 
        final Path lagouPath = new Path("e:/lagou.log");
        final Path otherLog = new Path("e:/other.log");
        //获取输出流
        final FSDataOutputStream lagouOut = fs.create(lagouPath);
        final FSDataOutputStream otherOut = fs.create(otherLog);
        return new CustomWriter(lagouOut, otherOut);
    }
}

RecordWriter

package com.yue.mr.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class CustomWriter extends RecordWriter<Text, NullWritable> {
    private FSDataOutputStream lagouOut;
    private FSDataOutputStream otherOut; 
    public CustomWriter(FSDataOutputStream lagouOut,FSDataOutputStream otherOut) {
        this.lagouOut=lagouOut; this.otherOut=otherOut; 
    }
    @Override public void write(Text key, NullWritable value)
        throws IOException, InterruptedException { 
        // 判断是否包含“lagou”输出到不同文件 
        if (key.toString().contains("lagou")) { 
            lagouOut.write(key.toString().getBytes());
            lagouOut.write("\r\n".getBytes());
        } else {
            otherOut.write(key.toString().getBytes()); 
            otherOut.write("\r\n".getBytes()); } 
    }

    @Override 
    public void close(TaskAttemptContext context) 
        throws IOException, InterruptedException { 
        IOUtils.closeStream(lagouOut); 
        IOUtils.closeStream(otherOut); 
    } 
}

Driver

package com.yue.mr.outputformat; 

import com.lagou.mr.partition.*; 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OutputDriver { 
    public static void main(String[] args) 
        throws IOException, ClassNotFoundException, InterruptedException { 
        final Configuration conf = new Configuration(); 
        final Job job = Job.getInstance(conf); 

        job.setJarByClass(OutputDriver.class); 

        job.setMapperClass(OutputMapper.class); 
        job.setReducerClass(OutputReducer.class); 

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(CustomOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("e:/click_log.data"));
        // outputformat继承自fileoutputformat 而fileoutputformat要输出一个_SUCCESS 文件,所以,在这还得指定一个输出目录 
        FileOutputFormat.setOutputPath(job, new Path("e:/click_log/output"));
        final boolean flag = job.waitForCompletion(true); 
        System.exit(flag ? 0 : 1); 
    }
}

验证结果是否已把数据分别输出到不同的目录中!!

8. shuffle阶段数据的压缩机制

1 hadoop当中支持的压缩算法

数据压缩有两大好处,节约磁盘空间,加速数据在网络和磁盘上的传输!!

我们可以使用bin/hadoop checknative 来查看我们编译之后的hadoop支持的各种压缩,如果出现
openssl为false,那么就在线安装一下依赖包!!
image.png
安装openssl

yum install -y openssl-devel
压缩格式 hadoop自带 算法 文件扩展名 是否可切分 换成压缩格式后,原来的程序
是否需要修改
DEFLATE 是,直接使用 DEFLATE .deflate 和文本处理一样,不需要修改
Gzip 是,直接使用 DEFLATE .gz 和文本处理一样,不需要修改
bzip2 是,直接使用 bzip2 .bz2 和文本处理一样,不需要修改
LZO 否,需要安装 LZO .lzo 需要建索引,还需要指定输入格式
Snappy 否,需要安装 Snappy .snappy 和文本处理一样,不需要修改

image.png

为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器

压缩格式 对应的编码/解码器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec

常见压缩方式对比分析

压缩算法 原始文件大小 压缩后的文件大小 压缩速度 解压缩速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO-bset 8.3GB 2GB 4MB/s 60.6MB/s
LZO 8.3GB 2.9GB 49.3MB/S 74.6MB/s

2 压缩位置

  • Map输入端压缩

此处使用压缩文件作为Map的输入数据,无需显示指定编解码方式,Hadoop会自动检查文件扩展
名,如果压缩方式能够匹配,Hadoop就会选择合适的编解码方式对文件进行压缩和解压。

  • Map输出端压缩

Shuffle是Hadoop MR过程中资源消耗最多的阶段,如果有数据量过大造成网络传输速度缓慢,可
以考虑使用压缩

  • Reduce端输出压缩

输出的结果数据使用压缩能够减少存储的数据量,降低所需磁盘的空间,并且作为第二个MR的输
入时可以复用压缩。

3 压缩配置方式

  1. 在驱动代码中通过Configuration直接设置使用的压缩方式,可以开启Map输出和Reduce输出压缩

    设置map阶段压缩
    Configuration configuration = new Configuration(); 
    configuration.set("mapreduce.map.output.compress","true");
    configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec"); 
    设置reduce阶段的压缩
    configuration.set("mapreduce.output.fileoutputformat.compress","true"); 
    configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD" );
    configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.ap ache.hadoop.io.compress.SnappyCodec");
    
  2. 配置mapred-site.xml(修改后分发到集群其它节点,重启Hadoop集群),此种方式对运行在集群的

所有MR任务都会执行压缩。

<property>
  <name>mapreduce.output.fileoutputformat.compress</name> 
  <value>true</value> 
</property>
<property>
  <name>mapreduce.output.fileoutputformat.compress.type</name> 
  <value>RECORD</value>
</property>
<property> 
  <name>mapreduce.output.fileoutputformat.compress.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>

4 压缩案例

需求
使用snappy压缩方式压缩WordCount案例的输出结果数据
具体实现
在驱动代码中添加压缩配置

configuration.set("mapreduce.output.fileoutputformat.compress","true");
configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD");
configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

重新打成jar包,提交集群运行,验证输出结果是否已进行了snappy压缩!!
image.png