Driver
package mycode_partitioner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowMRDrive {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Instantiate the job
        Job job = Job.getInstance(new Configuration());

        // Configure the jar, partitioner, mapper/reducer, and key/value types
        job.setJarByClass(FlowMRDrive.class);
        job.setPartitionerClass(MyPartitioner.class);
        // Must match the number of partitions MyPartitioner can return
        job.setNumReduceTasks(5);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("C:\\tmp\\input\\phone_data .txt"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\tmp\\output444"));

        // Submit the job and wait for it to finish
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
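One caveat when re-running the job: FileOutputFormat requires that the output path not already exist, so C:\tmp\output444 must be deleted between runs or the job will fail immediately with a FileAlreadyExistsException.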
Serialization
package mycode_partitioner;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {
    private long upflow;
    private long downflow;
    private long sumflow;

    // A no-argument constructor is required so Hadoop can instantiate the bean by reflection
    public FlowBean() {
    }

    public FlowBean(long upflow, long downflow) {
        this.upflow = upflow;
        this.downflow = downflow;
        this.sumflow = upflow + downflow;
    }

    public long getUpflow() {
        return upflow;
    }

    public void setUpflow(long upflow) {
        this.upflow = upflow;
    }

    public long getDownflow() {
        return downflow;
    }

    public void setDownflow(long downflow) {
        this.downflow = downflow;
    }

    public long getSumflow() {
        return sumflow;
    }

    public void setSumflow(long sumflow) {
        this.sumflow = sumflow;
    }

    /*
     * Called during serialization.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(downflow);
        out.writeLong(sumflow);
    }

    /*
     * Called during deserialization.
     * Note: fields must be read back in exactly the same order they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        downflow = in.readLong();
        sumflow = in.readLong();
    }

    @Override
    public String toString() {
        return "FlowBean{upflow = " + upflow + ", downflow = " + downflow + ", sumflow = " + sumflow + "}";
    }
}
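To make the write/read ordering concrete, here is a minimal stand-alone round-trip sketch. The FlowBeanRoundTrip class is illustrative, not part of the job; it only exercises the Writable contract implemented above.

package mycode_partitioner;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(100L, 200L);

        // Serialize: write() emits the three longs in a fixed order
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize: readFields() reads them back in the same order
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy); // FlowBean{upflow = 100, downflow = 200, sumflow = 300}
    }
}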
Mapper
package mycode_partitioner;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    // Reuse the key and value objects across map() calls to avoid per-record allocation
    private Text telephone = new Text();
    private FlowBean flowBean = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line is a tab-separated record; the phone number is the second field,
        // and the upstream/downstream traffic sit third- and second-from-last
        String line = value.toString();
        String[] info = line.split("\t");
        flowBean.setUpflow(Long.parseLong(info[info.length - 3]));
        flowBean.setDownflow(Long.parseLong(info[info.length - 2]));
        telephone.set(info[1]);
        context.write(telephone, flowBean);
    }
}
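The indices used above assume a tab-separated layout with the phone number in the second field and the traffic counters near the end of the line. A quick stand-alone check with a hypothetical sample record (the exact field layout of phone_data is an assumption here):

package mycode_partitioner;

public class SplitCheck {
    public static void main(String[] args) {
        // Hypothetical record: id, phone, ip, domain, upflow, downflow, status
        String line = "1\t13560436666\t192.196.100.1\twww.example.com\t1116\t954\t200";
        String[] info = line.split("\t");
        System.out.println(info[1]);                // 13560436666 (phone number)
        System.out.println(info[info.length - 3]);  // 1116 (upflow)
        System.out.println(info[info.length - 2]);  // 954 (downflow)
    }
}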
Custom Partitioner
package mycode_partitioner;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        // Route records to one of five partitions based on the phone number prefix
        String phoneNumber = text.toString();
        if (phoneNumber.startsWith("135")) {
            return 0;
        } else if (phoneNumber.startsWith("136")) {
            return 1;
        } else if (phoneNumber.startsWith("137")) {
            return 2;
        } else if (phoneNumber.startsWith("138")) {
            return 3;
        } else {
            return 4;
        }
    }
}
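Note that getPartition must return a value in [0, numPartitions), which is why the driver sets exactly five reduce tasks. If the driver requested more than five, the extra reducers would simply produce empty output files; if it requested between two and four, records routed to a missing partition would fail the job with an "Illegal partition" error; and with a single reducer the custom partitioner is bypassed entirely.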
Reducer
package mycode_partitioner;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    private FlowBean flowBean = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // Sum the upstream and downstream traffic for all records sharing this phone number
        long sumUpflow = 0;
        long sumDownflow = 0;
        for (FlowBean value : values) {
            sumUpflow += value.getUpflow();
            sumDownflow += value.getDownflow();
        }
        flowBean.setUpflow(sumUpflow);
        flowBean.setDownflow(sumDownflow);
        // Without this the serialized sumflow would always remain 0
        flowBean.setSumflow(sumUpflow + sumDownflow);
        context.write(key, flowBean);
    }
}
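Because the driver requests five reduce tasks, the job writes five output files, part-r-00000 through part-r-00004, one per partition: numbers starting with 135, 136, 137, and 138 land in the first four files respectively, and all remaining numbers in the last.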