6. MapReduce 中的排序

  • 排序是 MapReduce 框架中最重要的操作之一
  • MapTask 和 ReduceTask 均会对数据按照 key 进行排序。该操作属于 Hadoop 的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序

    • MapTask
      • 它会将处理理的结果暂时放到 环形缓冲区 中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上
      • 溢写完毕后,它会对磁盘上所有文件进行**归并排序**
    • ReduceTask
      • 当所有数据拷贝完毕后,ReduceTask 统一对内存和磁盘上的所有数据进行一次归并排序
      • 1. 部分排序:MapReduce 根据输入记录的键对数据集排序。保证输出的每个文件内部有序
      • 2. 全排序:最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理理大型文件时效率极低,因为一台机器器处理所有文件,完全丧失了 MapReduce 所提供的并行架构的优势
      • 3. 辅助排序:( GroupingComparator 分组 ) 在 Reduce 端对 key 进行分组。应用于:在接收的 key 为 bean 对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序
      • 4. 二次排序:在自定义排序过程中,如果 compareTo 中的判断条件为两个即为二次排序

        WritableComparable

  • Bean 对象如果作为 Map 输出的 key 时,需要实现 WritableComparable 接口并重写 compareTo 方法指定排序规则

    全排序

  • 基于统计的播放时长案例例的输出结果对总时长进行排序

  • 实现全局排序只能设置一个ReduceTask
  • 输入(要将以下的内容按降序排序)
    • image.png
  • 需求分析
    • 如何设计map() 方法输出的 key,value
    • MR 框架中 shuffle 阶段的排序是默认行为,不管你是否需要都会进行排序
    • key:把所有字段封装成为一个 bean 对象,并且指定 bean 对象作为 key 输出,如果作为 key 输出,需要实现排序接口,指定自己的排序规则

具体步骤

  • Mapper
      1. 读取结果文件,按照制表符进行切分
      1. 解析出相应字段封装为 SpeakBean
      1. SpeakBean 实现 WritableComparable 接口重写 compareTo 方法
      1. map() 方法输出 kv:key-->SpeakBeanvalue-->NullWritable.get()
  • Reducer
    • 循环遍历输出

Mapper 代码

  1. package com.lagou.sort;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.NullWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. import java.io.IOException;
  7. public class SortMapper extends Mapper<LongWritable, Text, SpeakBean, NullWritable> {
  8. final SpeakBean bean = new SpeakBean();
  9. @Override
  10. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  11. String[] strings = value.toString().split("\t");
  12. bean.setDeviceId(strings[0]);
  13. bean.setSelfDuration(Long.parseLong(strings[1]));
  14. bean.setThirdPartDuration(Long.parseLong(strings[2]));
  15. bean.setSumDuration(Long.parseLong(strings[4]));
  16. context.write(bean, NullWritable.get());
  17. }
  18. }

Reducer 代码

package com.lagou.sort;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<SpeakBean, NullWritable, SpeakBean, NullWritable> {

    @Override
    protected void reduce(SpeakBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

        for (NullWritable value : values) {
            context.write(key,value);
        }
    }
}

Bean 对象实现 WritableComparable 接口

package com.lagou.sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class SpeakBean implements WritableComparable<SpeakBean> {

    private Long selfDuration;
    private Long thirdPartDuration;
    private String deviceId;
    private Long sumDuration;

    public SpeakBean() {
    }

    public SpeakBean(Long selfDuration, Long thirdPartDuration, String deviceId, Long sumDuration) {
        this.selfDuration = selfDuration;
        this.thirdPartDuration = thirdPartDuration;
        this.deviceId = deviceId;
        this.sumDuration = sumDuration;
    }

    // 指定排序规则,我们希望按照总时长进行排序
    @Override
    public int compareTo(SpeakBean o) {
        // 返回值 0表示 o等于this  1表示o大于this  -1表示o小于this

        if (this.sumDuration > o.sumDuration) {
            return -1;
        } else if (this.sumDuration < o.sumDuration) {
            return 1;
        } else {
            return 0;
        }

    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(selfDuration);
        dataOutput.writeLong(thirdPartDuration);
        dataOutput.writeUTF(deviceId);
        dataOutput.writeLong(sumDuration);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.selfDuration = dataInput.readLong();
        this.thirdPartDuration = dataInput.readLong();
        this.deviceId = dataInput.readUTF();
        this.sumDuration = dataInput.readLong();
    }

    public Long getSelfDuration() {
        return selfDuration;
    }

    public void setSelfDuration(Long selfDuration) {
        this.selfDuration = selfDuration;
    }

    public Long getThirdPartDuration() {
        return thirdPartDuration;
    }

    public void setThirdPartDuration(Long thirdPartDuration) {
        this.thirdPartDuration = thirdPartDuration;
    }

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public Long getSumDuration() {
        return sumDuration;
    }

    public void setSumDuration(Long sumDuration) {
        this.sumDuration = sumDuration;
    }

    @Override
    public String toString() {
        return selfDuration +
                "\t" + thirdPartDuration +
                "\t" + deviceId +
                "\t" + sumDuration;
    }
}

