数据和数据格式

part-r-00000.txt
每一列含义 手机号, 上行数据包数, 下行数据包数, 上行流量总和, 下行流量总和

需求

  1. 求每一个手机号码 的 上行数据包数, 下行数据包数, 上行流量总和, 下行流量总和的每一列总和(分组求和)
  2. 按照上行数据包数排序 逆序
  3. 按照手机号码分区(135、136、137 开头的号码各占一个分区，其余号码归入第 4 个分区，共 4 个分区)

    实现

    分组求和

    思路:

  4. 创建一个类存放上行数据包数, 下行数据包数, 上行流量总和, 下行流量总和 完成序列化

  5. map reduce 分组求和即可
  6. 总体难度(简单)

    直接上代码了 难度不大

    ```java package MapReduce_one; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;

public class PhoneMapReduce { public static void deleteFile(Path s) throws IOException { /**

  1. * 删除文件的方法
  2. */
  3. FileSystem fileSystem = FileSystem.get(new Configuration());
  4. if(fileSystem.exists(s)){
  5. fileSystem.delete(s, true);
  6. }
  7. }
  8. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  9. /**
  10. * 求和
  11. */
  12. Configuration configuration = new Configuration();
  13. Job job = Job.getInstance(configuration, "sortBy");
  14. job.setJarByClass(PhoneMapReduce.class);
  15. job.setMapperClass(PhoneMapReduce.MapFlow.class);
  16. job.setMapOutputKeyClass(Text.class);
  17. job.setMapOutputValueClass(PhoneMapReduce.FlowBean.class);
  18. job.setReducerClass(PhoneMapReduce.ReduceFlow.class);
  19. job.setOutputKeyClass(Text.class);
  20. job.setOutputValueClass(PhoneMapReduce.FlowBean.class);
  21. Path path = new Path("data/result2");
  22. deleteFile(path);
  23. FileInputFormat.addInputPath(job,new Path("data/phoneData.txt"));
  24. FileOutputFormat.setOutputPath(job,path);
  25. System.exit(job.waitForCompletion(true)?0:1);
  26. }
  27. public static class FlowBean implements Writable{
  28. // 对象要经过网络传输 所以需要经理序列化
  29. private Integer upFlow; //上行数据包数
  30. private Integer downFlow; // 下行数据包数
  31. private Integer upCountFlow; // 上行流量总和
  32. private Integer downCountFlow; // 下行流量综合
  33. public Integer getUpFlow() {
  34. return upFlow;
  35. }
  36. public void setUpFlow(Integer upFlow) {
  37. this.upFlow = upFlow;
  38. }
  39. public Integer getDownFlow() {
  40. return downFlow;
  41. }
  42. public void setDownFlow(Integer downFlow) {
  43. this.downFlow = downFlow;
  44. }
  45. public Integer getUpCountFlow() {
  46. return upCountFlow;
  47. }
  48. public void setUpCountFlow(Integer upCountFlow) {
  49. this.upCountFlow = upCountFlow;
  50. }
  51. public Integer getDownCountFlow() {
  52. return downCountFlow;
  53. }
  54. public void setDownCountFlow(Integer downCountFlow) {
  55. this.downCountFlow = downCountFlow;
  56. }
  57. @Override
  58. public String toString() {
  59. return upFlow + "\t"+
  60. downFlow + "\t"+
  61. upCountFlow +"\t" +
  62. downCountFlow;
  63. }
  64. @Override
  65. public void write(DataOutput dataOutput) throws IOException {
  66. // 序列化
  67. dataOutput.writeInt(getDownCountFlow());
  68. dataOutput.writeInt(getDownFlow());
  69. dataOutput.writeInt(getUpCountFlow());
  70. dataOutput.writeInt(getUpFlow());
  71. }
  72. @Override
  73. public void readFields(DataInput dataInput) throws IOException {
  74. // 反序列化
  75. this.setDownCountFlow(dataInput.readInt());
  76. this.setDownFlow(dataInput.readInt());
  77. this.setUpCountFlow(dataInput.readInt());
  78. this.setUpFlow(dataInput.readInt());
  79. }
  80. }
  81. public static class MapFlow extends Mapper<LongWritable, Text,Text, FlowBean >{
  82. @Override
  83. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
  84. Text text = new Text();
  85. // 数据格式有一点细微的区别
  86. /**
  87. * 这里要注意
  88. */
  89. String[] split = value.toString().split("\t");
  90. text.set(split[1]);
  91. // 获取需要的值生成flowBean对象
  92. FlowBean flowBean = new FlowBean();
  93. flowBean.setUpFlow(Integer.valueOf(split[6]));
  94. flowBean.setDownFlow(Integer.valueOf(split[7]));
  95. flowBean.setUpCountFlow(Integer.valueOf(split[8]));
  96. flowBean.setDownCountFlow(Integer.valueOf(split[9]));
  97. context.write(text, flowBean);
  98. }
  99. }
  100. public static class ReduceFlow extends Reducer<Text, FlowBean,Text, FlowBean>{
  101. @Override
  102. protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
  103. FlowBean flowBean = new FlowBean();
  104. Integer upFlow = 0; //上行数据包数
  105. Integer downFlow = 0; // 下行数据包数
  106. Integer upCountFlow = 0; // 上行流量总和
  107. Integer downCountFlow = 0; // 下行流量综合
  108. // 循环可迭代对象
  109. for (FlowBean value : values) {
  110. upFlow += value.getUpFlow();
  111. downFlow += value.getDownFlow();
  112. upCountFlow += value.getUpCountFlow();
  113. downCountFlow += value.getDownCountFlow();
  114. }
  115. // 给新对象赋值
  116. flowBean.setUpFlow(upFlow);
  117. flowBean.setUpCountFlow(upCountFlow);
  118. flowBean.setDownFlow(downFlow);
  119. flowBean.setDownCountFlow(downCountFlow);
  120. // 传入上下文对象
  121. context.write(key, flowBean);
  122. }
  123. }

}

<a name="FuUcJ"></a>
## 上行数据包数逆序
<a name="gqjkN"></a>
### 思路

1. 类实现的接口不一样：排序时 FlowBean 需要实现 WritableComparable，而不是 Writable
2. 需要编写比较规则(重写 compareTo 方法)
```java
@Override
        public int compareTo(FlowBean o) {
            // Descending by up-link packet count. Integer.compare is used
            // instead of "o.getUpFlow() - this.getUpFlow()" because the
            // subtraction idiom overflows for operands of large magnitude.
            return Integer.compare(o.getUpFlow(), this.getUpFlow());
        }

