Both MapTask and ReduceTask sort their data by key; this is Hadoop's default behavior. Keys are sorted in dictionary (lexicographic) order, and the sort is implemented with quicksort.
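For example, dictionary order means that Text keys compare byte-by-byte rather than numerically, so "10" sorts before "9". A minimal sketch illustrating this (the demo class name is ours):

```java
import org.apache.hadoop.io.Text;

public class LexOrderDemo {
    public static void main(String[] args) {
        // Text compares raw UTF-8 bytes, so "10" < "9" ('1' == 0x31 < '9' == 0x39)
        System.out.println(new Text("10").compareTo(new Text("9"))); // prints a negative number
    }
}
```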
3.1 Custom Sorting in Java
Example: sort an array of a primitive type by calling Arrays.sort
```java
package com.BigData.test;

import java.util.Arrays;

public class Test {
    public static void main(String[] args) {
        int[] data = {4, 5, 1, 2, 7, 8};
        for (int x : data)
            System.out.println(x);
        System.out.println("#######################");
        // For primitives, Arrays.sort uses a dual-pivot quicksort
        Arrays.sort(data);
        for (int x : data)
            System.out.println(x);
    }
}
```
Example: sorting a custom class
If the custom class does not support ordering (that is, it does not implement Comparable), Arrays.sort throws an exception at runtime:
:::info
Exception in thread "main" java.lang.ClassCastException: com.BigData.test.Student cannot be cast to java.lang.Comparable
:::
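A minimal reproduction of that error (a sketch; the demo class name is ours):

```java
package com.BigData.test;

import java.util.Arrays;

class Student {   // does not implement Comparable yet
    String name;
    int age;
    Student(String name, int age) { this.name = name; this.age = age; }
}

public class SortFailDemo {
    public static void main(String[] args) {
        Student[] stu = { new Student("张三", 10), new Student("李四", 11) };
        // Arrays.sort(Object[]) casts each element to Comparable,
        // so this throws ClassCastException at runtime
        Arrays.sort(stu);
    }
}
```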
The fix is to have the custom class implement the Comparable interface and override compareTo to define the ordering:
```java
package com.BigData.test;

import java.util.Arrays;

class Student implements Comparable<Student> {
    private String name;
    private int age;

    public Student() {
    }

    public Student(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }

    @Override
    public int compareTo(Student stu) {
        // Descending by age; Integer.compare avoids the overflow risk of plain subtraction
        return Integer.compare(stu.age, this.age);
    }
}

public class Test {
    public static void main(String[] args) {
        Student[] stu = new Student[]{
                new Student("张三", 10),
                new Student("李四", 11),
                new Student("王五", 9),
                new Student("赵六", 15)
        };
        for (Student s : stu)
            System.out.println(s);
        System.out.println("############ after sorting ############");
        Arrays.sort(stu);
        for (Student s : stu)
            System.out.println(s);
    }
}
```
Running it prints the students in their original order and then, after Arrays.sort, in descending age order: 赵六 15, 李四 11, 张三 10, 王五 9.
Similarly, to sort by name in dictionary order instead, just change the comparison rule inside compareTo:
```java
@Override
public int compareTo(Student stu) {
    // Ascending dictionary order by name
    return this.name.compareTo(stu.name);
}
```
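Alternatively, if you would rather not change the class's natural ordering, Arrays.sort also accepts an external Comparator. A short sketch reusing the Student class above (the demo class name is ours):

```java
package com.BigData.test;

import java.util.Arrays;
import java.util.Comparator;

public class ComparatorDemo {
    public static void main(String[] args) {
        Student[] stu = {
                new Student("张三", 10),
                new Student("李四", 11)
        };
        // External Comparator: sort by name without touching Student.compareTo
        Arrays.sort(stu, Comparator.comparing(Student::getName));
        for (Student s : stu)
            System.out.println(s);
    }
}
```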
3.2 Custom Sorting in Hadoop
Having the bean implement the WritableComparable interface and override compareTo is all it takes: WritableComparable combines Writable (Hadoop serialization) with Comparable (ordering), and the shuffle sorts map output records by the key's compareTo.
Example:
Output the phone records sorted by total traffic. The requirement also asks for one output file per province; that part needs a custom Partitioner (sketched next), while the listings that follow focus on the sorting itself.
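A minimal Partitioner sketch for the per-province split, assuming a hypothetical mapping from phone-number prefixes to partitions (the class name and the prefix table are illustrative, not from the original assignment):

```java
package com.BigData.MapReduce;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes each record to a reducer by phone prefix; each partition becomes one output file.
public class ProvincePartitioner extends Partitioner<FlowBean, Text> {
    @Override
    public int getPartition(FlowBean key, Text value, int numPartitions) {
        String prefix = String.valueOf(key.getPhone()).substring(0, 3);
        switch (prefix) {
            case "136": return 0;
            case "137": return 1;
            case "138": return 2;
            default:    return 3;
        }
    }
}
```

It would be registered in the driver with job.setPartitionerClass(ProvincePartitioner.class) and job.setNumReduceTasks(4); the driver shown below omits this and runs with the default single partition.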
```java
package com.BigData.MapReduce;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;
    private long phone;

    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow, long sumFlow, long phone) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
        this.phone = phone;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public long getPhone() {
        return phone;
    }

    public void setPhone(long phone) {
        this.phone = phone;
    }

    @Override
    public String toString() {
        return this.phone + "\t" + this.upFlow + "\t" + this.downFlow + "\t" + this.sumFlow;
    }

    @Override
    public int compareTo(FlowBean fb) {
        // Descending by total traffic; Long.compare avoids the overflow risk
        // of casting (fb.sumFlow - this.sumFlow) to int
        return Long.compare(fb.sumFlow, this.sumFlow);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialization order must match readFields exactly
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
        out.writeLong(phone);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
        this.phone = in.readLong();
    }
}
```
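Before wiring the bean into a job, the ordering can be sanity-checked locally with plain Java (a quick sketch; the class name is ours):

```java
package com.BigData.MapReduce;

public class FlowBeanOrderCheck {
    public static void main(String[] args) {
        FlowBean a = new FlowBean(100, 200, 300, 13800000000L);
        FlowBean b = new FlowBean(400, 500, 900, 13900000000L);
        // Positive result: a sorts after b, i.e. the larger sumFlow comes first
        System.out.println(a.compareTo(b));
    }
}
```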
The MapReduce driver program:
```java
package com.BigData.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;

public class MapReduceDemo {

    public static class MyMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] data = line.split("\t");
            FlowBean flow = new FlowBean();
            flow.setPhone(Long.parseLong(data[1]));
            flow.setUpFlow(Long.parseLong(data[data.length - 3]));
            flow.setDownFlow(Long.parseLong(data[data.length - 2]));
            flow.setSumFlow(flow.getUpFlow() + flow.getDownFlow());
            // The bean is the map output key, so the shuffle sorts records by FlowBean.compareTo
            context.write(flow, new Text(""));
        }
    }

    // ======= Shuffle =========
    // The shuffle merges, partitions, groups, and sorts by key; records with the
    // same key (k2) are pulled by the same reducer.
    public static class MyReduce extends Reducer<FlowBean, Text, FlowBean, Text> {
        @Override
        protected void reduce(FlowBean k2, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {
            // Keys arrive already sorted; just write them through
            for (Text fb : v2s)
                context.write(k2, fb);
        }
    }

    public static void main(String[] args) throws Exception {
        // Set up the configuration
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.142.20:9000");
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.142.20:9000"), conf);
        // The output directory must not exist, so remove any leftover from a previous run
        if (fs.exists(new Path("/out")))
            fs.delete(new Path("/out"), true);
        Path input = new Path("/data/phone_data.txt");
        Path output = new Path("/out");
        // Create the job
        Job job = Job.getInstance(conf, MapReduceDemo.class.getSimpleName());
        // Jar containing the job classes
        job.setJarByClass(MapReduceDemo.class);
        // Input path on HDFS
        FileInputFormat.addInputPath(job, input);
        // Mapper class and its output key/value types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer class and the job's output key/value types
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(Text.class);
        // Output path on HDFS
        FileOutputFormat.setOutputPath(job, output);
        // Submit the job; 'true' prints progress information to the console
        job.waitForCompletion(true);
    }
}
```
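The mapper assumes tab-separated input in which field 1 is the phone number and the third-from-last and second-from-last fields are the upstream and downstream traffic. A hypothetical line of phone_data.txt in that shape (values are illustrative):

```
1	13736230513	192.196.100.1	www.example.com	2481	24681	200
```

Here data[1] is the phone number, and with seven fields data[length-3] and data[length-2] are 2481 (up) and 24681 (down), giving a sumFlow of 27162.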
Running the job produces output lines of the form phone \t upFlow \t downFlow \t sumFlow, sorted in descending order of total traffic.