Driver 代码

package com.lagou.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SortDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "SortDriver");

        job.setJarByClass(SortDriver.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        job.setMapOutputKeyClass(SpeakBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(SpeakBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\speak\\output"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\speak\\output\\outputSort"));

        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }

}

总结

  1. 自定义对象作为 Map 的 key 输出时,需要实现 WritableComparable 接口,排序:重写 compareTo() 方法序列化以及反序列化方法
  2. 再次理解 reduce() 方法的参数:reduce() 方法是 map 输出的 kv 中 key 相同的 kv 中的 value 组成一个集合调用一次 reduce() 方法,选择遍历 values 得到所有的 key
  3. 默认reduceTask 数量是1个
  4. 对于全局排序需要保证只有一个 reduceTask

分区排序

  • (默认的分区规则,区内有序)

GroupingComparator

  • GroupingComparator 是 mapreduce 当中 reduce 端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次 reduce 的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义 GroupingComparator 实现不同的 key 作为同一个组,调用一次reduce逻辑

需求

  • image.png
  • 需要求出每一个订单中成交金额最大的一笔交易

实现思路

  • Mapper
    • 读取一行文本数据,切分出每个字段
    • 订单id和金额封装为一个 Bean 对象,Bean 对象的排序规则指定为先按照订单Id排序,订单Id相等再按照金额降序排序
    • map() 方法输出kv:key-->bean对象value-->NullWritable.get()
  • Shuffle
    • 指定分区器器,保证相同订单 id 的数据去往同个分区(自定义分区器器)
    • 指定 GroupingComparator,分组规则指定只要订单Id相等则认为属于同一组
  • Reduce
    • 每个 reduce() 方法写出一组 key 的第一个

代码实现

OrderBean

  • OrderBean定义两个字段,一个字段是orderId,第二个字段是金额(注意金额一定要使用 Double 或者 DoubleWritable 类型,否则没法按照金额顺序排序
  • 排序规则指定为先按照订单Id排序,订单Id相等再按照金额降序排序 ```java package com.lagou.group;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;

public class OrderBean implements WritableComparable {

private String orderId;
private Double price;

public OrderBean() {
}

public OrderBean(String orderId, Double price) {
    this.orderId = orderId;
    this.price = price;
}

// 指定排序规则,先按照订单id比较,再按照金额大小比较(降序)
@Override
public int compareTo(OrderBean o) {
    int orderCompare = this.orderId.compareTo(o.getOrderId()); // 0 1 -1
    if (orderCompare == 0) {
        //订单id相同,比较金额,并返回金额比较结果
        return -(this.price.compareTo(o.getPrice()));
    }
    //订单不同,直接返回订单比较结果
    return orderCompare;
}

@Override
public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeUTF(orderId);
    dataOutput.writeDouble(price);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
    this.orderId = dataInput.readUTF();
    this.price = dataInput.readDouble();
}

public String getOrderId() {
    return orderId;
}

public void setOrderId(String orderId) {
    this.orderId = orderId;
}

public Double getPrice() {
    return price;
}

public void setPrice(Double price) {
    this.price = price;
}

@Override
public String toString() {
    return orderId + '\t' + price;
}

}

**自定义分区器 CustomPartitioner**

- 保证ID相同的订单去往同个分区最终去往同一个 Reduce 中
```java
package com.lagou.group;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class CustomPartitioner extends Partitioner<OrderBean, NullWritable> {

    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {

        return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

自定义 GroupingComparator

  • 保证 id 相同的订单进入一个分组中,进入分组的数据已经是按照金额降序排序
  • reduce() 方法取出第一个即是金额最高的交易 ```java package com.lagou.group;

import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator;

public class CustomGroupingComparator extends WritableComparator {

//注册自定义的GroupingComparator接受OrderBean对象
public CustomGroupingComparator() {
    super(OrderBean.class, true);
}

// 重写其中的compare方法,通过该方法来让 mr 接受 orderId相等则两个对象(key)相等 的规则
@Override
public int compare(WritableComparable a, WritableComparable b) {
    // 比较两个对象的 orderId
    OrderBean o1 = (OrderBean) a;
    OrderBean o2 = (OrderBean) b;
    return o1.getOrderId().compareTo(o2.getOrderId()); // 0 1 -1
}

}

**Mapper**
```java
package com.lagou.group;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    OrderBean bean = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] strings = value.toString().split("\t");

        bean.setOrderId(strings[0]);
        bean.setPrice(Double.parseDouble(strings[2]));

        context.write(bean, NullWritable.get());
    }
}

