Prepare the data
```
Order_0000001,pd001,222.8
Order_0000001,pd005,25.8
Order_0000002,pd005,325.8
Order_0000002,pd003,522.8
Order_0000002,pd004,122.4
Order_0000003,pd001,222.8
Order_0000003,pd001,322.8
```

Each record consists of an order ID, a product ID, and a transaction amount.
The task is to extract the top1 and topN records for each order, which requires grouping:

- Using the order ID plus the transaction amount as the key, all records read in the map phase can be partitioned by order ID, sorted by amount, and sent to the reducers.
- On the reduce side, a GroupingComparator gathers the key-value pairs that share the same order ID into one group; the first key in the group is then the maximum.
Overview
Each map task sorts its own output first by order ID, and by amount (descending) when the order IDs are equal.
The reducer then fetches the map output and groups it by key; each group corresponds to one order, so for top1 the reducer can simply write out the first key it sees.
At output time it is the GroupingComparator that decides which keys fall into the same group.
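For example, with the sample data above and a single reducer, the keys reach the reducer sorted and grouped roughly like this (a sketch of the shuffle result, assuming the OrderBean ordering and the GroupingComparator shown below):

```
Order_0000001,222.8   <- group 1, first key = top 1 of Order_0000001
Order_0000001,25.8
Order_0000002,522.8   <- group 2, first key = top 1 of Order_0000002
Order_0000002,325.8
Order_0000002,122.4
Order_0000003,322.8   <- group 3, first key = top 1 of Order_0000003
Order_0000003,222.8
```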
top1 code
Each order contains multiple products; find the single product with the highest amount in each order.

OrderBean
```java
package com.top;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {

    private Text itemid;
    private DoubleWritable amount;

    public OrderBean() {
    }

    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);
    }

    public void set(Text itemid, DoubleWritable amount) {
        this.itemid = itemid;
        this.amount = amount;
    }

    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }

    @Override
    public int compareTo(OrderBean o) {
        // compare the order IDs first
        int cmp = this.itemid.compareTo(o.getItemid());
        // if the order IDs are equal, compare the amounts
        if (cmp == 0) {
            // the minus sign makes the amount sort descending
            cmp = -this.amount.compareTo(o.getAmount());
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        String readUTF = in.readUTF();
        double readDouble = in.readDouble();
        this.itemid = new Text(readUTF);
        this.amount = new DoubleWritable(readDouble);
    }

    @Override
    public String toString() {
        return "OrderBean{" +
                "itemid=" + itemid +
                ", amount=" + amount +
                '}';
    }
}
```
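As a quick sanity check of the ordering, here is a small standalone sketch (the class name `OrderBeanSortDemo` is made up and is not part of the job): keys with the same order ID sort by amount descending, otherwise lexicographically by ID.

```java
package com.top;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;

public class OrderBeanSortDemo {
    public static void main(String[] args) {
        OrderBean a = new OrderBean(new Text("Order_0000002"), new DoubleWritable(325.8));
        OrderBean b = new OrderBean(new Text("Order_0000002"), new DoubleWritable(522.8));
        OrderBean c = new OrderBean(new Text("Order_0000001"), new DoubleWritable(222.8));

        // positive: within the same order, 522.8 sorts before 325.8
        System.out.println(a.compareTo(b));
        // negative: Order_0000001 sorts before Order_0000002
        System.out.println(c.compareTo(a));
    }
}
```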
ItemIdPartitioner
A custom partitioner component.
It guarantees that beans carrying the same order ID always end up in the same partition.
```java
package com.top;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {

    @Override
    public int getPartition(OrderBean key, NullWritable nullWritable, int numPartitions) {
        // same formula as the framework's HashPartitioner: records with the
        // same order ID always land in the same partition
        return (key.getItemid().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```
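A tiny hypothetical demo (the class name `PartitionDemo` and the reducer count of 3 are made up) showing that identical order IDs always map to the same partition index:

```java
package com.top;

import org.apache.hadoop.io.Text;

public class PartitionDemo {
    public static void main(String[] args) {
        int numPartitions = 3; // assumed reducer count, just for illustration
        String[] ids = {"Order_0000001", "Order_0000001", "Order_0000002"};
        for (String id : ids) {
            // same formula as ItemIdPartitioner.getPartition
            int partition = (new Text(id).hashCode() & Integer.MAX_VALUE) % numPartitions;
            System.out.println(id + " -> partition " + partition);
        }
    }
}
```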
ItemidGroupingComparator
```java
package com.top;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ItemidGroupingComparator extends WritableComparator {

    protected ItemidGroupingComparator() {
        // must call super with the key class to compare so instances can be created
        super(OrderBean.class, true);
    }

    // the framework passes in two keys of the type declared above, here two OrderBeans
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // cast them back to OrderBean
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;
        // compare only the order IDs: keys with the same ID compare as equal (0),
        // so they are handed to the reducer as one group; within the group the keys
        // keep the descending-amount order defined in OrderBean.compareTo
        return abean.getItemid().compareTo(bbean.getItemid());
    }
}
```
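A tiny hypothetical check (the class name `GroupingDemo` is made up; it sits in the same package so it can call the protected constructor): two keys that differ only in amount compare as equal, which is exactly what makes the framework hand them to a single reduce() call.

```java
package com.top;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;

public class GroupingDemo {
    public static void main(String[] args) {
        ItemidGroupingComparator cmp = new ItemidGroupingComparator();
        OrderBean a = new OrderBean(new Text("Order_0000002"), new DoubleWritable(522.8));
        OrderBean b = new OrderBean(new Text("Order_0000002"), new DoubleWritable(325.8));
        // prints 0 -> same order ID, so both keys go into the same reduce group
        System.out.println(cmp.compare(a, b));
    }
}
```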
TopOne
```java
package com.top;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class TopOne {

    public static class TopOneMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

        OrderBean bean = new OrderBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // skip empty lines
            if (line.equals("")) {
                return;
            }
            String[] fields = StringUtils.split(line, ',');
            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));
            context.write(bean, NullWritable.get());
        }
    }

    public static class TopOneReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // the first key of each group carries the largest amount, so just write it out
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(TopOne.class);

        job.setMapperClass(TopOneMapper.class);
        job.setReducerClass(TopOneReducer.class);

        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/index/input"));

        // delete the output directory if it already exists
        Path out = new Path("/Users/jdxia/Desktop/website/hdfs/index/output/");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }

        // tell the framework where to write the result
        FileOutputFormat.setOutputPath(job, out);

        // register the GroupingComparator
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        // set the partitioner
        job.setPartitionerClass(ItemIdPartitioner.class);
        // one reduce task
        job.setNumReduceTasks(1);

        job.waitForCompletion(true);
    }
}
```
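Assuming the sample input from the top of this page and the default TextOutputFormat, the single output file should contain something close to the following (the formatting comes from OrderBean.toString(); the NullWritable value is not written):

```
OrderBean{itemid=Order_0000001, amount=222.8}
OrderBean{itemid=Order_0000002, amount=522.8}
OrderBean{itemid=Order_0000003, amount=322.8}
```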
topN code
Add this to OrderBean:
```java
@Override
public boolean equals(Object o) {
    // two beans are considered equal when their order IDs match
    OrderBean bean = (OrderBean) o;
    return bean.getItemid().equals(this.itemid);
}
```
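This is not in the original code, but when equals is overridden it is usually worth keeping hashCode consistent with it; a minimal sketch (optional here, since the partitioner hashes the itemid field directly rather than the bean):

```java
@Override
public int hashCode() {
    // consistent with equals above: beans with the same order ID get the same hash
    return this.itemid.hashCode();
}
```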
Changes in the driver class:
```java
package com.top;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class TopN {

    static class TopNMapper extends Mapper<LongWritable, Text, OrderBean, OrderBean> {

        OrderBean v = new OrderBean();
        Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, ',');
            k.set(fields[0]);
            v.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));
            // the bean is written as both key and value so the reducer can emit it
            context.write(v, v);
        }
    }

    static class TopNReducer extends Reducer<OrderBean, OrderBean, NullWritable, OrderBean> {

        int topn = 1;
        int count = 0;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            // read the value set in the driver
            topn = Integer.parseInt(conf.get("topn"));
        }

        @Override
        protected void reduce(OrderBean key, Iterable<OrderBean> values, Context context) throws IOException, InterruptedException {
            count = 0;
            for (OrderBean bean : values) {
                // only iterate topn times per group
                if ((count++) == topn) {
                    return;
                }
                context.write(NullWritable.get(), bean);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        // to load the value from a config file instead, do it like this:
        // conf.addResource("userconfig.xml");
        // System.out.println(conf.get("top.n"));

        // how many top records to keep; here it is hard-coded to top 2
        conf.set("topn", "2");

        Job job = Job.getInstance(conf);

        job.setJarByClass(TopN.class);

        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);

        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(OrderBean.class);

        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/index/input"));

        // delete the output directory if it already exists
        Path out = new Path("/Users/jdxia/Desktop/website/hdfs/index/output/");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }

        // tell the framework where to write the result
        FileOutputFormat.setOutputPath(job, out);

        // register the GroupingComparator used to group keys
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        // set the partitioner
        job.setPartitionerClass(ItemIdPartitioner.class);
        // one reduce task
        job.setNumReduceTasks(1);

        job.waitForCompletion(true);
    }
}
```
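With topn set to 2 and the sample input above, the output (the key is NullWritable, so only each bean's toString is written) should look roughly like:

```
OrderBean{itemid=Order_0000001, amount=222.8}
OrderBean{itemid=Order_0000001, amount=25.8}
OrderBean{itemid=Order_0000002, amount=522.8}
OrderBean{itemid=Order_0000002, amount=325.8}
OrderBean{itemid=Order_0000003, amount=322.8}
OrderBean{itemid=Order_0000003, amount=222.8}
```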
Summary
How many "top" records come out is decided on the reducer side: it emits however many it chooses to write.
When multiple keys arrive at the reducer, how they are grouped is the job of the custom GroupingComparator.
Within each group, records follow the ordering defined by compareTo in the custom bean.
