## Data and data format
part-r-00000.txt

Column meanings: phone number, upstream packet count, downstream packet count, total upstream traffic, total downstream traffic.

## Requirements
- For each phone number, sum each of the four columns (upstream packet count, downstream packet count, total upstream traffic, total downstream traffic), i.e. a group-by sum.
- Sort the results by upstream packet count in descending order.
- Partition the output by phone-number prefix (135 / 136 / 137 / others).
The implementation is split into three parts: grouped sum, descending sort, and partitioning.

## Grouped sum
### Approach
- Create a bean class that holds the four fields (upstream packet count, downstream packet count, total upstream traffic, total downstream traffic) and implements Hadoop's Writable interface for serialization.
- Map each record to (phone number, FlowBean) and sum the four fields in the reducer.
- Overall difficulty: easy.

The full code follows; it is not difficult.
```java
package MapReduce_one;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PhoneMapReduce {
    /**
     * Delete the output path if it already exists.
     */
    public static void deleteFile(Path s) throws IOException {
        FileSystem fileSystem = FileSystem.get(new Configuration());
        if (fileSystem.exists(s)) {
            fileSystem.delete(s, true);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Grouped-sum job.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "flowSum");
        job.setJarByClass(PhoneMapReduce.class);
        job.setMapperClass(PhoneMapReduce.MapFlow.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PhoneMapReduce.FlowBean.class);
        job.setReducerClass(PhoneMapReduce.ReduceFlow.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PhoneMapReduce.FlowBean.class);
        Path path = new Path("data/result2");
        deleteFile(path);
        FileInputFormat.addInputPath(job, new Path("data/phoneData.txt"));
        FileOutputFormat.setOutputPath(job, path);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    public static class FlowBean implements Writable {
        // The object travels across the network between map and reduce, so it must be serializable.
        private Integer upFlow;        // upstream packet count
        private Integer downFlow;      // downstream packet count
        private Integer upCountFlow;   // total upstream traffic
        private Integer downCountFlow; // total downstream traffic

        public Integer getUpFlow() {
            return upFlow;
        }

        public void setUpFlow(Integer upFlow) {
            this.upFlow = upFlow;
        }

        public Integer getDownFlow() {
            return downFlow;
        }

        public void setDownFlow(Integer downFlow) {
            this.downFlow = downFlow;
        }

        public Integer getUpCountFlow() {
            return upCountFlow;
        }

        public void setUpCountFlow(Integer upCountFlow) {
            this.upCountFlow = upCountFlow;
        }

        public Integer getDownCountFlow() {
            return downCountFlow;
        }

        public void setDownCountFlow(Integer downCountFlow) {
            this.downCountFlow = downCountFlow;
        }

        @Override
        public String toString() {
            return upFlow + "\t" +
                    downFlow + "\t" +
                    upCountFlow + "\t" +
                    downCountFlow;
        }

        @Override
        public void write(DataOutput dataOutput) throws IOException {
            // Serialization: the field order must match readFields().
            dataOutput.writeInt(getDownCountFlow());
            dataOutput.writeInt(getDownFlow());
            dataOutput.writeInt(getUpCountFlow());
            dataOutput.writeInt(getUpFlow());
        }

        @Override
        public void readFields(DataInput dataInput) throws IOException {
            // Deserialization: same order as write().
            this.setDownCountFlow(dataInput.readInt());
            this.setDownFlow(dataInput.readInt());
            this.setUpCountFlow(dataInput.readInt());
            this.setUpFlow(dataInput.readInt());
        }
    }
    public static class MapFlow extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
            Text text = new Text();
            // Note: the raw input format differs slightly from the column summary above.
            // After splitting on tabs, the phone number is field 1 and the four flow
            // fields are fields 6-9.
            String[] split = value.toString().split("\t");
            text.set(split[1]);
            // Build a FlowBean from the fields we need.
            FlowBean flowBean = new FlowBean();
            flowBean.setUpFlow(Integer.valueOf(split[6]));
            flowBean.setDownFlow(Integer.valueOf(split[7]));
            flowBean.setUpCountFlow(Integer.valueOf(split[8]));
            flowBean.setDownCountFlow(Integer.valueOf(split[9]));
            context.write(text, flowBean);
        }
    }
    public static class ReduceFlow extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
            FlowBean flowBean = new FlowBean();
            Integer upFlow = 0;        // upstream packet count
            Integer downFlow = 0;      // downstream packet count
            Integer upCountFlow = 0;   // total upstream traffic
            Integer downCountFlow = 0; // total downstream traffic
            // Accumulate every bean that shares this phone number.
            for (FlowBean value : values) {
                upFlow += value.getUpFlow();
                downFlow += value.getDownFlow();
                upCountFlow += value.getUpCountFlow();
                downCountFlow += value.getDownCountFlow();
            }
            // Copy the totals into a new bean.
            flowBean.setUpFlow(upFlow);
            flowBean.setUpCountFlow(upCountFlow);
            flowBean.setDownFlow(downFlow);
            flowBean.setDownCountFlow(downCountFlow);
            // Emit through the context.
            context.write(key, flowBean);
        }
    }
}
```
## Descending sort by upstream packet count
### Approach
1. The bean now implements a different interface: WritableComparable.
2. We have to define the comparison rule ourselves:
```java
@Override
public int compareTo(FlowBean o) {
    // Descending order by upstream packet count.
    return o.getUpFlow() - this.getUpFlow();
}
```
The full code follows; it is not difficult. It reads the grouped-sum output under data/result2, where each line is the phone number followed by upFlow, downFlow, upCountFlow, and downCountFlow, separated by tabs.
```java
package MapReduce_one;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PhoneSort {
    public static void deleteFile(Path s) throws IOException {
        FileSystem fileSystem = FileSystem.get(new Configuration());
        if (fileSystem.exists(s)) {
            fileSystem.delete(s, true);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Sort job.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "sortBy");
        job.setJarByClass(PhoneSort.class);
        job.setMapperClass(PhoneSort.MapFlow.class);
        job.setMapOutputKeyClass(PhoneSort.FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(PhoneSort.ReduceFlow.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PhoneSort.FlowBean.class);
        Path path = new Path("data/result3");
        deleteFile(path);
        FileInputFormat.addInputPath(job, new Path("data/result2"));
        FileOutputFormat.setOutputPath(job, path);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    public static class FlowBean implements WritableComparable<FlowBean> {
        // The object travels across the network, so it must be serializable.
        private Integer upFlow;        // upstream packet count
        private Integer downFlow;      // downstream packet count
        private Integer upCountFlow;   // total upstream traffic
        private Integer downCountFlow; // total downstream traffic

        public Integer getUpFlow() {
            return upFlow;
        }

        public void setUpFlow(Integer upFlow) {
            this.upFlow = upFlow;
        }

        public Integer getDownFlow() {
            return downFlow;
        }

        public void setDownFlow(Integer downFlow) {
            this.downFlow = downFlow;
        }

        public Integer getUpCountFlow() {
            return upCountFlow;
        }

        public void setUpCountFlow(Integer upCountFlow) {
            this.upCountFlow = upCountFlow;
        }

        public Integer getDownCountFlow() {
            return downCountFlow;
        }

        public void setDownCountFlow(Integer downCountFlow) {
            this.downCountFlow = downCountFlow;
        }

        @Override
        public String toString() {
            return upFlow + "\t" +
                    downFlow + "\t" +
                    upCountFlow + "\t" +
                    downCountFlow;
        }

        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeInt(getDownCountFlow());
            dataOutput.writeInt(getDownFlow());
            dataOutput.writeInt(getUpCountFlow());
            dataOutput.writeInt(getUpFlow());
        }

        @Override
        public void readFields(DataInput dataInput) throws IOException {
            this.setDownCountFlow(dataInput.readInt());
            this.setDownFlow(dataInput.readInt());
            this.setUpCountFlow(dataInput.readInt());
            this.setUpFlow(dataInput.readInt());
        }

        @Override
        public int compareTo(FlowBean o) {
            // Descending order by upstream packet count.
            return o.getUpFlow() - this.getUpFlow();
        }
    }
    public static class MapFlow extends Mapper<LongWritable, Text, FlowBean, Text> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) throws IOException, InterruptedException {
            Text text = new Text();
            // Each line of the grouped-sum output: phone \t upFlow \t downFlow \t upCountFlow \t downCountFlow
            String[] split = value.toString().split("\t");
            // Debug output: print the parsed fields.
            for (String s : split) {
                System.out.println(s);
            }
            text.set(split[0]);
            // The bean becomes the map output key so the shuffle sorts by compareTo().
            FlowBean flowBean = new FlowBean();
            flowBean.setUpFlow(Integer.valueOf(split[1]));
            flowBean.setDownFlow(Integer.valueOf(split[2]));
            flowBean.setUpCountFlow(Integer.valueOf(split[3]));
            flowBean.setDownCountFlow(Integer.valueOf(split[4]));
            context.write(flowBean, text);
        }
    }
    public static class ReduceFlow extends Reducer<FlowBean, Text, Text, FlowBean> {
        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
            // Swap key and value back so the output is phone number first, bean second.
            for (Text value : values) {
                context.write(value, key);
            }
        }
    }
}
```
## Partitioning
### Approach
We only need to add a partitioning rule on top of the grouped-sum job:
```java
public static class MapPartition extends Partitioner<Text, PhoneMapReduce.FlowBean> {
    @Override
    public int getPartition(Text text, PhoneMapReduce.FlowBean flowBean, int numPartitions) {
        // Route each phone number to a reducer based on its prefix.
        String b = text.toString();
        if (b.startsWith("135")) {
            return 0;
        } else if (b.startsWith("136")) {
            return 1;
        } else if (b.startsWith("137")) {
            return 2;
        } else {
            return 3;
        }
    }
}
```
Everything else is essentially the same: when the main method configures the job, it reuses the mapper and reducer from the grouped-sum job. The differences are:

```java
job.setPartitionerClass(PhonePartition.MapPartition.class);
job.setNumReduceTasks(4);
```

With four reduce tasks, the output directory will contain four files (part-r-00000 through part-r-00003), one per partition. Full code:
```java
package MapReduce_one;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PhonePartition {
    public static void deleteFile(Path s) throws IOException {
        FileSystem fileSystem = FileSystem.get(new Configuration());
        if (fileSystem.exists(s)) {
            fileSystem.delete(s, true);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Grouped sum, partitioned by phone-number prefix.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "Partition");
        job.setJarByClass(PhoneMapReduce.class);
        job.setMapperClass(PhoneMapReduce.MapFlow.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PhoneMapReduce.FlowBean.class);
        job.setPartitionerClass(PhonePartition.MapPartition.class);
        job.setNumReduceTasks(4);
        job.setReducerClass(PhoneMapReduce.ReduceFlow.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PhoneMapReduce.FlowBean.class);
        Path path = new Path("/user/root/result");
        deleteFile(path);
        FileInputFormat.addInputPath(job, new Path("/user/root/phoneData.txt"));
        FileOutputFormat.setOutputPath(job, path);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    public static class MapPartition extends Partitioner<Text, PhoneMapReduce.FlowBean> {
        @Override
        public int getPartition(Text text, PhoneMapReduce.FlowBean flowBean, int numPartitions) {
            // Same prefix-based routing as shown above.
            String b = text.toString();
            if (b.startsWith("135")) {
                return 0;
            } else if (b.startsWith("136")) {
                return 1;
            } else if (b.startsWith("137")) {
                return 2;
            } else {
                return 3;
            }
        }
    }
}
```
## Thoughts & notes
- Every step requires a lot of repeated boilerplate (imports, output cleanup, job wiring). Is there a way to simplify it? See the sketch below.
- When adding a partitioner, remember to set both the partitioner class and the number of reduce tasks.
- The bean class used for serialization/deserialization must not rely on a parameterized constructor: Hadoop instantiates it by reflection, so it needs a no-argument constructor.
- Do not drop the output-deletion step; it prevents the "output directory already exists" error on reruns.
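On the first point, one option is to pull the shared wiring into a small helper. The sketch below is only one possible refactoring, under the assumption that each job differs solely in its name, classes, and paths; `JobHelper` and `configureJob` are hypothetical names, not part of the original code:

```java
package MapReduce_one;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Hypothetical helper that bundles the wiring repeated in every job above:
 * delete a stale output path, set the mapper/reducer classes, and set the I/O paths.
 */
public class JobHelper {
    public static Job configureJob(String name,
                                   Class<?> jarClass,
                                   Class<? extends Mapper> mapperClass,
                                   Class<?> mapKeyClass, Class<?> mapValueClass,
                                   Class<? extends Reducer> reducerClass,
                                   Class<?> outKeyClass, Class<?> outValueClass,
                                   String inputPath, String outputPath) throws IOException {
        Configuration conf = new Configuration();
        // Remove a stale output directory so the job does not fail with "already exists".
        Path out = new Path(outputPath);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        Job job = Job.getInstance(conf, name);
        job.setJarByClass(jarClass);
        job.setMapperClass(mapperClass);
        job.setMapOutputKeyClass(mapKeyClass);
        job.setMapOutputValueClass(mapValueClass);
        job.setReducerClass(reducerClass);
        job.setOutputKeyClass(outKeyClass);
        job.setOutputValueClass(outValueClass);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, out);
        return job;
    }
}
```

With a helper like this, the main method of the grouped-sum job could shrink to roughly:

```java
Job job = JobHelper.configureJob("flowSum", PhoneMapReduce.class,
        PhoneMapReduce.MapFlow.class, Text.class, PhoneMapReduce.FlowBean.class,
        PhoneMapReduce.ReduceFlow.class, Text.class, PhoneMapReduce.FlowBean.class,
        "data/phoneData.txt", "data/result2");
System.exit(job.waitForCompletion(true) ? 0 : 1);
```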