(1)要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件(分区)
(2)默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户无法控制哪个key存储到哪个分区
自定义分区
1、自定义类继承Partitioner,重写getPartition()方法
2、在Job驱动中,设置自定义Partitioner:
job.setPartitionerClass(CustomerPartitioner.class)
3、根据自定义分区的逻辑设置相应数量的ReduceTask
job.setNumReduceTask(5)
分区总结
- 如果ReduceTask的数量>getPartition的结果数,则会多产生几个空的输出文件part-r-000XX
- 如果小于,会异常
- ReduceTask等于1,只会有一个结果文件
案例
将统计结果按照手机归属地不同省份输出到不同文件中
(1)输入数据
与1相同
(2)期望输出
手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。
(3)代码实现
```java package com.BigData.MapReduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;package com.BigData.MapReduce;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class CustomerPartition extends Partitioner<Text,FlowBean> {
@Override
public int getPartition(Text key, FlowBean flow, int numPartitions) {
String preNumber = key.toString().substring(0,3);
if("136".equals(preNumber))
return 0;
else if("137".equals(preNumber))
return 1;
else if("138".equals(preNumber))
return 2;
else if("139".equals(preNumber))
return 3;
return 4;
}
}
import java.io.IOException; import java.net.URI;
public class MapReduceDemo {
public static class MyMapper extends Mapper
public static void main(String[] args) throws Exception{
//设置配置参数
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.142.20:9000"),conf);
if(fs.exists(new Path("/out")))
fs.delete(new Path("/out"),true);
//创建任务
conf.set("fs.defaultFS","hdfs://192.168.142.20:9000");
Path input = new Path("/data/phone_data.txt");
Path output = new Path("/out");
Job job = Job.getInstance(conf, MapReduceDemo.class.getSimpleName());
//指定jar文件
job.setJarByClass(MapReduceDemo.class);
//指定输入路径,数据在hdfs上的输入路径,指定第一个参数是hdfs输入路径
FileInputFormat.addInputPath(job,input);
//指定map的类
job.setMapperClass(MyMapper.class);
//指定map输出的key和value的数据类型。
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setNumReduceTasks(5);
job.setPartitionerClass(CustomerPartition.class);
//指定reduce类以及输出数据类型。
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//指定输出路径hdfs
FileOutputFormat.setOutputPath(job,output);
//提交任务,如果是true,会返回任务执行的进度信息等。
job.waitForCompletion(true);
}
}
```
注意在main函数中配置job属性
62 job.setNumReduceTasks(5);
63 job.setPartitionerClass(CustomerPartition.class);
(4) 运行结果