Serialization converts an in-memory object into a byte sequence so that it can be stored (persisted) or transmitted over a network.
Deserialization converts a received byte sequence, or persisted data on disk, back into an in-memory object.
1.1 Serialization and Deserialization in Java
package com.BigData.test;

import java.io.*;

class Student implements Serializable { // a plain Java class implementing the Serializable interface
    private String name;
    private int age;

    public Student() { // no-arg constructor
    }

    public Student(String name, int age) { // parameterized constructor
        this.name = name;
        this.age = age;
    }

    public String getName() { // getters and setters
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override // toString method
    public String toString() {
        return "Student{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

public class Test {
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Student stu = new Student("张三", 1001); // construct the object
        // build the object output stream
        ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("data.txt"));
        oos.writeObject(stu); // write the object out
        oos.close(); // close the stream so buffered bytes are flushed to disk before reading back
        System.out.println("object serialized and written");
        System.out.println("################################");
        // build the object input stream
        ObjectInputStream ois = new ObjectInputStream(new FileInputStream("data.txt"));
        // read the object back in
        Student stu1 = (Student) ois.readObject();
        ois.close();
        System.out.println("object deserialized and read");
        System.out.println(stu1);
    }
}
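Running the Test class should print output along these lines (the last line comes from Student's toString()):

object serialized and written
################################
object deserialized and read
Student{name='张三', age=1001}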
1.2 Serialization and Deserialization in Hadoop
The Hadoop framework ships its own serialization mechanism (Writable), designed to be compact, fast, extensible, and interoperable.
The common Java data types map to the following Hadoop serialization types:

| Java type | Hadoop Writable type |
| --- | --- |
| boolean | BooleanWritable |
| byte | ByteWritable |
| int | IntWritable |
| float | FloatWritable |
| long | LongWritable |
| double | DoubleWritable |
| String | Text |
| map | MapWritable |
| array | ArrayWritable |
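As a quick illustration of how these wrapper types are used, here is a minimal sketch (the set/get accessors shown are the standard Writable API; WritableDemo is just an illustrative class name):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        // wrap plain Java values in their Writable counterparts
        IntWritable count = new IntWritable(42);
        Text word = new Text("hello");
        // unwrap them back into plain Java types
        int n = count.get();
        String s = word.toString();
        System.out.println(n + " " + s);
        // unlike java.lang.Integer, Writables are mutable and can be reused
        count.set(43);
    }
}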
To transmit a custom bean object through Hadoop's serialization, it must implement the serialization interface. The seven concrete steps are as follows.
(1) The bean must implement the Writable interface.
(2) Deserialization uses reflection to call the no-arg constructor, so the bean must have one:

public FlowBean() {
    super();
}
(3) Override the serialization method:

@Override
public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow);
    out.writeLong(downFlow);
    out.writeLong(sumFlow);
}
(4) Override the deserialization method:

@Override
public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    downFlow = in.readLong();
    sumFlow = in.readLong();
}
(5) Note that the deserialization read order must exactly match the serialization write order.
(6) To make the result readable in the output file, override toString(); fields can be separated with "\t" for convenient downstream processing.
(7) If the custom bean is to be transmitted as a key, it must also implement the Comparable interface, because the Shuffle phase of the MapReduce framework requires that keys be sortable (see the WritableComparable sketch after this list):
@Override
public int compareTo(FlowBean o) {
    // descending order, from largest to smallest (Long.compare also returns 0 for equal values,
    // satisfying the compareTo contract)
    return Long.compare(o.getSumFlow(), this.sumFlow);
}
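Note that Hadoop also offers org.apache.hadoop.io.WritableComparable, which combines steps (1) and (7) in a single interface. A minimal sketch of a sortable key bean (FlowKey is a hypothetical name used only for illustration):

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowKey implements WritableComparable<FlowKey> {
    private long sumFlow;

    public FlowKey() {} // no-arg constructor required for reflection

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        sumFlow = in.readLong();
    }

    @Override
    public int compareTo(FlowKey o) {
        return Long.compare(o.sumFlow, this.sumFlow); // descending order
    }
}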
1.3 Case Study
Count the total upstream traffic, downstream traffic, and overall traffic consumed by each phone number.
Data
phone_data.txt
(1) Input data format (fields are tab-separated):

7  13560436666  120.196.100.99  1116  954  200
id  phone number  network IP  upstream traffic  downstream traffic  network status code
(2) Expected output data format:

13560436666  1116  954  2070
phone number  upstream traffic  downstream traffic  total traffic (total = 1116 + 954 = 2070)
Implementation code
package com.BigData.MapReduce;

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {}

    public FlowBean(long upFlow, long downFlow, long sumFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return "FlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override // the read order here must match the write order in the serialization method above
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }
}
package com.BigData.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;

public class MapReduceDemo {
    public static class MyMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] data = line.split("\t");
            FlowBean flow = new FlowBean();
            // index from the end of the array so lines with a variable number of middle fields still parse
            flow.setUpFlow(Long.parseLong(data[data.length - 3]));
            flow.setDownFlow(Long.parseLong(data[data.length - 2]));
            flow.setSumFlow(Long.parseLong(data[data.length - 3]) + Long.parseLong(data[data.length - 2]));
            context.write(new Text(data[1]), flow);
        }
    }

    //=======divider=========
    // Shuffle merges, partitions, groups, and sorts; records with the same k2 are pulled by the same reducer.
    // Part two: the Reduce phase (not needed for this map-only job)
    // public static class MyReduce extends Reducer<Text, FlowBean, Text, FlowBean> {
    //     // the reduce function
    //     @Override
    //     protected void reduce(Text k2, Iterable<FlowBean> v2s,
    //             Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
    //
    //     }
    // }

    public static void main(String[] args) throws Exception {
        // set configuration parameters
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.142.20:9000"), conf);
        if (fs.exists(new Path("/out")))
            fs.delete(new Path("/out"), true);
        // create the job
        conf.set("fs.defaultFS", "hdfs://192.168.142.20:9000");
        Path input = new Path("/data/phone_data.txt");
        Path output = new Path("/out");
        Job job = Job.getInstance(conf, MapReduceDemo.class.getSimpleName());
        // specify the jar
        job.setJarByClass(MapReduceDemo.class);
        // specify the input path on HDFS
        FileInputFormat.addInputPath(job, input);
        // specify the mapper class
        job.setMapperClass(MyMapper.class);
        // specify the key and value types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class); // the mapper emits FlowBean values, not LongWritable
        job.setNumReduceTasks(0); // map-only job: map output is written directly to HDFS
        // specify the reducer class and its output types (unused here)
        // job.setReducerClass(MyReduce.class);
        // job.setOutputKeyClass(Text.class);
        // job.setOutputValueClass(FlowBean.class);
        // specify the output path on HDFS
        FileOutputFormat.setOutputPath(job, output);
        // submit the job; passing true prints progress information while it runs
        job.waitForCompletion(true);
    }
}
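To submit the job, package the classes into a jar and run it with the hadoop command. A sketch, assuming a hypothetical jar name flowcount.jar and the cluster address used above; since this is a map-only job, its output lands in part-m-* files:

hadoop jar flowcount.jar com.BigData.MapReduce.MapReduceDemo
hdfs dfs -cat /out/part-m-00000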
Run result:
To adjust the formatting of the run result, modify FlowBean's toString() method, as in the sketch below.
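For example, a minimal toString() that emits the tab-separated layout from the expected output above (an illustrative rewrite of the method in FlowBean):

@Override
public String toString() {
    return upFlow + "\t" + downFlow + "\t" + sumFlow;
}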