输入数据:
group.txt
实现输出每一类订单中金额最大的记录
4.1 方式一
方式一:按订单id分组,相同id的记录进入同一个reduce,在reduce中排序后输出最大值。此方法也可以输出排序后的前N条记录,但它需要把整组数据缓存在内存中,数据量大时不可取。
package com.BigData.MapReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class MapReduceDemo {

    /**
     * Mapper: parses one tab-separated input line per call and emits
     * (order id as Text, OrderBean carrying id + money).
     * Expected layout: data[0] = order id, data[2] = money
     * (data[1] is unused here — presumably an item/name column; confirm against group.txt).
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, OrderBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] data = line.split("\t");
            OrderBean ob = new OrderBean();
            ob.setId(Integer.parseInt(data[0]));
            ob.setMoney(Double.parseDouble(data[2]));
            context.write(new Text(data[0]), ob);
        }
    }

    // ======= shuffle boundary =======
    // Shuffle merges, partitions, groups and sorts; all records sharing the
    // same key (order id) are pulled by the same reduce call.

    /**
     * Reducer: collects every OrderBean of one order id, sorts by money
     * descending and writes only the most expensive record.
     * NOTE: buffering the whole group in memory does not scale to huge groups.
     */
    public static class MyReduce extends Reducer<Text, OrderBean, OrderBean, Text> {
        @Override
        protected void reduce(Text k2, Iterable<OrderBean> v2s,
                Reducer<Text, OrderBean, OrderBean, Text>.Context context) throws IOException, InterruptedException {
            List<OrderBean> list = new ArrayList<>();
            // Hadoop reuses the value object while iterating, so every bean
            // must be copied before it is stored.
            for (OrderBean o : v2s) {
                list.add(new OrderBean(o.getId(), o.getMoney()));
            }
            // BUG FIX: the previous comparator cast the double difference to
            // int, which yields 0 for any |difference| < 1.0 (e.g. 10.5 vs
            // 10.2 compared equal) and can overflow for large values.
            // Double.compare is exact and overflow-free.
            list.sort((o1, o2) -> Double.compare(o2.getMoney(), o1.getMoney()));
            context.write(list.get(0), new Text(""));
        }
    }

    /**
     * Driver: wires the job together and runs it against
     * hdfs://192.168.142.20:9000/data/group.txt, writing results to /out.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Remove a stale output directory first, otherwise job submission fails.
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.142.20:9000"), conf);
        if (fs.exists(new Path("/out"))) {
            fs.delete(new Path("/out"), true);
        }
        conf.set("fs.defaultFS", "hdfs://192.168.142.20:9000");
        Path input = new Path("/data/group.txt");
        Path output = new Path("/out");
        Job job = Job.getInstance(conf, MapReduceDemo.class.getSimpleName());
        // Ship the jar containing this class to the cluster.
        job.setJarByClass(MapReduceDemo.class);
        // HDFS input path.
        FileInputFormat.addInputPath(job, input);
        // Map class and its output key/value types.
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(OrderBean.class);
        // Reduce class and the final output key/value types.
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(Text.class);
        // HDFS output path.
        FileOutputFormat.setOutputPath(job, output);
        // true -> print progress to the console while the job runs.
        job.waitForCompletion(true);
    }
}
方式二
用 GroupingComparator 进行辅助分组排序,该方法可以自定义进入同一 reduce 的分组规则。首先将记录封装成 OrderBean,先按 id 排序、id 相同时再按 money 排序;再自定义"id 相同即进入同一 reduce"的分组规则,reduce 中直接输出组内第一个 key 即可。
package com.BigData.MapReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
public class MapReduceDemo {

    /**
     * Mapper: wraps each tab-separated input line into an OrderBean key;
     * the value carries no payload (empty Text).
     */
    public static class MyMapper extends Mapper<LongWritable, Text, OrderBean, Text> {
        protected void map(LongWritable key, Text value,
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, OrderBean, Text>.Context context)
                throws java.io.IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            OrderBean bean = new OrderBean();
            bean.setId(Integer.parseInt(fields[0]));
            bean.setMoney(Double.parseDouble(fields[2]));
            context.write(bean, new Text(""));
        }
    }

    // ======= shuffle boundary =======
    // With the grouping comparator registered below, every bean that shares
    // an order id lands in the same reduce call; keys arrive already sorted,
    // so the group's leading key is the record to emit.

    /**
     * Reducer: writes the first (i.e. top-ranked) key of each group.
     */
    public static class MyReduce extends Reducer<OrderBean, Text, OrderBean, Text> {
        @Override
        protected void reduce(OrderBean k2, Iterable<Text> v2s,
                Reducer<OrderBean, Text, OrderBean, Text>.Context context)
                throws IOException, InterruptedException {
            context.write(k2, new Text(""));
        }
    }

    /**
     * Driver: configures and submits the job; reads /data/group.txt from
     * HDFS and writes results to /out.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Clear any stale output directory so the job can start.
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.142.20:9000"), conf);
        Path outDir = new Path("/out");
        if (fs.exists(outDir)) {
            fs.delete(outDir, true);
        }

        conf.set("fs.defaultFS", "hdfs://192.168.142.20:9000");
        Job job = Job.getInstance(conf, MapReduceDemo.class.getSimpleName());
        job.setJarByClass(MapReduceDemo.class);
        FileInputFormat.addInputPath(job, new Path("/data/group.txt"));

        // Map side: OrderBean keys, empty Text values.
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce side plus the custom grouping rule (same id -> same group).
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(Text.class);
        job.setGroupingComparatorClass(MyGroupComparator.class);

        FileOutputFormat.setOutputPath(job, outDir);
        // true -> report progress while the job runs.
        job.waitForCompletion(true);
    }
}
/**
 * Grouping comparator: treats two OrderBeans as belonging to the same
 * reduce group whenever their ids match, ignoring money.
 */
class MyGroupComparator extends WritableComparator {
    protected MyGroupComparator() {
        // true -> create OrderBean instances so compare() receives
        // deserialized objects rather than raw bytes.
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean ob1 = (OrderBean) a;
        OrderBean ob2 = (OrderBean) b;
        // BUG FIX: the previous "ob1.getId() - ob2.getId()" can overflow for
        // large or negative ids and then report the wrong ordering;
        // Integer.compare is overflow-safe.
        return Integer.compare(ob1.getId(), ob2.getId());
    }
}
注意:
(1)需要设置 job.setGroupingComparatorClass(MyGroupComparator.class);
(2)重写compare方法中的参数要选择WritableComparable类型
(3)如果需要分组排序后输出前N项,在reduce中遍历v2s即可——每消费一个value,框架会把k2推进到组内的下一条记录。例如,输出每组排名前两条:
int i = 0;
for (Text v : v2s) {
    if (i++ >= 2) break;
    context.write(k2, new Text(""));
}
注意不能写成 for(int i = 0; i <= 1; i++) context.write(k2, new Text(""));——这种写法不消费迭代器,k2不会前进,只会把同一条记录输出两遍。
运行结果: