Input data:
group.txt
Goal: output the record with the maximum amount for each order id.
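The mappers below split each line on a tab and read the order id from column 0 and the amount from column 2, so group.txt is assumed to be a tab-separated file along these (hypothetical) lines, with an item column in between:

    1	Pdt_01	222.8
    1	Pdt_05	25.8
    2	Pdt_03	522.8
    2	Pdt_04	122.4
    3	Pdt_06	232.8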

4.1 Approach 1

Approach 1: key the records by order id so that all records of one order reach the same reduce call, sort them there, and emit the maximum. The same idea can also emit the top N records after sorting (a sketch follows the listing), but buffering and sorting an entire group in the reducer does not scale well when the data volume is large.

    package com.BigData.MapReduce;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.List;

    public class MapReduceDemo {

        public static class MyMapper extends Mapper<LongWritable, Text, Text, OrderBean> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Each input line: orderId \t itemId \t money
                String line = value.toString();
                String[] data = line.split("\t");
                OrderBean ob = new OrderBean();
                ob.setId(Integer.parseInt(data[0]));
                ob.setMoney(Double.parseDouble(data[2]));
                // Key by order id so all records of one order go to the same reducer
                context.write(new Text(data[0]), ob);
            }
        }

        // ======= Shuffle: merge, partition, group and sort; records with the same k2 are pulled by the same reducer =======

        public static class MyReduce extends Reducer<Text, OrderBean, OrderBean, Text> {
            @Override
            protected void reduce(Text k2, Iterable<OrderBean> v2s, Context context)
                    throws IOException, InterruptedException {
                // Copy each value: Hadoop reuses the OrderBean instance while iterating
                List<OrderBean> list = new ArrayList<>();
                for (OrderBean o : v2s) {
                    list.add(new OrderBean(o.getId(), o.getMoney()));
                }
                // Sort by money descending; Double.compare avoids the precision loss
                // of casting a double difference to int
                list.sort((o1, o2) -> Double.compare(o2.getMoney(), o1.getMoney()));
                // Emit the maximum record of this order group
                context.write(list.get(0), new Text(""));
            }
        }

        public static void main(String[] args) throws Exception {
            // Job configuration
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://192.168.142.20:9000");
            // Remove the output directory if it already exists
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.142.20:9000"), conf);
            if (fs.exists(new Path("/out")))
                fs.delete(new Path("/out"), true);
            Path input = new Path("/data/group.txt");
            Path output = new Path("/out");
            // Create the job
            Job job = Job.getInstance(conf, MapReduceDemo.class.getSimpleName());
            job.setJarByClass(MapReduceDemo.class);
            // Input path on HDFS
            FileInputFormat.addInputPath(job, input);
            // Mapper class and its output key/value types
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(OrderBean.class);
            // Reducer class and the job's output key/value types
            job.setReducerClass(MyReduce.class);
            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(Text.class);
            // Output path on HDFS
            FileOutputFormat.setOutputPath(job, output);
            // Submit the job; true prints progress information
            job.waitForCompletion(true);
        }
    }
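As noted above, this approach can also return the top N records of each group, because the whole group is already buffered and sorted in the reducer. A minimal sketch of that variant of the reduce body, replacing the single write of list.get(0) (N = 3 is an arbitrary example value):

    // Hypothetical top-N variant: write the first N beans of the sorted list instead of only the maximum
    int n = 3;
    for (int i = 0; i < Math.min(n, list.size()); i++) {
        context.write(list.get(i), new Text(""));
    }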

Result:
(screenshot of the job output)

4.2 Approach 2

Approach 2: use a GroupingComparator for auxiliary grouping, which lets you customize which map output keys are grouped into the same reduce call. Each record is first wrapped into an OrderBean that sorts by id and then by money; a custom grouping comparator then treats beans with the same id as one group, so the reducer can simply write out as many of the leading records as it needs. A sketch of the OrderBean assumed by both listings follows.
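The OrderBean class itself is defined outside this section. For Approach 2 it must implement WritableComparable and encode the composite ordering described above: id ascending, then money descending, so that the first key of each id group is already its maximum. A minimal sketch, assuming only the fields and accessors the listings call:

    package com.BigData.MapReduce;

    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class OrderBean implements WritableComparable<OrderBean> {
        private int id;        // order id
        private double money;  // amount of this record

        public OrderBean() {}  // no-arg constructor required by Hadoop for deserialization
        public OrderBean(int id, double money) { this.id = id; this.money = money; }

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public double getMoney() { return money; }
        public void setMoney(double money) { this.money = money; }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(id);
            out.writeDouble(money);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            id = in.readInt();
            money = in.readDouble();
        }

        @Override
        public int compareTo(OrderBean o) {
            // id ascending, then money descending,
            // so the first record of each id group is its maximum
            if (id != o.id) return Integer.compare(id, o.id);
            return Double.compare(o.money, money);
        }

        @Override
        public String toString() {
            return id + "\t" + money;  // controls how the key is rendered in the job output
        }
    }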

    package com.BigData.MapReduce;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;
    import java.net.URI;

    public class MapReduceDemo {

        public static class MyMapper extends Mapper<LongWritable, Text, OrderBean, Text> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Each input line: orderId \t itemId \t money
                String line = value.toString();
                String[] data = line.split("\t");
                OrderBean ob = new OrderBean();
                ob.setId(Integer.parseInt(data[0]));
                ob.setMoney(Double.parseDouble(data[2]));
                // The whole bean is the key; its compareTo orders by id, then by money descending
                context.write(ob, new Text(""));
            }
        }

        // ======= Shuffle: merge, partition, group and sort; records with the same k2 are pulled by the same reducer =======

        public static class MyReduce extends Reducer<OrderBean, Text, OrderBean, Text> {
            @Override
            protected void reduce(OrderBean k2, Iterable<Text> v2s, Context context)
                    throws IOException, InterruptedException {
                // Keys inside a group arrive sorted by money descending,
                // so the first key is already the maximum of this order id
                context.write(k2, new Text(""));
            }
        }

        public static void main(String[] args) throws Exception {
            // Job configuration
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://192.168.142.20:9000");
            // Remove the output directory if it already exists
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.142.20:9000"), conf);
            if (fs.exists(new Path("/out")))
                fs.delete(new Path("/out"), true);
            Path input = new Path("/data/group.txt");
            Path output = new Path("/out");
            // Create the job
            Job job = Job.getInstance(conf, MapReduceDemo.class.getSimpleName());
            job.setJarByClass(MapReduceDemo.class);
            // Input path on HDFS
            FileInputFormat.addInputPath(job, input);
            // Mapper class and its output key/value types
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(OrderBean.class);
            job.setMapOutputValueClass(Text.class);
            // Reducer class and the job's output key/value types
            job.setReducerClass(MyReduce.class);
            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(Text.class);
            // Group keys with the same order id into one reduce call
            job.setGroupingComparatorClass(MyGroupComparator.class);
            // Output path on HDFS
            FileOutputFormat.setOutputPath(job, output);
            // Submit the job; true prints progress information
            job.waitForCompletion(true);
        }
    }

    class MyGroupComparator extends WritableComparator {
        protected MyGroupComparator() {
            // true: create OrderBean instances so compare() receives deserialized keys
            super(OrderBean.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // Two keys belong to the same group when their order ids are equal
            OrderBean ob1 = (OrderBean) a;
            OrderBean ob2 = (OrderBean) b;
            return Integer.compare(ob1.getId(), ob2.getId());
        }
    }

Notes:
(1) job.setGroupingComparatorClass(MyGroupComparator.class) must be set on the job.
(2) The overridden compare method must take parameters of type WritableComparable.
(3) To output the first N records of each group after the secondary sort, add a loop over the values in the reduce method. For example, to output the top two records of each group:

  1. for(int i = 0; i <=1; i++)
  2. context.write(k2,new Text(""));

Result:
(screenshot of the job output)