直接上代码 难度不大

package MapReduce_one;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PhoneSort {

    /**
     * Recursively deletes {@code s} if it already exists, so re-running the
     * job does not abort with an "output directory already exists" error.
     */
    public static void deleteFile(Path s) throws IOException {
        FileSystem fileSystem = FileSystem.get(new Configuration());
        if (fileSystem.exists(s)) {
            fileSystem.delete(s, true);
        }
    }

    /**
     * Requirement 2: re-sort the summed output (data/result2) in descending
     * order of the up-link packet count.
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "sortBy");
        job.setJarByClass(PhoneSort.class);
        job.setMapperClass(MapFlow.class);
        // FlowBean is the map-output KEY so the shuffle sorts by compareTo().
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(ReduceFlow.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        Path path = new Path("data/result3");
        deleteFile(path);
        FileInputFormat.addInputPath(job, new Path("data/result2"));
        FileOutputFormat.setOutputPath(job, path);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Key type for the sort job. It crosses the network AND drives the
     * shuffle sort, so it implements {@link WritableComparable}.
     * Hadoop creates instances reflectively — keep the no-arg constructor.
     *
     * NOTE(review): hashCode()/equals() are not overridden; with more than
     * one reducer the default HashPartitioner would then split equal keys
     * arbitrarily — fine here with the single default reducer, but confirm
     * before raising the reducer count.
     */
    public static class FlowBean implements WritableComparable<FlowBean> {
        private Integer upFlow;        // up-link packet count
        private Integer downFlow;      // down-link packet count
        private Integer upCountFlow;   // up-link traffic total
        private Integer downCountFlow; // down-link traffic total

        public Integer getUpFlow() {
            return upFlow;
        }

        public void setUpFlow(Integer upFlow) {
            this.upFlow = upFlow;
        }

        public Integer getDownFlow() {
            return downFlow;
        }

        public void setDownFlow(Integer downFlow) {
            this.downFlow = downFlow;
        }

        public Integer getUpCountFlow() {
            return upCountFlow;
        }

        public void setUpCountFlow(Integer upCountFlow) {
            this.upCountFlow = upCountFlow;
        }

        public Integer getDownCountFlow() {
            return downCountFlow;
        }

        public void setDownCountFlow(Integer downCountFlow) {
            this.downCountFlow = downCountFlow;
        }

        @Override
        public String toString() {
            // Single tab between every column: the original emitted a doubled
            // "\t" before upCountFlow, which broke tab-splitting downstream
            // and diverged from PhoneMapReduce's output format.
            return upFlow + "\t" +
                    downFlow + "\t" +
                    upCountFlow + "\t" +
                    downCountFlow;
        }

        @Override
        public void write(DataOutput dataOutput) throws IOException {
            // Serialization. Field order here MUST mirror readFields() exactly.
            dataOutput.writeInt(getDownCountFlow());
            dataOutput.writeInt(getDownFlow());
            dataOutput.writeInt(getUpCountFlow());
            dataOutput.writeInt(getUpFlow());
        }

        @Override
        public void readFields(DataInput dataInput) throws IOException {
            // Deserialization: same field order as write().
            this.setDownCountFlow(dataInput.readInt());
            this.setDownFlow(dataInput.readInt());
            this.setUpCountFlow(dataInput.readInt());
            this.setUpFlow(dataInput.readInt());
        }

        @Override
        public int compareTo(FlowBean o) {
            // Descending by up-link packet count. Integer.compare avoids the
            // overflow bug of the "o.getUpFlow() - this.getUpFlow()" idiom.
            return Integer.compare(o.getUpFlow(), this.getUpFlow());
        }
    }

    /**
     * Map phase: invert key and value — the FlowBean becomes the key so the
     * framework sorts by it; the phone number rides along as the value.
     */
    public static class MapFlow extends Mapper<LongWritable, Text, FlowBean, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Input rows come from the sum job: phone \t up \t down \t upBytes \t downBytes
            String[] split = value.toString().split("\t");
            Text phone = new Text();
            phone.set(split[0]);
            FlowBean flowBean = new FlowBean();
            flowBean.setUpFlow(Integer.valueOf(split[1]));
            flowBean.setDownFlow(Integer.valueOf(split[2]));
            flowBean.setUpCountFlow(Integer.valueOf(split[3]));
            flowBean.setDownCountFlow(Integer.valueOf(split[4]));
            context.write(flowBean, phone);
        }
    }

    /**
     * Reduce phase: swap key and value back so the output reads
     * "phone \t metrics", now in sorted order.
     */
    public static class ReduceFlow extends Reducer<FlowBean, Text, Text, FlowBean> {
        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(value, key);
            }
        }
    }
}

