一、Hadoop 序列化

为什么进行序列化?

  • 序列化 主要是我们通过 网络通信传输数据 时或者 把对象持久化到文件,需要把对象序列化成 二进制的结构
  • 观察源码时发现自定义 Mapper类 与自定义 Reducer类 都有 泛型类型约束,比如自定义 Mapper 有四个形参类型,但是形参类型并不是常见的 java 基本类型

    • 那是 Hadoop 自己的序列化格式 Writable,而不是 java 的 Serializable

      Hadoop 序列化格式

  • 序列化在 分布式程序 中非常重要,在 Hadoop 中,集群中多个节点的进程间的通信是通过 RPCRemote Procedure Call,远程过程调用)实现;RPC将消息序列化成二进制流发送到远程节点,远程节点再将接收到的二进制数据反序列化为原始的消息,因此 RPC 往往追求如下特点(也是与 java序列号的区别)

    • 紧凑:数据更紧凑,能充分利用网络带宽资源
    • 快速:序列化和反序列化的性能开销更低
  • 一个对象使用 Serializable 序列化后,会携带很多额外信息比如校验信息、Header、继承体系等。Hadoop 使用的是自己的序列化格式 Writable,它比 java 的序列化 Serialization 更紧凑、速度更快

    Java基本类型与Hadoop序列化类型

    image.png

    二、Writable 序列化接口

  • 基本序列化类型 往往不能满足所有需求,比如在 Hadoop 框架内部传递一个自定义 bean 对象,那么该对象就需要实现 Writable 序列化接口

    • Hadoop 中提供了 基本数据类型的 序列化,但是自定义类的序列化需要用户自行实现

      Writable 序列化实现步骤

  1. 必须实现 Writable 接口

  2. 反序列化时,需要反射调用空参构造函数,所以 必须有空参构造

    1. public CustomBean() {
    2. super();
    3. }
  3. 重写序列化方法

    1. @Override
    2. public void write(DataOutput dataOutput) throws IOException {
    3. ....
    4. }
  4. 重写反序列化方法

    1. @Override
    2. public void readFields(DataInput dataInput) throws IOException {
    3. ....
    4. }
  5. 反序列化的字段顺序和序列化字段的 顺序必须完全一致

  6. 为了方便展示结果数据,需要重写 bean对象的 toString() 方法,可以自定义分隔符

  7. 如果自定义 Bean 对象需要放在 Mapper 输出 KV 中的 K,则该对象还需实现 Comparable 接口,因为 MapReduce 框架中的 Shuffle 过程要求对 key 必须能排序!!

    • 排序内容后续有专门案例讲解!!
      1. @Override
      2. public int compareTo(CustomBean o) {
      3. // 自定义排序规则
      4. return this.num > o.getNum() ? -1 : 1;
      5. }

      Writable 接口案例

  • 需求
    • 统计每台智能音箱设备内容播放时长
    • 原始日志格式如下
      • image.png
    • 要求结果输出格式
      • image.png

        编写 MapReduce 程序

        ① 创建 SpeakBean 对象

        ```java package com.lagou.speak;

import org.apache.hadoop.io.Writable;

import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;

// 1 实现 writable 接⼝ public class SpeakBean implements Writable {

  1. private Long selfDuration;
  2. private Long thirdPartDuration;
  3. private String deviceId;
  4. private Long sumDuration;
  5. // 2 反序列化时,需要反射调用空参构造函数,所以必须有
  6. public SpeakBean() {
  7. }
  8. public SpeakBean(Long selfDuration, Long thirdPartDuration, String deviceId) {
  9. this.selfDuration = selfDuration;
  10. this.thirdPartDuration = thirdPartDuration;
  11. this.deviceId = deviceId;
  12. this.sumDuration = selfDuration + thirdPartDuration;
  13. }
  14. //3 序列化方法
  15. @Override
  16. public void write(DataOutput dataOutput) throws IOException {
  17. dataOutput.writeLong(selfDuration);
  18. dataOutput.writeLong(thirdPartDuration);
  19. dataOutput.writeUTF(deviceId);
  20. dataOutput.writeLong(sumDuration);
  21. }
  22. //4 反序列化方法
  23. // 反序列化方法读顺序 必须和 序列化方法的写顺序 一致
  24. @Override
  25. public void readFields(DataInput dataInput) throws IOException {
  26. this.selfDuration = dataInput.readLong();
  27. this.thirdPartDuration = dataInput.readLong();
  28. this.deviceId = dataInput.readUTF();
  29. this.sumDuration = dataInput.readLong();
  30. }
  31. public Long getSelfDuration() {
  32. return selfDuration;
  33. }
  34. public void setSelfDuration(Long selfDuration) {
  35. this.selfDuration = selfDuration;
  36. }
  37. public Long getThirdPartDuration() {
  38. return thirdPartDuration;
  39. }
  40. public void setThirdPartDuration(Long thirdPartDuration) {
  41. this.thirdPartDuration = thirdPartDuration;
  42. }
  43. public String getDeviceId() {
  44. return deviceId;
  45. }
  46. public void setDeviceId(String deviceId) {
  47. this.deviceId = deviceId;
  48. }
  49. public Long getSumDuration() {
  50. return sumDuration;
  51. }
  52. public void setSumDuration(Long sumDuration) {
  53. this.sumDuration = sumDuration;
  54. }
  55. //6 编写 toString 方法,方便后续打印到文本
  56. @Override
  57. public String toString() {
  58. return selfDuration +
  59. "\t" + thirdPartDuration +
  60. "\t" + sumDuration;
  61. }

}

<a name="ZQNBm"></a>
#### ② 编写 Mapper 类
```java
package com.lagou.speak;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SpeakMapper extends Mapper<LongWritable, Text, Text, SpeakBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 获取一行
        String line = value.toString();

        // 2 切割字段
        String[] strings = line.split("\t");

        // 3 封装对象
        // 取出设备id
        String deviceId = strings[1];
        //取出自有时长数据和第三方时长数据
        String selfDuration = strings[strings.length - 3];
        String thirdPartDuration = strings[strings.length - 2];

        //4 封装对象
        SpeakBean bean = new SpeakBean(Long.parseLong(selfDuration), Long.parseLong(thirdPartDuration), deviceId);

        // 5 写出
        context.write(new Text(deviceId), bean);
    }
}

