package com.plat.mapreduce.PartitionSortFlow;

import com.plat.mapreduce.FlowSum.FlowBean;
import com.plat.mapreduce.FlowSum.FlowSumDriver;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Flow-sum job that partitions output by phone-number prefix.
 *
 * <p>A custom {@link Partitioner} routes records to a reducer based on the
 * first three digits of the phone number, so each known prefix (135..139)
 * gets its own output file and everything else lands in a catch-all file.
 */
public class PartitonSortFlow {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // Locate the jar containing this job's classes.
        // FIX: was FlowSumDriver.class (a different job's driver); use this
        // driver class so the correct jar is shipped to the cluster.
        job.setJarByClass(PartitonSortFlow.class);

        // Mapper / Reducer for this job.
        job.setMapperClass(FlowSumMap.class);
        job.setReducerClass(FlowSumReduce.class);

        // Map-output and final-output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Use our custom shuffle partitioner.
        job.setPartitionerClass(PartitionFlowSum.class);

        // Reduce-task count must be >= number of partitions the partitioner
        // can emit, otherwise the job fails with "Illegal partition":
        //   - reduce tasks > partitions: OK, produces empty output files
        //   - reduce tasks < partitions: runtime "Illegal partition" error
        //   - reduce tasks == 1: the partitioner is effectively disabled
        // FIX: PartitionFlowSum emits partition ids 0..5 (prefixes 135..139
        // plus a catch-all of 5), so 6 reducers are required — the original
        // value of 5 would crash on any phone not starting with 135..139.
        job.setNumReduceTasks(6);

        // Input/output format components.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Input and output paths.
        FileInputFormat.setInputPaths(job, new Path("D:\\test\\flowsum\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\test\\flowsum\\output"));

        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

/**
 * Mapper: parses one tab-separated log line and emits
 * (phone number, FlowBean(upFlow, downFlow)).
 */
class FlowSumMap extends Mapper<LongWritable, Text, Text, FlowBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] strArr = StringUtils.split(line, "\t");
        // Field layout (from the end of the line): ... upFlow downFlow status.
        // FIX: the original passed strArr[strArr.length - 2] for BOTH fields,
        // making upFlow a copy of downFlow. upFlow is the third-from-last
        // field — assumes the classic flow-log layout; confirm against the
        // actual input file.
        long upFlow = Long.parseLong(strArr[strArr.length - 3]);
        long downFlow = Long.parseLong(strArr[strArr.length - 2]);
        context.write(new Text(strArr[1]), new FlowBean(upFlow, downFlow));
    }
}

/**
 * Reducer: sums up/down flow per phone number and emits one totals bean.
 */
class FlowSumReduce extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long upFlowSum = 0;
        long downFlowSum = 0;
        for (FlowBean flowBean : values) {
            upFlowSum += flowBean.getUpFlow();
            downFlowSum += flowBean.getDownFlow();
        }
        context.write(key, new FlowBean(upFlowSum, downFlowSum));
    }
}

/**
 * Partitioner: routes records by the phone number's 3-digit prefix.
 * Prefixes 135..139 map to partitions 0..4; everything else (including
 * keys shorter than 3 characters) goes to the catch-all partition 5.
 */
class PartitionFlowSum extends Partitioner<Text, FlowBean> {

    // Prefix -> partition id lookup table.
    // FIX: was a raw HashMap; parameterize and make the reference final.
    private static final Map<String, Integer> PREFIX_TO_PARTITION = new HashMap<>();

    static {
        PREFIX_TO_PARTITION.put("135", 0);
        PREFIX_TO_PARTITION.put("136", 1);
        PREFIX_TO_PARTITION.put("137", 2);
        PREFIX_TO_PARTITION.put("138", 3);
        PREFIX_TO_PARTITION.put("139", 4);
    }

    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        String phone = text.toString();
        // Guard: substring(0, 3) would throw on keys shorter than 3 chars;
        // send malformed keys to the catch-all partition instead.
        if (phone.length() < 3) {
            return 5;
        }
        Integer code = PREFIX_TO_PARTITION.get(phone.substring(0, 3));
        return code != null ? code : 5;
    }
}