分区

思路

只需要在需求一脚本的基础上添加分区规则

// Custom partition rule: route each record to a reducer by phone-number
// prefix (135 -> 0, 136 -> 1, 137 -> 2, everything else -> 3).
// Requires job.setNumReduceTasks(4) — see the note below this snippet.
public static class MapPartition extends Partitioner<Text, PhoneMapReduce.FlowBean> {
        @Override
        public int getPartition(Text text, PhoneMapReduce.FlowBean flowBean, int numPartitions) {
            String b = text.toString();
            if (b.startsWith("135")) {
                return 0;
            } else if (b.startsWith("136")) {
                return 1;
            } else if (b.startsWith("137")) {
                return 2;
            } else {
                return 3;
            }
        }
    }

因为其他操作基本一致 main方法job指定运行类的时候指定求和里面的map和reduce
区别:

job.setPartitionerClass(PhonePartition.MapPartition.class);
job.setNumReduceTasks(4);

上代码

package MapReduce_one;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Requirement 3: same sum job as {@link PhoneMapReduce} (it reuses that
 * class's Mapper, Reducer and FlowBean), plus a custom partitioner that
 * routes records to reducers by phone-number prefix.
 */
public class PhonePartition {

    /**
     * Recursively deletes {@code s} if it already exists, so re-running the
     * job does not abort with an "output directory already exists" error.
     */
    public static void deleteFile(Path s) throws IOException {
        FileSystem fileSystem = FileSystem.get(new Configuration());
        if (fileSystem.exists(s)) {
            fileSystem.delete(s, true);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "Partition");
        // Reuse the sum job's map/reduce pipeline wholesale.
        job.setJarByClass(PhoneMapReduce.class);
        job.setMapperClass(PhoneMapReduce.MapFlow.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PhoneMapReduce.FlowBean.class);
        // The two lines that differ from the plain sum job: install the
        // partitioner AND match the reducer count to its partition count.
        job.setPartitionerClass(MapPartition.class);
        job.setNumReduceTasks(4);
        job.setReducerClass(PhoneMapReduce.ReduceFlow.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PhoneMapReduce.FlowBean.class);
        Path path = new Path("/user/root/result");
        deleteFile(path);
        FileInputFormat.addInputPath(job, new Path("/user/root/phoneData.txt"));
        FileOutputFormat.setOutputPath(job, path);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Partition rule: 135 -> 0, 136 -> 1, 137 -> 2, everything else -> 3.
     */
    public static class MapPartition extends Partitioner<Text, PhoneMapReduce.FlowBean> {
        @Override
        public int getPartition(Text text, PhoneMapReduce.FlowBean flowBean, int numPartitions) {
            String phone = text.toString();
            int partition;
            if (phone.startsWith("135")) {
                partition = 0;
            } else if (phone.startsWith("136")) {
                partition = 1;
            } else if (phone.startsWith("137")) {
                partition = 2;
            } else {
                partition = 3;
            }
            // Clamp into range so a lowered setNumReduceTasks() cannot make
            // this return an out-of-bounds partition (a shuffle-time error).
            // With the intended 4 reducers this is a no-op.
            return partition % numPartitions;
        }
    }
}

思考&注意点

  1. 每一个步骤需要写大量的重复代码是否有一种方式能够简化
  2. 设置分区 记得同时设置类和reduce个数
  3. 创建的用于序列化和反序列化的类，构造方法不能传参(Hadoop 反序列化时通过反射调用无参构造方法来创建对象)
  4. 文件删除方法不能省略，用于防止输出目录已经存在导致的 ERROR