将不同前缀的手机号码分到不同的输出文件

接着上面的例子

默认的分区规则

image.png

分区类

  1. package com.folwsum;
  2. import org.apache.hadoop.mapreduce.Partitioner;
  3. import java.util.HashMap;
  4. public class ProvivcePartitioner extends Partitioner {
  5. private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();
  6. static{
  7. //我们这边就提前设置手机号码对应的分区
  8. provinceMap.put("135", 0);
  9. provinceMap.put("136", 1);
  10. provinceMap.put("137", 2);
  11. provinceMap.put("138", 3);
  12. provinceMap.put("139", 4);
  13. }
  14. @Override
  15. public int getPartition(Object o, Object o2, int numPartitions) {
  16. //根据手机的前3位,进行取他的值,就是上面定义的
  17. Integer code = provinceMap.get(o.toString().substring(0, 3));
  18. if(code != null){
  19. return code;
  20. }
  21. //没有取到就分到5去
  22. return 5;
  23. }
  24. }

任务类

主要改动在 main 方法里面:设置自定义分区组件和 reduce task 个数

  1. package com.folwsum;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.Mapper;
  9. import org.apache.hadoop.mapreduce.Reducer;
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  14. import org.apache.hadoop.util.StringUtils;
  15. import java.io.IOException;
  16. public class FlowSumProvince {
  17. public static class ProvinceFlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
  18. Text k = new Text();
  19. FlowBean v = new FlowBean();
  20. @Override
  21. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  22. //将读取到的每一行数据进行字段的切分
  23. String line = value.toString();
  24. String[] fields = StringUtils.split(line, ' ');
  25. //抽取我们业务所需要的字段
  26. String phoneNum = fields[1];
  27. long upFlow = Long.parseLong(fields[fields.length - 3]);
  28. long downFlow = Long.parseLong(fields[fields.length - 2]);
  29. k.set(phoneNum);
  30. v.set(upFlow, downFlow);
  31. context.write(k, v);
  32. }
  33. }
  34. public static class ProvinceFlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
  35. @Override
  36. protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
  37. long upFlowCount = 0;
  38. long downFlowCount = 0;
  39. for(FlowBean bean : values){
  40. upFlowCount += bean.getUpFlow();
  41. downFlowCount += bean.getDownFlow();
  42. }
  43. FlowBean sumbean = new FlowBean();
  44. sumbean.set(upFlowCount, downFlowCount);
  45. context.write(key, sumbean);
  46. }
  47. }
  48. public static void main(String[] args) throws Exception {
  49. Configuration conf = new Configuration();
  50. Job job = Job.getInstance(conf);
  51. job.setJarByClass(FlowSumProvince.class);
  52. //告诉程序,我们的程序所用的mapper类和reducer类是什么
  53. job.setMapperClass(ProvinceFlowSumMapper.class);
  54. job.setReducerClass(ProvinceFlowSumReducer.class);
  55. //告诉框架,我们程序输出的数据类型
  56. job.setMapOutputKeyClass(Text.class);
  57. job.setMapOutputValueClass(FlowBean.class);
  58. job.setOutputKeyClass(Text.class);
  59. job.setOutputValueClass(FlowBean.class);
  60. //设置我们的shuffer的分区组件使用我们自定义的组件
  61. job.setPartitionerClass(ProvivcePartitioner.class);
  62. //这里设置我们的reduce task个数 默认是一个partition分区对应一个reduce task 输出文件也是一对一
  63. //如果我们的Reduce task个数 < partition分区数 就会报错Illegal partition
  64. //如果我们的Reduce task个数 > partition分区数 不会报错,会有空文件产生
  65. //如果我们的Reduce task个数 = 1 partitoner组件就无效了 不存在分区的结果
  66. //这边设置为6,因为没有匹配到的就到第5个
  67. job.setNumReduceTasks(6);
  68. //告诉框架,我们程序使用的数据读取组件 结果输出所用的组件是什么
  69. //TextInputFormat是mapreduce程序中内置的一种读取数据组件 准确的说 叫做 读取文本文件的输入组件
  70. job.setInputFormatClass(TextInputFormat.class);
  71. job.setOutputFormatClass(TextOutputFormat.class);
  72. //告诉框架,我们要处理的数据文件在那个路劲下
  73. FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input"));
  74. //如果有这个文件夹就删除
  75. Path out = new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output");
  76. FileSystem fileSystem = FileSystem.get(conf);
  77. if (fileSystem.exists(out)) {
  78. fileSystem.delete(out, true);
  79. }
  80. //告诉框架,我们的处理结果要输出到什么地方
  81. FileOutputFormat.setOutputPath(job, out);
  82. boolean res = job.waitForCompletion(true);
  83. System.exit(res?0:1);
  84. }
  85. }