Hadoop Serialization

Serialization converts in-memory objects into a byte sequence (or another wire format) so that they can be persisted to disk or transferred over the network.

Deserialization is the reverse: it turns a received byte sequence (or other wire format), or data persisted on disk, back into in-memory objects.

Java's built-in serialization is a heavyweight framework: it stores a lot of extra information (checksums, headers, the inheritance hierarchy, and so on), which makes it a poor fit for network transfer. Hadoop therefore ships its own serialization mechanism.
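As a rough illustration of the size difference, the sketch below (hypothetical demo code, assuming the Hadoop client libraries are on the classpath) serializes the same long value with Java's ObjectOutputStream and with Hadoop's LongWritable and prints the resulting byte counts.

```java
import org.apache.hadoop.io.LongWritable;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class SerializationSizeDemo {
    public static void main(String[] args) throws IOException {
        // Java built-in serialization: stream header, class descriptor, then the value
        ByteArrayOutputStream javaBytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(javaBytes)) {
            oos.writeObject(Long.valueOf(123456789L));
        }

        // Hadoop Writable: just the raw 8-byte long
        ByteArrayOutputStream writableBytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(writableBytes)) {
            new LongWritable(123456789L).write(dos);
        }

        System.out.println("Java serialization: " + javaBytes.size() + " bytes");
        System.out.println("Hadoop Writable:    " + writableBytes.size() + " bytes"); // 8 bytes
    }
}
```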

Characteristics of Hadoop serialization:

  • Compact: uses little storage space
  • Fast: transfers quickly
  • Interoperable: supports interaction across multiple languages

In practice, the serialization types Hadoop ships with (Text, IntWritable, and so on) are often not enough, and you need to define your own serializable JavaBeans.

To define a class whose objects can be serialized:

  1. It must implement the Writable interface.
  2. Deserialization instantiates the class reflectively via its no-arg constructor, so a no-arg constructor is required.
  3. Implement the serialization method write().
  4. Implement the deserialization method readFields().
  5. The deserialization order must exactly match the serialization order (first written, first read).
  6. If you want the result to appear in an output file, also override toString().
  7. If objects of the class will be transmitted as keys, the class must also implement Comparable (Hadoop provides WritableComparable, which combines both interfaces), because the Shuffle phase of MapReduce requires keys to be sortable.

Example:

```java
package com.study.mapreduce.writable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.MessageFormat;

public class MyWritable implements Writable, Comparable<MyWritable> {

    private Integer id;
    private Long scale;
    private Integer age;

    /**
     * A no-arg constructor is required.
     */
    public MyWritable() {
    }

    /**
     * Serialization
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeLong(scale);
        out.writeInt(age);
    }

    /**
     * Deserialization
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        // The read order must match the order written in write(): first written, first read
        id = in.readInt();
        scale = in.readLong();
        age = in.readInt();
    }

    /**
     * To be transmitted as a key in MapReduce, the class must implement Comparable,
     * because the Shuffle phase requires keys to be sortable.
     * @param o
     * @return
     */
    @Override
    public int compareTo(MyWritable o) {
        // Descending by id; equal ids return 0 to honor the compareTo contract
        return o.getId().compareTo(this.id);
    }

    /**
     * Overriding toString() makes the output easier to read.
     * @return
     */
    @Override
    public String toString() {
        MessageFormat mf = new MessageFormat("MyWritable:'{'id:{0}, scale:{1}, age:{2}'}'");
        return mf.format(new Object[]{id, scale, age});
    }

    // getters/setters
    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public Long getScale() {
        return scale;
    }

    public void setScale(Long scale) {
        this.scale = scale;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}
```
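A minimal round-trip sketch (hypothetical test code, not part of the original class) that writes a MyWritable to a byte buffer and reads it back, showing that readFields() must consume the fields in exactly the order write() produced them:

```java
package com.study.mapreduce.writable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MyWritableRoundTrip {
    public static void main(String[] args) throws IOException {
        MyWritable original = new MyWritable();
        original.setId(1);
        original.setScale(100L);
        original.setAge(30);

        // Serialize: id, scale, age
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize in the same order
        MyWritable copy = new MyWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy); // MyWritable:{id:1, scale:100, age:30}
    }
}
```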

Serialization Case Study

Requirement: for each phone number, compute the total upstream traffic, total downstream traffic, and total traffic.
Input data format:

```
1 13892386621 127.0.0.1 200 500 200
2 11234426621 127.0.0.1 231 322 200
3 11234426621 127.0.0.1 300 400 200
4 13892386621 127.0.0.1 300 600 200
```

Columns: id, phone number, network IP, upstream traffic, downstream traffic, network status code.

Expected output format:

```
13892386621 500 1100 1600
```

Columns: phone number, total upstream traffic, total downstream traffic, total traffic. (For 13892386621 in the sample above: upstream 200 + 300 = 500, downstream 500 + 600 = 1100, total 1600.)
1. FlowBean (the serializable bean)

```java
package com.hadoop.demo.flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable, Comparable<FlowBean> {

    // upstream traffic
    private long upFlow;
    // downstream traffic
    private long downFlow;
    // total traffic
    private long sumFlow;

    // A no-arg constructor is required for deserialization
    public FlowBean() {
    }

    // FlowBean is only used as a value in this job, so the comparison is never exercised
    @Override
    public int compareTo(FlowBean o) {
        return 0;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Read in the same order the fields were written
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    // No-arg setter: sumFlow is derived from upFlow + downFlow
    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public String toString() {
        return upFlow + " " + downFlow + " " + sumFlow;
    }
}
```

2. Mapper

```java
package com.hadoop.demo.flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private Text outK = new Text();
    private FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        // Read one line of input
        String line = value.toString();
        String[] split = line.split(" ");

        // Pick out the fields we need
        // phone number
        String phone = split[1];
        // upstream traffic
        String up = split[split.length - 3];
        // downstream traffic
        String down = split[split.length - 2];

        // Populate the output key and value
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();

        // Emit
        context.write(outK, outV);
    }
}
```
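To make the index arithmetic concrete, here is a small hand-worked sketch (standalone demo code, not part of the job) showing how one sample line is split and which tokens the Mapper picks:

```java
public class SplitIndexDemo {
    public static void main(String[] args) {
        String line = "1 13892386621 127.0.0.1 200 500 200";
        String[] split = line.split(" ");          // 6 tokens

        String phone = split[1];                   // "13892386621"
        String up    = split[split.length - 3];    // "200"  (upstream traffic)
        String down  = split[split.length - 2];    // "500"  (downstream traffic)

        System.out.println(phone + " " + up + " " + down);
    }
}
```

Indexing from the end of the array picks the third-from-last and second-from-last tokens regardless of how many columns precede them, which is presumably why the Mapper counts backwards rather than using fixed indices.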
3. Reducer

```java
package com.hadoop.demo.flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean flowBean = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        // Accumulate over all values for this phone number
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }

        // Populate the output value
        flowBean.setUpFlow(totalUp);
        flowBean.setDownFlow(totalDown);
        flowBean.setSumFlow();

        // Emit
        context.write(key, flowBean);
    }
}
```

4. Driver

```java
package com.hadoop.demo.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get the job
        Configuration config = new Configuration();
        Job job = Job.getInstance(config);

        // 2. Set the jar path: the jar can be located by reflection from the driver class itself
        job.setJarByClass(FlowDriver.class);

        // 3. Wire up the Mapper and Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. Set the final output key/value types (the final output is not necessarily the Reducer output)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\Hadoop\\input2"));   // local path; multiple inputs are allowed
        FileOutputFormat.setOutputPath(job, new Path("D:\\Hadoop\\output2")); // local path; must not already exist, or the job fails with a "directory already exists" error
        // FileInputFormat.setInputPaths(job, new Path(args[0]));
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // job.submit() would be enough to submit the job,
        // but for debugging, waitForCompletion(true) prints progress information;
        // internally it also calls job.submit()
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1); // exit with the job status
    }
}
```
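Hand-computing the aggregation for the four sample rows (an expectation worked out by hand, not output captured from an actual run), the result file should contain something like the following, with the default tab between key and value:

```
11234426621	531 722 1253
13892386621	500 1100 1600
```

For 11234426621: upstream 231 + 300 = 531 and downstream 322 + 400 = 722; for 13892386621: upstream 200 + 300 = 500 and downstream 500 + 600 = 1100, matching the expected output shown earlier.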