1. package com.plat.mapreduce.PartitionSortFlow;
    2. import com.plat.mapreduce.FlowSum.FlowBean;
    3. import com.plat.mapreduce.FlowSum.FlowSumDriver;
    4. import org.apache.commons.lang.StringUtils;
    5. import org.apache.hadoop.conf.Configuration;
    6. import org.apache.hadoop.fs.Path;
    7. import org.apache.hadoop.io.LongWritable;
    8. import org.apache.hadoop.io.Text;
    9. import org.apache.hadoop.mapreduce.Job;
    10. import org.apache.hadoop.mapreduce.Mapper;
    11. import org.apache.hadoop.mapreduce.Partitioner;
    12. import org.apache.hadoop.mapreduce.Reducer;
    13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    14. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    16. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    17. import java.io.IOException;
    18. import java.util.HashMap;
    19. import java.util.Map;
/**
 * Partitions flow records by mobile-phone-number prefix.
 * Requires a custom {@link org.apache.hadoop.mapreduce.Partitioner} implementation.
 */
    24. public class PartitonSortFlow {
    25. public static void main(String[] args) throws Exception{
    26. Configuration configuration = new Configuration();
    27. Job job = Job.getInstance(configuration);
    28. //告诉程序jar包所在的位置
    29. //自动扫描
    30. job.setJarByClass(FlowSumDriver.class);
    31. //告诉框架我们所需的map类和reduce类
    32. job.setMapperClass(FlowSumMap.class);
    33. job.setReducerClass(FlowSumReduce.class);
    34. //告诉框架我们程序输出的数据类型
    35. job.setMapOutputKeyClass(Text.class);
    36. job.setMapOutputValueClass(FlowBean.class);
    37. job.setOutputKeyClass(Text.class);
    38. job.setOutputValueClass(FlowBean.class);
    39. //设置我们的shuffer组件使用我们自定义的组件
    40. job.setPartitionerClass(PartitionFlowSum.class);
    41. //设置reduceTask个数
    42. //默认是一个partition分区对应一个reduce task,输出文件是一对一
    43. //如果reduce task个数 > partition个数,不会报错,会产生空文件
    44. //如果reduce task个数 < partition个数,就会报错 illegal partition
    45. //如果reduce task个数 = 1,则partition组件无效,不存在分区结果
    46. //默认是1
    47. job.setNumReduceTasks(5);
    48. //告诉程序我们使用的数据读取组件,结果输出组件是什么
    49. job.setInputFormatClass(TextInputFormat.class);
    50. job.setOutputFormatClass(TextOutputFormat.class);
    51. //告诉框架我们要处理的数据文件放在哪个路径
    52. FileInputFormat.setInputPaths(job, new Path("D:\\test\\flowsum\\input"));
    53. FileOutputFormat.setOutputPath(job, new Path("D:\\test\\flowsum\\output"));
    54. boolean flag = job.waitForCompletion(true);
    55. System.exit(flag ? 0 : 1);
    56. }
    57. }
    58. class FlowSumMap extends Mapper<LongWritable, Text, Text, FlowBean> {
    59. @Override
    60. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    61. String line = value.toString();
    62. String[] strArr = StringUtils.split(line,"\t");
    63. context.write(new Text(strArr[1]), new FlowBean(Long.parseLong(strArr[strArr.length - 2]), Long.parseLong(strArr[strArr.length - 2])));
    64. }
    65. }
    66. class FlowSumReduce extends Reducer<Text, FlowBean, Text, FlowBean> {
    67. @Override
    68. protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
    69. long upFlowSum = 0;
    70. long downFlowSum = 0;
    71. for(FlowBean flowBean : values){
    72. upFlowSum = upFlowSum + flowBean.getUpFlow();
    73. downFlowSum = downFlowSum + flowBean.getDownFlow();
    74. }
    75. FlowBean outFlow = new FlowBean(upFlowSum, downFlowSum);
    76. context.write(key, outFlow);
    77. }
    78. }
    79. class PartitionFlowSum extends Partitioner<Text, FlowBean>{
    80. //定义分区规则
    81. private static Map<String, Integer> map = new HashMap();
    82. static {
    83. map.put("135", 0);
    84. map.put("136", 1);
    85. map.put("137", 2);
    86. map.put("138", 3);
    87. map.put("139", 4);
    88. }
    89. @Override
    90. public int getPartition(Text text, FlowBean flowBean, int i) {
    91. String phone = text.toString().substring(0,3);
    92. Integer code = map.get(phone);
    93. if(code != null) return code;
    94. return 5;
    95. }
    96. }