Both MapTask and ReduceTask sort their data by key; this is Hadoop's default behavior and happens whether or not a job asks for it. By default, keys are compared in dictionary order, and the sort is implemented with quicksort.
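As a quick local check of that default ordering, the sketch below (the class name `DefaultSortDemo` is ours, and it assumes the Hadoop client jars are on the classpath) asks Hadoop for the comparator registered for `Text` keys:

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;

public class DefaultSortDemo {
    public static void main(String[] args) {
        // Hadoop looks up the comparator registered for the key class;
        // for Text it compares the serialized bytes in dictionary order.
        WritableComparator cmp = WritableComparator.get(Text.class);
        // Negative result: "apple" sorts before "banana"
        System.out.println(cmp.compare(new Text("apple"), new Text("banana")));
    }
}
```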

3.1 Custom Sorting in Java

Example: sort an array of a primitive type by calling Arrays.sort.

```java
package com.BigData.test;

import java.util.Arrays;

public class Test {
    public static void main(String[] args) {
        int[] data = {4, 5, 1, 2, 7, 8};
        // Print the array in its original order
        for (int x : data)
            System.out.println(x);
        System.out.println("#######################");
        // Sort in place, in ascending order
        Arrays.sort(data);
        for (int x : data)
            System.out.println(x);
    }
}
```
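For primitive arrays, `Arrays.sort` uses a dual-pivot quicksort; for object arrays it uses a merge-sort variant (TimSort) and requires the elements to be mutually comparable, which is exactly what goes wrong in the next example.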

Example: custom sorting.
If the custom class does not support ordering, Arrays.sort throws an exception:

:::info
Exception in thread "main" java.lang.ClassCastException: com.BigData.test.Student cannot be cast to java.lang.Comparable
:::

Have the class implement the Comparable interface and override its compareTo method to define the ordering:

```java
package com.BigData.test;

import java.util.Arrays;

class Student implements Comparable {
    private String name;
    private int age;

    public Student() {
    }

    public Student(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }

    @Override
    public int compareTo(Object o) {
        Student stu = (Student) o;
        // Descending order of age
        return stu.age - this.age;
    }
}

public class Test {
    public static void main(String[] args) {
        Student[] stu = new Student[]{
                new Student("张三", 10),
                new Student("李四", 11),
                new Student("王五", 9),
                new Student("赵六", 15)
        };
        for (Student s : stu)
            System.out.println(s);
        System.out.println("############ after sorting");
        Arrays.sort(stu);
        for (Student s : stu)
            System.out.println(s);
    }
}
```
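Returning `stu.age - this.age` sorts in descending order of age. The subtraction trick works here because ages are small, but `Integer.compare(stu.age, this.age)` is the safer idiom when the field can span the full `int` range, since the difference can overflow.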

Running this prints the four students in their original order, then again in descending order of age.
If you instead want to sort by name in dictionary order, only the comparison rule inside compareTo needs to change:

```java
@Override
public int compareTo(Object o) {
    Student stu = (Student) o;
    // Ascending dictionary order by name
    return this.name.compareTo(stu.name);
}
```

(The output now lists the students in dictionary order of name.)
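As a side note, a one-off ordering can also be supplied externally instead of editing the class. This fragment reuses the `Student` class and `stu` array from the example above:

```java
import java.util.Arrays;
import java.util.Comparator;

// Sort by name without touching Student's own compareTo
Arrays.sort(stu, Comparator.comparing(Student::getName));
```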

3.2 Custom Sorting in Hadoop

Have the bean implement the WritableComparable interface and override compareTo; Hadoop then uses that ordering when it sorts the map output keys.
Example:
The phone-number records in each province's output file should be sorted by total traffic.
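Judging from the mapper further below, each input line of phone_data.txt is tab-separated, with the phone number in the second field and the up-flow and down-flow in the third- and second-from-last fields; the mapper computes the total itself. First, the FlowBean: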

```java
package com.BigData.MapReduce;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;
    private long phone;

    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow, long sumFlow, long phone) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
        this.phone = phone;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public long getPhone() {
        return phone;
    }

    public void setPhone(long phone) {
        this.phone = phone;
    }

    @Override
    public String toString() {
        return this.phone + "\t" + this.upFlow + "\t" + this.downFlow + "\t" + this.sumFlow;
    }

    @Override
    public int compareTo(Object o) {
        FlowBean fb = (FlowBean) o;
        // Descending order of total traffic; Long.compare avoids the
        // overflow risk of casting (fb.sumFlow - this.sumFlow) to int
        return Long.compare(fb.sumFlow, this.sumFlow);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
        out.writeLong(phone);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in exactly the order write() wrote them
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
        this.phone = in.readLong();
    }
}
```
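Because `FlowBean` travels through the shuffle as the map output key, both halves of `WritableComparable` are exercised on every record: `write`/`readFields` serialize it between tasks, and `compareTo` drives the sort.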

The MapReduce driver program:

```java
package com.BigData.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;

public class MapReduceDemo {

    public static class MyMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] data = line.split("\t");
            FlowBean flow = new FlowBean();
            flow.setPhone(Long.parseLong(data[1]));
            flow.setUpFlow(Long.parseLong(data[data.length - 3]));
            flow.setDownFlow(Long.parseLong(data[data.length - 2]));
            flow.setSumFlow(Long.parseLong(data[data.length - 3]) + Long.parseLong(data[data.length - 2]));
            // The bean is the key, so the shuffle sorts records by its compareTo
            context.write(flow, new Text(""));
        }
    }

    // ======= The shuffle merges, partitions, groups, and sorts between map
    // and reduce; records with the same k2 are pulled by the same reducer. =======

    // Part two: the Reduce phase
    public static class MyReduce extends Reducer<FlowBean, Text, FlowBean, Text> {
        @Override
        protected void reduce(FlowBean k2, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {
            // Keys arrive already sorted; write each record back out
            for (Text fb : v2s)
                context.write(k2, fb);
        }
    }

    public static void main(String[] args) throws Exception {
        // Set configuration parameters
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.142.20:9000"), conf);
        // Remove a leftover output directory from a previous run
        if (fs.exists(new Path("/out")))
            fs.delete(new Path("/out"), true);
        // Create the job
        conf.set("fs.defaultFS", "hdfs://192.168.142.20:9000");
        Path input = new Path("/data/phone_data.txt");
        Path output = new Path("/out");
        Job job = Job.getInstance(conf, MapReduceDemo.class.getSimpleName());
        // Locate the jar by class
        job.setJarByClass(MapReduceDemo.class);
        // Input path on HDFS
        FileInputFormat.addInputPath(job, input);
        // Mapper class and its output key/value types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer class and the final output key/value types
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(Text.class);
        // Output path on HDFS
        FileOutputFormat.setOutputPath(job, output);
        // Submit and wait; passing true prints progress as the job runs
        job.waitForCompletion(true);
    }
}
```
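To run it, package the classes into a jar and submit it with `hadoop jar`, e.g. `hadoop jar flowsort.jar com.BigData.MapReduce.MapReduceDemo` (the jar name here is just a placeholder; input and output paths are hardcoded in the driver).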

Output: each line is phone, up-flow, down-flow, and total traffic (FlowBean.toString()), written in descending order of total traffic.