Serialization converts an in-memory object into a byte sequence so that it can be stored (persisted) or transmitted over the network.
Deserialization converts a received byte sequence, or persisted data on disk, back into an in-memory object.

1.1 Serialization and Deserialization in Java

package com.BigData.test;

import java.io.*;

class Student implements Serializable { // a plain Java class that implements the Serializable interface
    private String name;
    private int age;

    public Student() { // no-argument constructor
    }

    public Student(String name, int age) { // constructor with arguments
        this.name = name;
        this.age = age;
    }

    public String getName() { // getters and setters
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override // toString method
    public String toString() {
        return "Student{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

public class Test {
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Student stu = new Student("张三", 1001); // build the object
        // build the object output stream
        ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("data.txt"));
        oos.writeObject(stu); // serialize the object
        oos.close();
        System.out.println("Object serialized and written");
        System.out.println("################################");
        // build the object input stream
        ObjectInputStream ois = new ObjectInputStream(new FileInputStream("data.txt"));
        // read the object back and deserialize it
        Student stu1 = (Student) ois.readObject();
        ois.close();
        System.out.println("Object deserialized and read");
        System.out.println(stu1);
    }
}
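Running this example prints the two status messages and then Student{name='张三', age=1001}. Note that despite the .txt extension, data.txt contains the JDK's binary serialization format rather than readable text.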

1.2 Serialization and Deserialization in Hadoop

The Hadoop framework has its own serialization mechanism (Writable). It is compact, fast, extensible, and interoperable.
The Hadoop serialization types corresponding to the commonly used Java types are listed below.

Java type    Hadoop Writable type
boolean      BooleanWritable
byte         ByteWritable
int          IntWritable
float        FloatWritable
long         LongWritable
double       DoubleWritable
String       Text
map          MapWritable
array        ArrayWritable
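As a quick illustration of the Writable mechanism, the following round-trip serializes an IntWritable and a Text to an in-memory byte array and reads them back. This is a minimal sketch, not part of the original article; the class name WritableRoundTrip and its package are made up, and it only assumes the standard org.apache.hadoop.io types from the table above.

package com.BigData.MapReduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.*;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        // serialize: each Writable writes its own fields to a DataOutput
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        new IntWritable(42).write(out);
        new Text("hello").write(out);
        out.close();

        // deserialize: read the fields back in the same order they were written
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
        IntWritable number = new IntWritable();
        Text word = new Text();
        number.readFields(in);
        word.readFields(in);
        System.out.println(number.get() + " " + word.toString()); // prints: 42 hello
    }
}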

For a custom bean object to be serialized and transmitted, it must implement the serialization interface. The concrete steps are the following seven.
(1) It must implement the Writable interface.
(2) Deserialization uses reflection to call the no-argument constructor, so the bean must have one:

public FlowBean() {
super();
}

(3) Override the serialization method:

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}

(4) Override the deserialization method:

@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}

(5) Note that the deserialization order must be exactly the same as the serialization order.
(6) To make the result readable in the output file, override toString(); the fields can be separated with "\t" so later processing is easier (see the sketch after step (7)).
(7) If the custom bean is to be transmitted as a key, it must also implement the Comparable interface (in Hadoop this is usually done by implementing WritableComparable), because the shuffle phase of the MapReduce framework requires keys to be sortable.

@Override
public int compareTo(FlowBean o) {
    // descending order, from largest to smallest; Long.compare with the
    // arguments swapped also returns 0 for equal values, keeping compareTo
    // consistent with its general contract
    return Long.compare(o.getSumFlow(), this.sumFlow);
}
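For step (6), a minimal toString() sketch that separates the fields with tabs could look like the following. It uses the upFlow/downFlow/sumFlow fields from the snippets above; this exact method is not in the original text.

@Override
public String toString() {
    // tab-separated so the output file can be split easily by later jobs
    return upFlow + "\t" + downFlow + "\t" + sumFlow;
}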

1.3 Case Study

Count the total upstream traffic, downstream traffic, and total traffic consumed by each phone number.
Data: phone_data.txt
(1) Input data format:

7 13560436666 120.196.100.99 1116 954 200
id  phone number  network IP  upstream traffic  downstream traffic  network status code

(2) Expected output data format (total traffic = upstream + downstream traffic, e.g. 1116 + 954 = 2070):

13560436666 1116 954 2070
phone number  upstream traffic  downstream traffic  total traffic

Implementation code:

package com.BigData.MapReduce;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {} // no-argument constructor, required for deserialization

    public FlowBean(long upFlow, long downFlow, long sumFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return "FlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override // the read order in readFields must match the write order in write
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }
}
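To sanity-check that write() and readFields() stay symmetric, the bean can be round-tripped locally without a cluster. The following test class is a minimal sketch, not part of the original project (the name FlowBeanRoundTrip is made up); it uses only the FlowBean above and standard java.io streams.

package com.BigData.MapReduce;

import java.io.*;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(1116, 954, 2070);

        // serialize the bean into an in-memory byte array
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(baos));

        // deserialize into a fresh instance, reading fields in the same order they were written
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));

        System.out.println(copy); // FlowBean{upFlow=1116, downFlow=954, sumFlow=2070}
    }
}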
package com.BigData.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;

public class MapReduceDemo {
    public static class MyMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] data = line.split("\t");
            FlowBean flow = new FlowBean();
            flow.setUpFlow(Long.parseLong(data[data.length - 3]));
            flow.setDownFlow(Long.parseLong(data[data.length - 2]));
            flow.setSumFlow(Long.parseLong(data[data.length - 3]) + Long.parseLong(data[data.length - 2]));
            context.write(new Text(data[1]), flow); // key: phone number, value: the flow bean
        }
    }

    // ======= divider =======
    // Shuffle merges, partitions, groups, and sorts; records with the same key (k2) are pulled by the same reducer.
    // Part two: the Reduce phase (not needed for this map-only job, kept commented out for reference)
    // public static class MyReduce extends Reducer<Text, FlowBean, Text, FlowBean> {
    //     // the reduce function
    //     @Override
    //     protected void reduce(Text k2, Iterable<FlowBean> v2s,
    //                           Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
    //
    //     }
    // }

    public static void main(String[] args) throws Exception {
        // set configuration parameters
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.142.20:9000"), conf);
        if (fs.exists(new Path("/out")))
            fs.delete(new Path("/out"), true);
        // create the job
        conf.set("fs.defaultFS", "hdfs://192.168.142.20:9000");
        Path input = new Path("/data/phone_data.txt");
        Path output = new Path("/out");
        Job job = Job.getInstance(conf, MapReduceDemo.class.getSimpleName());
        // specify the jar file
        job.setJarByClass(MapReduceDemo.class);
        // specify the input path of the data on HDFS
        FileInputFormat.addInputPath(job, input);
        // specify the Mapper class
        job.setMapperClass(MyMapper.class);
        // specify the key and value types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setNumReduceTasks(0); // map-only job: the serialized beans are written straight to the output
        // specify the Reducer class and its output types (unused in this map-only job)
        // job.setReducerClass(MyReduce.class);
        // job.setOutputKeyClass(Text.class);
        // job.setOutputValueClass(FlowBean.class);
        // specify the output path on HDFS
        FileOutputFormat.setOutputPath(job, output);
        // submit the job; passing true prints progress information while it runs
        job.waitForCompletion(true);
    }
}

Run result:
(screenshot of the job output omitted)
To adjust the format of the output records, simply modify FlowBean's toString() method.