③ 编写 Reducer 类

package com.lagou.speak;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SpeakReducer extends Reducer<Text, SpeakBean, Text, SpeakBean> {


    @Override
    protected void reduce(Text key, Iterable<SpeakBean> values, Context context) throws IOException, InterruptedException {

        Long sumSelfDuration = 0L;
        Long sumThirdPartDuration = 0L;

        // 1 遍历所用 bean,将其中的自有,第三方时长分别累加
        for (SpeakBean bean : values) {
            Long selfDuration = bean.getSelfDuration();
            Long thirdPartDuration = bean.getThirdPartDuration();
            sumSelfDuration += selfDuration;
            sumThirdPartDuration += thirdPartDuration;
        }

        // 2 封装对象
        SpeakBean bean = new SpeakBean(sumSelfDuration, sumThirdPartDuration, key.toString());

        // 3 写出
        context.write(key, bean);
    }
}

④ 编写驱动类 Driver

package com.lagou.speak;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SpeakDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[] { "d:/speak/input", "e:/speak/output" };

        // 1 获取配置信息,或者job对象实例
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "SpeakDriver");

        // 2 指定本程序的jar包所在的本地路径
        job.setJarByClass(SpeakDriver.class);

        // 3 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(SpeakMapper.class);
        job.setReducerClass(SpeakReducer.class);

        // 4 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(SpeakBean.class);

        // 5 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(SpeakBean.class);

        // 6 指定job的输⼊原始文件所在目录,以及输出结果的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包,提交给 yarn去运行
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}

MR 编程技巧总结

  • 结合业务设计 Map 输出的 key 和 value,利用 key相同则去往同一个reduce 的特点!!
  • map() 方法中获取到只是一行文本数据,尽量不做聚合运算
  • reduce() 方法的参数要清楚含义