Aggregation and sorting in a single pass
Let's look at the class that extends Reducer.

The framework drives a Reducer through its run() method, which not only calls reduce() once per key group but also calls cleanup() at the very end.
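For reference, run() inside Hadoop's Reducer looks roughly like this (simplified from the Hadoop source; the real version also resets a value back-up store between groups):

// Simplified sketch of org.apache.hadoop.mapreduce.Reducer.run()
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        while (context.nextKey()) {
            // One reduce() call per key group
            reduce(context.getCurrentKey(), context.getValues(), context);
        }
    } finally {
        // Runs exactly once, after the last key group has been processed
        cleanup(context);
    }
}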
Map output arrives at the reducer group by group. If each reduce() call wrote its result straight to the output file, the groups that arrive later could no longer be sorted together with what was already written.
So instead we put each result into a TreeMap, which keeps its entries ordered by key, and emit the fully sorted contents in cleanup().
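The TreeMap can only do this sorting if its key type is comparable. FlowBean is defined in the earlier flowsum sections; the sketch below is an assumption about what its relevant parts look like, shown for descending order by total flow, with field and method names mirroring what the job below uses:

// Hypothetical sketch of FlowBean; the real class lives in the earlier
// flowsum code. It must implement WritableComparable so Hadoop can
// serialize it and TreeMap can order it.
public class FlowBean implements org.apache.hadoop.io.WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() { return upFlow; }
    public long getDownFlow() { return downFlow; }

    // Descending order by total flow. Deliberately never returns 0:
    // TreeMap treats keys that compare equal as the same key and would
    // silently drop one of two phone numbers with identical totals.
    @Override
    public int compareTo(FlowBean o) {
        return this.sumFlow > o.sumFlow ? -1 : 1;
    }

    @Override
    public void write(java.io.DataOutput out) throws java.io.IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(java.io.DataInput in) throws java.io.IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}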
The main change to the code is in the class that extends Reducer:
package com.folwsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class OneStepFlowSumSort {

    public static class OneStepFlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split each input line into fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, ' ');
            // Extract the fields our job needs
            String phoneNum = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);
            k.set(phoneNum);
            v.set(upFlow, downFlow);
            context.write(k, v);
        }
    }

    public static class OneStepFlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        // Reducer-side buffer: a TreeMap keeps its entries sorted by FlowBean's compareTo
        TreeMap<FlowBean, Text> treeMap = new TreeMap<FlowBean, Text>();

        // The key passed to reduce() is the first phone number of a group such as
        // <phoneA, bean> <phoneA, bean> <phoneB, bean> <phoneB, bean>,
        // and values is an iterator over all the beans in that group
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long upFlowCount = 0;
            long downFlowCount = 0;
            for (FlowBean bean : values) {
                upFlowCount += bean.getUpFlow();
                downFlowCount += bean.getDownFlow();
            }
            // Buffer fresh objects: the key/value objects handed to reduce() are reused by the framework
            FlowBean sumbean = new FlowBean();
            sumbean.set(upFlowCount, downFlowCount);
            Text text = new Text(key.toString());
            treeMap.put(sumbean, text);
        }

        // Emit the final, globally sorted output here
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            Set<Map.Entry<FlowBean, Text>> entrySet = treeMap.entrySet();
            for (Map.Entry<FlowBean, Text> ent : entrySet) {
                context.write(ent.getValue(), ent.getKey());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(OneStepFlowSumSort.class);

        // Tell the framework which mapper and reducer classes to use
        job.setMapperClass(OneStepFlowSumMapper.class);
        job.setReducerClass(OneStepFlowSumReducer.class);

        // Tell the framework the output types of our job
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Tell the framework which input/output components to use;
        // TextInputFormat is MapReduce's built-in component for reading plain text files
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Tell the framework where the input data lives
        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input/"));
        // Tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
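Two things to keep in mind about this pattern: the TreeMap lives in reducer memory, so it only works when the set of distinct keys (here, phone numbers) fits in the reducer's heap; and the output is globally sorted only because the job runs with a single reducer (the MapReduce default). With multiple reducers, each one would sort just its own partition.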
