Driver

    package mycode_partitioner;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class FlowMRDrive {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // Instantiate the job
            Job job = Job.getInstance(new Configuration());

            // Configure the driver, partitioner, mapper, and reducer classes
            job.setJarByClass(FlowMRDrive.class);
            job.setPartitionerClass(MyPartitioner.class);
            job.setNumReduceTasks(5); // must match the number of partitions MyPartitioner produces
            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);

            // Input and output paths
            FileInputFormat.setInputPaths(job, new Path("C:\\tmp\\input\\phone_data .txt"));
            FileOutputFormat.setOutputPath(job, new Path("C:\\tmp\\output444"));

            // Submit and wait for completion
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
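One practical caveat with the driver: FileOutputFormat refuses to run if the output directory already exists, so re-running the job against C:\tmp\output444 fails with a FileAlreadyExistsException. A minimal sketch of the usual workaround, deleting the directory before submission (OutputCleaner is a hypothetical helper name, not part of the original code):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;

    public class OutputCleaner {
        /** Deletes the output directory if it exists, so the job can be re-run. */
        public static void clean(Configuration conf, Path output) throws IOException {
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true); // true = recursive delete of the previous run's results
            }
        }
    }

Calling OutputCleaner.clean(job.getConfiguration(), new Path("C:\\tmp\\output444")) just before FileOutputFormat.setOutputPath would make the driver safely re-runnable.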


Serialization

    package mycode_partitioner;

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class FlowBean implements Writable {
        private long upflow;
        private long downflow;
        private long sumflow;

        // No-arg constructor, required by Hadoop for deserialization
        public FlowBean() {
        }

        public FlowBean(long upflow, long downflow) {
            this.upflow = upflow;
            this.downflow = downflow;
            this.sumflow = upflow + downflow;
        }

        /** @return upflow */
        public long getUpflow() {
            return upflow;
        }

        /** @param upflow upstream traffic */
        public void setUpflow(long upflow) {
            this.upflow = upflow;
        }

        /** @return downflow */
        public long getDownflow() {
            return downflow;
        }

        /** @param downflow downstream traffic */
        public void setDownflow(long downflow) {
            this.downflow = downflow;
        }

        /** @return sumflow */
        public long getSumflow() {
            return sumflow;
        }

        /** @param sumflow total traffic */
        public void setSumflow(long sumflow) {
            this.sumflow = sumflow;
        }

        /*
         * Called during serialization.
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upflow);
            out.writeLong(downflow);
            out.writeLong(sumflow);
        }

        /*
         * Called during deserialization.
         * Note: fields must be read in the same order they were written.
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            upflow = in.readLong();
            downflow = in.readLong();
            sumflow = in.readLong();
        }

        @Override
        public String toString() {
            return "FlowBean{upflow = " + upflow + ", downflow = " + downflow + ", sumflow = " + sumflow + "}";
        }
    }
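The ordering constraint in readFields() is easy to verify locally, because write() and readFields() only need a DataOutput/DataInput pair, not a running cluster. A minimal round-trip sketch (FlowBeanRoundTrip is a hypothetical test class):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class FlowBeanRoundTrip {
        public static void main(String[] args) throws IOException {
            FlowBean original = new FlowBean(100L, 200L);

            // Serialize with the same write() Hadoop calls during the shuffle
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));

            // Deserialize into a fresh bean; readFields() consumes the three
            // longs in exactly the order write() produced them
            FlowBean copy = new FlowBean();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

            System.out.println(copy); // FlowBean{upflow = 100, downflow = 200, sumflow = 300}
        }
    }

Note that swapping any two readLong() calls in readFields() would silently shuffle the field values rather than fail, which is why the comment about ordering matters.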

mapper

    package mycode_partitioner;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        // Reused across map() calls to avoid allocating objects per record
        private Text telephone = new Text();
        private FlowBean flowBean = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Fields are tab-separated; the phone number is the second field,
            // the flow counters sit third- and second-from-last
            String line = value.toString();
            String[] info = line.split("\t");
            long upflow = Long.parseLong(info[info.length - 3]);
            long downflow = Long.parseLong(info[info.length - 2]);
            flowBean.setUpflow(upflow);
            flowBean.setDownflow(downflow);
            flowBean.setSumflow(upflow + downflow); // keep sumflow consistent, since the bean is reused
            telephone.set(info[1]);
            context.write(telephone, flowBean);
        }
    }
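For reference, the mapper assumes tab-separated input where the second field is the phone number and the flow counters sit third- and second-from-last. A record of the kind this code expects might look like the following (the values are illustrative, not taken from the actual phone_data .txt):

    1	13736230513	192.196.100.1	example.com	2481	24681	200

For that line the mapper emits the key 13736230513 with a FlowBean carrying upflow = 2481 and downflow = 24681. Indexing from the end of the array rather than the front presumably lets the code tolerate records with a variable number of middle fields, as long as the last three columns are stable.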

Custom partitioner

    package mycode_partitioner;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class MyPartitioner extends Partitioner<Text, FlowBean> {
        // Route each record to a partition based on the phone number prefix
        @Override
        public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
            String phoneNumber = text.toString();
            if (phoneNumber.startsWith("135")) {
                return 0;
            } else if (phoneNumber.startsWith("136")) {
                return 1;
            } else if (phoneNumber.startsWith("137")) {
                return 2;
            } else if (phoneNumber.startsWith("138")) {
                return 3;
            } else {
                return 4;
            }
        }
    }
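getPartition() must return a value in [0, numPartitions); an index outside that range makes the shuffle fail with an "Illegal partition" error, which is why the driver pins setNumReduceTasks(5) to match the five branches above. If the prefix table grows, a table-driven variant keeps the mapping in one place (PrefixPartitioner is a hypothetical alternative, not part of the original code):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class PrefixPartitioner extends Partitioner<Text, FlowBean> {
        private static final Map<String, Integer> PREFIX_TO_PARTITION = new HashMap<>();
        private static final int DEFAULT_PARTITION = 4; // everything else

        static {
            PREFIX_TO_PARTITION.put("135", 0);
            PREFIX_TO_PARTITION.put("136", 1);
            PREFIX_TO_PARTITION.put("137", 2);
            PREFIX_TO_PARTITION.put("138", 3);
        }

        @Override
        public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
            String phone = text.toString();
            String prefix = phone.length() >= 3 ? phone.substring(0, 3) : phone;
            return PREFIX_TO_PARTITION.getOrDefault(prefix, DEFAULT_PARTITION);
        }
    }

The behavior is identical to MyPartitioner for well-formed phone numbers; the design difference is that adding a new prefix becomes a one-line change.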

reducer

    package mycode_partitioner;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        private FlowBean flowBean = new FlowBean();

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            // Accumulate the traffic counters across all records for this phone number
            long sumUpflow = 0;
            long sumDownflow = 0;
            for (FlowBean value : values) {
                sumUpflow += value.getUpflow();
                sumDownflow += value.getDownflow();
            }
            flowBean.setUpflow(sumUpflow);
            flowBean.setDownflow(sumDownflow);
            flowBean.setSumflow(sumUpflow + sumDownflow); // without this, toString() would print a stale sumflow
            context.write(key, flowBean);
        }
    }
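With sumflow set, each output line (as written by the default TextOutputFormat: key, a tab, then value.toString()) would look roughly like this, reusing the illustrative record from the mapper section:

    13736230513	FlowBean{upflow = 2481, downflow = 24681, sumflow = 27162}

Because 13736230513 starts with 137, MyPartitioner routes it to partition 2, so this line lands in part-r-00002; the five reduce tasks produce part-r-00000 through part-r-00004 under C:\tmp\output444.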