Reducer

package com.lagou.group;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    // key: reduce方法的key是一组相同的key的kv的第一个key作为传入reduce方法的key
    //      因为已经指定了降序的规则,因此第一个key(传入的参数)就是金额最大的交易数据
    // value: 一组相同的key的kv对中 v的集合
    // 对于如何判断key是否相同,自定义对象是需要我们指定一个规则(在GroupingComparator中指定)
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        //直接输出key,即为最大金额的数据
        context.write(key, NullWritable.get());
    }
}

Driver

package com.lagou.group;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class GroupDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "GroupDriver");

        job.setJarByClass(GroupDriver.class);
        job.setMapperClass(GroupMapper.class);
        job.setReducerClass(GroupReducer.class);

        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 指定分区器
        job.setPartitionerClass(CustomPartitioner.class);

        // 指定使用 groupingComparator
        job.setGroupingComparatorClass(CustomGroupingComparator.class);

        // 指定reduceTask 数量
        job.setNumReduceTasks(3);

        FileInputFormat.setInputPaths(job, new Path("D:\\大数据正式班\\阶段一\\资料\\data\\data\\GroupingComparator"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\group\\output"));

        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);

    }
}

思考题

  • 如果要求第一和第二多的金额,怎么求呢?
  • 答:可以遍历 values,得到每个value对应的key
    • 原因:当遍历每个value时,会自动将key更新为该value对应的key,源码如下
    • image.png
    • image.png

7. MapReduce 读取和输出数据

