(1)要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件(分区)
(2)默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户无法控制哪个key存储到哪个分区
自定义分区
1、自定义类继承Partitioner,重写getPartition()方法
2、在Job驱动中,设置自定义Partitioner:
job.setPartitionerClass(CustomerPartitioner.class)
3、根据自定义分区的逻辑设置相应数量的ReduceTask
job.setNumReduceTask(5)
分区总结
- 如果ReduceTask的数量>getPartition的结果数,则会多产生几个空的输出文件part-r-000XX
- 如果小于,会异常
- ReduceTask等于1,只会有一个结果文件
案例
将统计结果按照手机归属地不同省份输出到不同文件中
(1)输入数据
与1相同
(2)期望输出
手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。
(3)代码实现
```java package com.BigData.MapReduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;package com.BigData.MapReduce;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;public class CustomerPartition extends Partitioner<Text,FlowBean> {@Overridepublic int getPartition(Text key, FlowBean flow, int numPartitions) {String preNumber = key.toString().substring(0,3);if("136".equals(preNumber))return 0;else if("137".equals(preNumber))return 1;else if("138".equals(preNumber))return 2;else if("139".equals(preNumber))return 3;return 4;}}
import java.io.IOException; import java.net.URI;
public class MapReduceDemo {
public static class MyMapper extends Mapper
public static void main(String[] args) throws Exception{//设置配置参数Configuration conf = new Configuration();FileSystem fs = FileSystem.get(new URI("hdfs://192.168.142.20:9000"),conf);if(fs.exists(new Path("/out")))fs.delete(new Path("/out"),true);//创建任务conf.set("fs.defaultFS","hdfs://192.168.142.20:9000");Path input = new Path("/data/phone_data.txt");Path output = new Path("/out");Job job = Job.getInstance(conf, MapReduceDemo.class.getSimpleName());//指定jar文件job.setJarByClass(MapReduceDemo.class);//指定输入路径,数据在hdfs上的输入路径,指定第一个参数是hdfs输入路径FileInputFormat.addInputPath(job,input);//指定map的类job.setMapperClass(MyMapper.class);//指定map输出的key和value的数据类型。job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setNumReduceTasks(5);job.setPartitionerClass(CustomerPartition.class);//指定reduce类以及输出数据类型。job.setReducerClass(MyReduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//指定输出路径hdfsFileOutputFormat.setOutputPath(job,output);//提交任务,如果是true,会返回任务执行的进度信息等。job.waitForCompletion(true);}
}
```
注意在main函数中配置job属性
62 job.setNumReduceTasks(5);
63 job.setPartitionerClass(CustomerPartition.class);
(4) 运行结果


