Hadoop Serialization
Serialization: converting an in-memory object into a byte sequence (or another data-transfer format) so that it can be persisted to disk or transmitted over the network.
Deserialization: converting a received byte sequence (or another data-transfer format), or data persisted on disk, back into an in-memory object.
Java's built-in serialization stores a lot of extra information (checksums, headers, the inheritance hierarchy, and so on). It is a heavyweight serialization framework whose output is awkward to ship across the network, so Hadoop provides its own serialization mechanism.
Characteristics of Hadoop serialization:
- Compact: uses little storage space (a rough size comparison is sketched right after this list)
- Fast: quick to transfer
- Interoperable: supports interaction across multiple languages
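To illustrate the "compact" point, here is a small sketch of my own (the class name and sample value are arbitrary, not from the original notes) that serializes the same long value once with Java's ObjectOutputStream and once as a Hadoop LongWritable, then prints the byte counts. Exact numbers depend on the JDK and Hadoop versions, but the Writable form is just the 8 raw bytes of the long, while the Java form also carries a stream header and class metadata.

```java
import org.apache.hadoop.io.LongWritable;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class SerializationSizeDemo {
    public static void main(String[] args) throws IOException {
        // Java built-in serialization: writes a stream header, class descriptor, etc.
        ByteArrayOutputStream javaBytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(javaBytes)) {
            oos.writeObject(Long.valueOf(1234567890L));
        }

        // Hadoop Writable: writes only the 8 raw bytes of the long value.
        ByteArrayOutputStream hadoopBytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(hadoopBytes)) {
            new LongWritable(1234567890L).write(dos);
        }

        System.out.println("Java serialization:  " + javaBytes.size() + " bytes");
        System.out.println("Hadoop LongWritable: " + hadoopBytes.size() + " bytes");
    }
}
```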
In real-world development, Hadoop's built-in serialization types such as Text and IntWritable are often not enough, so you need to define your own serializable JavaBeans.
Defining a class that needs to be serialized:
- The class must implement the Writable interface.
- Deserialization calls the no-arg constructor via reflection, so the class must have one.
- Implement the serialization method write().
- Implement the deserialization method readFields().
- Note that the deserialization order must exactly match the serialization order (first in, first out).
- If you want the result to appear in the output file, also override toString().
- If objects of the custom class are transferred as the key, the class must additionally implement the Comparable interface, because the Shuffle phase of the MapReduce framework requires keys to be sortable (in practice a key class usually implements WritableComparable, which combines Writable and Comparable).
Example:
```java
package com.study.mapreduce.writable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MyWritable implements Writable, Comparable<MyWritable> {

    private Integer id;
    private Long scale;
    private Integer age;

    /**
     * A no-arg constructor is required (used by reflection during deserialization).
     */
    public MyWritable() {
    }

    /**
     * Serialization
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeLong(scale);
        out.writeInt(age);
    }

    /**
     * Deserialization
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in the same order write() wrote them (first in, first out)
        id = in.readInt();
        scale = in.readLong();
        age = in.readInt();
    }

    /**
     * To be transferred as a key in MapReduce, the class must be comparable,
     * because the Shuffle phase requires keys to be sortable.
     * @param o
     * @return
     */
    @Override
    public int compareTo(MyWritable o) {
        // Sort by id in descending order (returns 0 when the ids are equal)
        return Integer.compare(o.getId(), this.id);
    }

    /**
     * Override toString() so the result is readable when written to the output file.
     * @return
     */
    @Override
    public String toString() {
        return "MyWritable{id=" + id + ", scale=" + scale + ", age=" + age + "}";
    }

    // getters/setters
    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public Long getScale() {
        return scale;
    }

    public void setScale(Long scale) {
        this.scale = scale;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}
```
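Before plugging such a class into a job, it can help to sanity-check the write()/readFields() pair with a plain round trip through a byte array. The following is a minimal sketch of my own (not part of the original example); it simply confirms that readFields() restores the fields because it consumes them in exactly the order write() produced them.

```java
package com.study.mapreduce.writable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MyWritableRoundTrip {
    public static void main(String[] args) throws IOException {
        MyWritable original = new MyWritable();
        original.setId(1);
        original.setScale(100L);
        original.setAge(30);

        // Serialize: write() emits id, scale, age in that order.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            original.write(out);
        }

        // Deserialize: readFields() must read them back in the same order.
        MyWritable copy = new MyWritable();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            copy.readFields(in);
        }

        System.out.println(copy); // MyWritable{id=1, scale=100, age=30}
    }
}
```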
Serialization Case Study
Requirement: for each phone number, compute the total upstream traffic, total downstream traffic, and total traffic.
Input data format:
```
1 13892386621 127.0.0.1 200 500 200
2 11234426621 127.0.0.1 231 322 200
3 11234426621 127.0.0.1 300 400 200
4 13892386621 127.0.0.1 300 600 200
```
Columns: id, phone number, network IP, upstream traffic, downstream traffic, network status code.
Expected output format:
```
13892386621 500 1100 1600
```
Columns: phone number, total upstream traffic, total downstream traffic, total traffic.
Implementation:
1. FlowBean (the serializable bean)
```java
package com.hadoop.demo.flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    /**
     * A no-arg constructor is required for deserialization.
     */
    public FlowBean() {
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Read in the same order as write()
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public String toString() {
        return upFlow + " " + downFlow + " " + sumFlow;
    }
}
```
2. Mapper
```java
package com.hadoop.demo.flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private Text outK = new Text();
    private FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        // Get the current line
        String line = value.toString();
        String[] split = line.split(" ");

        // Extract the fields we need
        // phone number
        String phone = split[1];
        // upstream traffic
        String up = split[split.length - 3];
        // downstream traffic
        String down = split[split.length - 2];

        // Wrap into the output key/value
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();

        // Write out
        context.write(outK, outV);
    }
}
```
3. Reducer
```java
package com.hadoop.demo.flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean flowBean = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        // Iterate over the values and accumulate the totals
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }

        // Wrap the totals into the output value
        flowBean.setUpFlow(totalUp);
        flowBean.setDownFlow(totalDown);
        flowBean.setSumFlow();

        // Write out
        context.write(key, flowBean);
    }
}
```
4. Driver
```java
package com.hadoop.demo.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get the job
        Configuration config = new Configuration();
        Job job = Job.getInstance(config);

        // 2. Set the jar path (resolved by reflection from the driver class itself)
        job.setJarByClass(FlowDriver.class);

        // 3. Associate the Mapper and Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. Set the Mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. Set the final output key/value types (the final output is not necessarily the Reducer output)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\Hadoop\\input2"));   // local path; multiple inputs are allowed
        FileOutputFormat.setOutputPath(job, new Path("D:\\Hadoop\\output2")); // local path; the directory must not already exist, otherwise the job fails
        // FileInputFormat.setInputPaths(job, new Path(args[0]));
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // job.submit() would also work, but waitForCompletion(true) prints progress
        // information, which is useful for debugging; internally it calls job.submit().
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
```
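If the hard-coded local paths are swapped for the commented-out args[0]/args[1] lines and the project is packaged into a jar, the job can be submitted to a cluster with the standard hadoop jar command, roughly like this (the jar name and HDFS paths below are placeholders, not from the original notes):

```shell
hadoop jar flow-demo.jar com.hadoop.demo.flow.FlowDriver /flow/input /flow/output
```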