InputFormat

  • 运行 MapReduce 程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce 是如何读取这些数据的呢?
  • InputFormat 是 MapReduce 框架用来读取数据的类
  • InputFormat 常见子类包括

    • TextInputFormat (普通文本文件,MR框架默认的读取实现类型
    • KeyValueTextInputFormat(读取一行文本数据按照指定分隔符,把数据封装为kv类型)
    • NLineInputFormat (读取数据按照行数进行划分片)
    • CombineTextInputFormat (合并小文件,避免启动过多 MapTask 任务)
    • 自定义InputFormat

      CombineTextInputFormat 案例

  • MR框架默认的 TextInputFormat切片机制按文件划分切片,文件无论多小,都是单独一个切片, 然后由一个 MapTask 处理,如果有大量小文件,就对应的会生成并启动大量的MapTask,而每个 MapTask 处理的数据量很小,大量时间浪费在初始化资源启动收回等阶段,这种方式导致资源利用率不高

  • CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上划分成一个切片,这样多个小文件就可以交给一个MapTask处理,提高资源利用率

需求

  • 将输入数据中的多个小文件合并为一个切片处理理
  • 运行 WordCount 案例,准备多个小文件

具体使用方式

// 如果不设置InputFormat,它默认用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);

// 虚拟存储切⽚最⼤值设置 4M
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
  • 注:记得验证切片数量的变化!!

  • CombineTextInputFormat 切片原理

    • 切片生成过程分为两部分:虚拟存储过程切片过程
    • 假设设置 setMaxInputSplitSize 值为 4M
    • 四个小文件:1.txt —>2M ;2.txt—>7M;3.txt—>0.3M;4.txt—->8.2M
    • 虚拟存储过程
      • 把输入目录下所有文件大小,依次和设置的 setMaxInputSplitSize 值进行行比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值的2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。比如 setMaxInputSplitSize 值为 4M,输入文件大小为 8.02 M,则先逻辑上分出一个4M的块。剩余的大小为4.02M,如果按照 4M 逻辑划分,就会出现 0.02M 的非常小的虚拟存储文件,所以将剩余的4.02M 文件切分成(2.01M和2.01M)两个文件。
      • 1.txt—>2M;2M<4M;一个块;
      • 2.txt—>7M;7M>4M,但是不大于两倍,均匀分成两块;两块:每块3.5M;
      • 3.txt—>0.3M;0.3<4M ,0.3M<4M ,一个块
      • 4.txt—>8.2M;大于最大值且大于两倍;一个4M的块,剩余4.2M分成两块,每块2.1M
      • 所有块信息:2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M 共7个虚拟存储块
    • 切片过程
      • 判断虚拟存储的文件大小是否大于 setMaxInputSplitSize 值,大于等于(实际上只能等于,不可能大于)则单独形成一个切片
      • 如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片
      • 按照之前输入文件:有4个小文件大小分别为2M、7M、0.3M以及8.2M 这四个小文件,则虚拟存储之后形成7个文件块,大小分别为: 2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M
      • 最终会形成3个切片,大小分别为: (2+3.5)M,(3.5+0.3+4)M,(2.1+2.1)M
    • 注:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值

自定义 InputFormat

  • 需求分析
    • 无论 HDFS 还是 MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。可以自定义 InputFormat 实现小文件的合并
    • image.png
  • 实现步骤

      1. 自定义一个类继承 FileOutputFormat
      1. 改写 RecordWriter,改写输出数据的方法 write()


        参考代码

        ① Mapper** ```java package com.lagou.mr.output;

import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OutputMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(value, NullWritable.get()); } }

**② Reducer**
```java
package com.lagou.mr.output;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


import java.io.IOException;

public class OutputReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

③ OutputFormat

package com.lagou.mr.output;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class CustomOutputFormat extends FileOutputFormat<Text, NullWritable> {
    //写出数据的对象
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        //定义写出数据的路径信息,并获取到输出流传入writer对象中
        final Configuration conf = context.getConfiguration();
        final FileSystem fs = FileSystem.get(conf);
        //定义输出的路径
        final FSDataOutputStream lagouOut = fs.create(new Path("e:/lagou.log"));
        final FSDataOutputStream otherOut = fs.create(new Path("e:/other.log"));
        CustomWriter customWriter = new CustomWriter(lagouOut, otherOut);
        return customWriter;
    }
}

④ RecordWriter

package com.lagou.mr.output;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;


public class CustomWriter extends RecordWriter<Text, NullWritable> {
    //定义成员变量
    private FSDataOutputStream lagouOut;
    private FSDataOutputStream otherOut;

    //定义构造方法接收两个输出流


    public CustomWriter(FSDataOutputStream lagouOut, FSDataOutputStream otherOut) {
        this.lagouOut = lagouOut;
        this.otherOut = otherOut;
    }

    //写出数据的逻辑,控制写出的路径
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        //写出数据需要输出流
        final String line = key.toString();
        if (line.contains("lagou")) {
            lagouOut.write(line.getBytes());
            lagouOut.write("\r\n".getBytes());
        } else {
            otherOut.write(line.getBytes());
            otherOut.write("\r\n".getBytes());
        }
    }

    //关闭,释放资源
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {

        IOUtils.closeStream(lagouOut);
        IOUtils.closeStream(otherOut);
    }
}

⑤ Driver

package com.lagou.mr.output;

