Hadoop Serialization
Serialization converts in-memory objects into a byte sequence (or another wire format) so they can be persisted to disk or transmitted over the network.
Deserialization converts a received byte sequence (or another wire format), or persisted data read from disk, back into in-memory objects.
Java's built-in serialization is a heavyweight framework: it stores a lot of extra information alongside each object (checksums, headers, the inheritance hierarchy, and so on), which makes its output inefficient to ship across the network. Hadoop therefore provides its own serialization mechanism.
Characteristics of Hadoop serialization (the sketch after this list compares it with Java serialization):
- Compact: uses little storage space
- Fast: low serialization overhead, so data transfers quickly
- Interoperable: the format can be consumed from multiple languages
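To make the "compact" point concrete, here is a minimal sketch, assuming hadoop-common is on the classpath, that serializes the same int value with both frameworks and prints the resulting sizes (the class name SizeCompare is just for illustration):

```java
import org.apache.hadoop.io.IntWritable;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class SizeCompare {
    public static void main(String[] args) throws IOException {
        // Java's built-in serialization: stream header, class metadata, then the value
        ByteArrayOutputStream javaBytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(javaBytes)) {
            oos.writeObject(Integer.valueOf(42));
        }
        // Hadoop Writable: just the raw 4-byte int
        ByteArrayOutputStream writableBytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(writableBytes)) {
            new IntWritable(42).write(dos);
        }
        System.out.println("Java serialization:     " + javaBytes.size() + " bytes"); // roughly 80 bytes
        System.out.println("Writable serialization: " + writableBytes.size() + " bytes"); // 4 bytes
    }
}
```

The Writable stream carries only the raw field bytes; the type information is agreed on out of band (via the key/value classes configured on the job), which is where most of the savings come from.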
In real projects, the built-in serializable types such as Text and IntWritable are often not enough, and you need to define serializable JavaBeans of your own.
To define a custom serializable class:
- It must implement the Writable interface.
- Deserialization creates the object reflectively through the no-argument constructor, so the class must have one.
- Implement the serialization method write().
- Implement the deserialization method readFields().
- Note that fields must be deserialized in exactly the order they were serialized (first in, first out).
- To make the results readable when written to a file, also override toString().
- To transmit objects of the class as a key, additionally implement the Comparable interface, because the Shuffle phase of MapReduce requires keys to be sortable.
Example:

```java
package com.study.mapreduce.writable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.MessageFormat;

public class MyWritable implements Writable, Comparable<MyWritable> {

    private Integer id;
    private Long scale;
    private Integer age;

    /**
     * A no-argument constructor is required (called reflectively during deserialization).
     */
    public MyWritable() {
    }

    /**
     * Serialization.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeLong(scale);
        out.writeInt(age);
    }

    /**
     * Deserialization.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in the same order write() wrote them: first in, first out
        id = in.readInt();
        scale = in.readLong();
        age = in.readInt();
    }

    /**
     * Needed when the object is transmitted as a key in MapReduce,
     * because the Shuffle phase requires keys to be sortable.
     */
    @Override
    public int compareTo(MyWritable o) {
        // Descending order by id; Integer.compare also handles the equal case correctly
        return Integer.compare(o.getId(), this.id);
    }

    /**
     * Overriding toString() makes the results easier to read.
     */
    @Override
    public String toString() {
        // Literal braces must be quoted in a MessageFormat pattern;
        // the "number,#" subformat disables grouping separators
        MessageFormat mf = new MessageFormat("MyWritable'{'id={0,number,#}, scale={1,number,#}, age={2,number,#}'}'");
        return mf.format(new Object[]{id, scale, age});
    }

    // Getters/setters
    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public Long getScale() {
        return scale;
    }

    public void setScale(Long scale) {
        this.scale = scale;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}
```
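As a quick sanity check of the write()/readFields() contract and its first-in-first-out field order, the following minimal round-trip sketch serializes a MyWritable into an in-memory buffer and reads it back the way the framework would (the class name MyWritableRoundTrip and the sample values are purely illustrative):

```java
package com.study.mapreduce.writable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MyWritableRoundTrip {
    public static void main(String[] args) throws IOException {
        MyWritable original = new MyWritable();
        original.setId(1);
        original.setScale(100L);
        original.setAge(30);

        // Serialize: write() emits id, scale, age in that order
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize: the framework creates an instance through the no-arg
        // constructor and then calls readFields(), just like this
        MyWritable copy = new MyWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy); // MyWritable{id=1, scale=100, age=30}
    }
}
```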
Serialization case study
Requirement: for each phone number, compute the total upstream traffic, total downstream traffic, and total traffic.
Input data format:
1 13892386621 127.0.0.1 200 500 200
2 11234426621 127.0.0.1 231 322 200
3 11234426621 127.0.0.1 300 400 200
4 13892386621 127.0.0.1 300 600 200
(columns: id, phone number, network IP, upstream traffic, downstream traffic, network status code)
Expected output format:
13892386621 500 1100 1600
(columns: phone number, total upstream traffic, total downstream traffic, total traffic)
1. FlowBean (the serializable bean)
```java
package com.hadoop.demo.flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable, Comparable<FlowBean> {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    public FlowBean() {
    }

    /**
     * FlowBean is only used as a value in this job, so no real ordering is
     * needed; give this a meaningful implementation if the bean is ever used as a key.
     */
    @Override
    public int compareTo(FlowBean o) {
        return 0;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Same order as write()
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow() {
        // Derive the total from the two parts
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public String toString() {
        return upFlow + " " + downFlow + " " + sumFlow;
    }
}
```
2. Mapper
```java
package com.hadoop.demo.flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private Text outK = new Text();
    private FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        // Get the current input line
        String line = value.toString();
        String[] split = line.split(" ");
        // Pick out the fields we need
        // phone number
        String phone = split[1];
        // upstream traffic
        String up = split[split.length - 3];
        // downstream traffic
        String down = split[split.length - 2];
        // Populate the output key and value
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();
        // Emit
        context.write(outK, outV);
    }
}
```
3. Reducer
```java
package com.hadoop.demo.flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean flowBean = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        // Accumulate the totals over all values for this key
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean bean : values) {
            totalUp += bean.getUpFlow();
            totalDown += bean.getDownFlow();
        }
        // Populate the output value
        flowBean.setUpFlow(totalUp);
        flowBean.setDownFlow(totalDown);
        flowBean.setSumFlow();
        // Emit
        context.write(key, flowBean);
    }
}
```
4. Driver
```java
package com.hadoop.demo.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get the Job instance
        Configuration config = new Configuration();
        Job job = Job.getInstance(config);
        // 2. Set the jar path (resolved reflectively from the driver class's fully qualified name)
        job.setJarByClass(FlowDriver.class);
        // 3. Wire up the Mapper and Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 5. Set the final output key/value types (the final output is not necessarily the Reducer's output)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\Hadoop\\input2")); // local paths; multiple inputs are allowed
        FileOutputFormat.setOutputPath(job, new Path("D:\\Hadoop\\output2")); // must be a directory that does not exist yet, otherwise the job fails because the output directory already exists
        // FileInputFormat.setInputPaths(job, new Path(args[0]));
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Calling job.submit() would be enough, but for debugging it is better to
        // call waitForCompletion(true), which prints progress information while
        // the job runs; internally it calls job.submit() as well.
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1); // exit with the job's status
    }
}
```
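For a run on a cluster rather than the local filesystem, the hard-coded paths would be replaced by the commented-out args[0]/args[1] version above, so that the input and output locations can be passed on the command line when the job jar is submitted.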