Driver
package mycode_partitioner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowMRDrive {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Instantiate the job
        Job job = Job.getInstance(new Configuration());

        // Configure the classes
        job.setJarByClass(FlowMRDrive.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(5); // one reduce task per partition
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("C:\\tmp\\input\\phone_data .txt"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\tmp\\output444"));

        // Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
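A note on setNumReduceTasks: since MyPartitioner below can return partition indexes 0 through 4, the reduce-task count has to cover all five. A hedged sketch of Hadoop's behavior in each case:

        job.setNumReduceTasks(5); // matches the five partitions; produces part-r-00000 .. part-r-00004
        // job.setNumReduceTasks(1); // a single reducer bypasses the custom partitioner entirely
        // job.setNumReduceTasks(3); // more than 1 but fewer than 5 fails at runtime with an "Illegal partition" error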
Serialization
package mycode_partitioner;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {
    private long upflow;
    private long downflow;
    private long sumflow;

    // Hadoop requires a no-arg constructor for deserialization
    public FlowBean() {
    }

    public FlowBean(long upflow, long downflow) {
        this.upflow = upflow;
        this.downflow = downflow;
        this.sumflow = upflow + downflow;
    }

    public long getUpflow() {
        return upflow;
    }

    public void setUpflow(long upflow) {
        this.upflow = upflow;
    }

    public long getDownflow() {
        return downflow;
    }

    public void setDownflow(long downflow) {
        this.downflow = downflow;
    }

    public long getSumflow() {
        return sumflow;
    }

    public void setSumflow(long sumflow) {
        this.sumflow = sumflow;
    }

    /* Called during serialization */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(downflow);
        out.writeLong(sumflow);
    }

    /*
     * Called during deserialization.
     * Note: fields must be read in exactly the same order they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        downflow = in.readLong();
        sumflow = in.readLong();
    }

    @Override
    public String toString() {
        return "FlowBean{upflow = " + upflow + ", downflow = " + downflow + ", sumflow = " + sumflow + "}";
    }
}
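To sanity-check the Writable implementation without running a job, a quick serialize/deserialize round trip works with plain Java streams. This is an illustrative sketch; the class name FlowBeanRoundTrip is made up:

package mycode_partitioner;

import java.io.*;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(100L, 200L);

        // Serialize into an in-memory buffer
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buf));

        // Deserialize into a fresh instance
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        System.out.println(copy); // FlowBean{upflow = 100, downflow = 200, sumflow = 300}
    }
}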
Mapper
package mycode_partitioner;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private Text telephone = new Text();
    private FlowBean flowBean = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line is tab-separated: the phone number is field 1,
        // and the up/down flow are the 3rd- and 2nd-from-last fields.
        String line = value.toString();
        String[] info = line.split("\t");
        flowBean.setUpflow(Long.parseLong(info[info.length - 3]));
        flowBean.setDownflow(Long.parseLong(info[info.length - 2]));
        telephone.set(info[1]);
        context.write(telephone, flowBean);
    }
}
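The mapper indexes from both ends of the line, so it tolerates a variable number of middle fields. The record below is a hypothetical example of a layout that satisfies it (the real phone_data .txt may differ):

public class MapperInputDemo { // hypothetical helper, not part of the job
    public static void main(String[] args) {
        // made-up record: id, phone, ip, upflow, downflow, status
        String line = "1\t13512345678\t192.168.100.1\t2481\t24681\t200";
        String[] info = line.split("\t");
        System.out.println(info[1]);               // 13512345678 -> map output key
        System.out.println(info[info.length - 3]); // 2481        -> upflow
        System.out.println(info[info.length - 2]); // 24681       -> downflow
    }
}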
Custom Partitioner
package mycode_partitioner;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        // Route records by phone-number prefix: 135-138 each get their own
        // partition, everything else goes to partition 4.
        String phoneNumber = text.toString();
        if (phoneNumber.startsWith("135")) {
            return 0;
        } else if (phoneNumber.startsWith("136")) {
            return 1;
        } else if (phoneNumber.startsWith("137")) {
            return 2;
        } else if (phoneNumber.startsWith("138")) {
            return 3;
        } else {
            return 4;
        }
    }
}
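Because getPartition is a pure function of the key, it can be exercised directly without running a job. A small sketch (the class name MyPartitionerCheck is made up; the value argument is unused, so null is fine here):

package mycode_partitioner;

import org.apache.hadoop.io.Text;

public class MyPartitionerCheck { // hypothetical test driver
    public static void main(String[] args) {
        MyPartitioner p = new MyPartitioner();
        System.out.println(p.getPartition(new Text("13512345678"), null, 5)); // 0
        System.out.println(p.getPartition(new Text("13812345678"), null, 5)); // 3
        System.out.println(p.getPartition(new Text("13912345678"), null, 5)); // 4 (everything else)
    }
}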
Reducer
package mycode_partitioner;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    private FlowBean flowBean = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long sumUpflow = 0;
        long sumDownflow = 0;
        for (FlowBean value : values) {
            sumUpflow += value.getUpflow();
            sumDownflow += value.getDownflow();
        }
        flowBean.setUpflow(sumUpflow);
        flowBean.setDownflow(sumDownflow);
        // Bug fix: the setters do not recompute sumflow, so set it explicitly;
        // otherwise toString() would always report sumflow = 0.
        flowBean.setSumflow(sumUpflow + sumDownflow);
        context.write(key, flowBean);
    }
}
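One caveat worth knowing: Hadoop reuses a single FlowBean instance while iterating values in reduce, so fields may only be read inside the loop, as done above. If values ever need to be kept past the iteration, copy them first. A fragment sketching the general pattern (it would go inside reduce(); `kept` is a made-up name):

        // Storing `value` itself would keep N references to one reused object.
        java.util.List<FlowBean> kept = new java.util.ArrayList<>();
        for (FlowBean value : values) {
            kept.add(new FlowBean(value.getUpflow(), value.getDownflow())); // copy the fields out
        }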