import com.lagou.mr.wc.WordCountDriver;
import com.lagou.mr.wc.WordCountMapper;
import com.lagou.mr.wc.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OutputDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              /*
        1. 获取配置文件对象,获取job对象实例
        2. 指定程序jar的本地路径
        3. 指定Mapper/Reducer类
        4. 指定Mapper输出的kv数据类型
        5. 指定最终输出的kv数据类型
        6. 指定job处理的原始数据路径
        7. 指定job输出结果路径
        8. 提交作业
         */
//        1. 获取配置文件对象,获取job对象实例
        final Configuration conf = new Configuration();

        final Job job = Job.getInstance(conf, "OutputDriver");
//        2. 指定程序jar的本地路径
        job.setJarByClass(OutputDriver.class);
//        3. 指定Mapper/Reducer类
        job.setMapperClass(OutputMapper.class);
        job.setReducerClass(OutputReducer.class);
//        4. 指定Mapper输出的kv数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
//        5. 指定最终输出的kv数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        //指定使用自定义outputformat
        job.setOutputFormatClass(CustomOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("E:\\teach\\hadoop框架\\资料\\data\\click_log")); //指定读取数据的原始路径
//        7. 指定job输出结果路径,因为mr默认要输出一个success等标识文件
        FileOutputFormat.setOutputPath(job, new Path("E:\\clicklog\\out")); //指定结果数据输出路径
//        8. 提交作业
        final boolean flag = job.waitForCompletion(true);
        //jvm退出:正常退出0,非0值则是错误退出
        System.exit(flag ? 0 : 1);
    }
}

8. shuffle 阶段数据的压缩机制

hadoop当中⽀⽀持的压缩算法

  • 数据压缩有两大好处,节约**磁盘**空间加速数据在网络和磁盘上的传输!!
  • 我们可以使用 bin/hadoop checknative 来查看我们编译之后的 hadoop 支持的各种压缩,如果出现 openssl 为 false,那么就在线安装一下依赖包

    • yum install -y openssl-devel

      image.png
      image.png

  • 为了支持多种 压缩/解压缩算法,Hadoop 引入了 编码/解码器

    • image.png
  • 常见压缩方式对比分析

    • image.png

      压缩位置

  • Map 输入端压缩

    • 此处使用压缩文件作为 Map 的输入数据,无需显示指定编解码方式,Hadoop会自动检查文件扩展名,如果压缩方式能够匹配,Hadoop 就会选择合适的编解码方式对文件进行压缩和解压
  • Map 输出端压缩
    • Shuffle 是 Hadoop MR 过程中资源消耗最多的阶段,如果有数据量过大造成网络传输速度缓慢,可以考虑使用压缩
  • Reduce 端输出压缩

    • 输出的结果数据使用压缩能够减少存储的数据量,降低所需磁盘的空间,并且作为第二个MR的输入时可以复用压缩

      压缩配置方式

    1. 在驱动代码中通过 Configuration 直接设置使用的压缩方式,可以开启Map输出和Reduce输出压缩 ```java

      设置map阶段压缩

      Configuration configuration = new Configuration(); configuration.set(“mapreduce.map.output.compress”,”true”); configuration.set(“mapreduce.map.output.compress.codec”,”org.apache.hadoop.io.compress.SnappyCodec”);

设置reduce阶段的压缩

configuration.set(“mapreduce.output.fileoutputformat.compress”,”true”); configuration.set(“mapreduce.output.fileoutputformat.compress.type”,”RECORD”); configuration.set(“mapreduce.output.fileoutputformat.compress.codec”,”org.apache.hadoop.io.compress.SnappyCodec”);


- 2. 配置 **`mapred-site.xml`** (**修改后分发到集群其它节点,重启Hadoop集群**),此种方式对运行在集群的 **所有MR任务 **都会执行压缩
```xml
<property>
        <name>mapreduce.output.fileoutputformat.compress</name>
        <value>true</value>
</property>
<property>
        <name>mapreduce.output.fileoutputformat.compress.type</name>
        <value>RECORD</value>
</property>
<property>
        <name>mapreduce.output.fileoutputformat.compress.codec</name>
        <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>

压缩案例

  • 需求
    • 使用 snappy 压缩方式压缩 WordCount 案例的输出结果数据
  • 具体实现
    • 在驱动代码中添加压缩配置
      configuration.set("mapreduce.output.fileoutputformat.compress","true");
      configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD");
      configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
      
      image.png