(1)要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件(分区)
(2)默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户无法控制哪个key存储到哪个分区

自定义分区

1、自定义类继承Partitioner,重写getPartition()方法
2、在Job驱动中,设置自定义Partitioner:
job.setPartitionerClass(CustomerPartitioner.class)
3、根据自定义分区的逻辑设置相应数量的ReduceTask
job.setNumReduceTask(5)

分区总结

  1. 如果ReduceTask的数量>getPartition的结果数,则会多产生几个空的输出文件part-r-000XX
  2. 如果小于,会异常
  3. ReduceTask等于1,只会有一个结果文件

    案例

    将统计结果按照手机归属地不同省份输出到不同文件中
    (1)输入数据
    与1相同
    (2)期望输出
    手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。
    (3)代码实现
    1. package com.BigData.MapReduce;
    2. import org.apache.hadoop.io.Text;
    3. import org.apache.hadoop.mapreduce.Partitioner;
    4. public class CustomerPartition extends Partitioner<Text,FlowBean> {
    5. @Override
    6. public int getPartition(Text key, FlowBean flow, int numPartitions) {
    7. String preNumber = key.toString().substring(0,3);
    8. if("136".equals(preNumber))
    9. return 0;
    10. else if("137".equals(preNumber))
    11. return 1;
    12. else if("138".equals(preNumber))
    13. return 2;
    14. else if("139".equals(preNumber))
    15. return 3;
    16. return 4;
    17. }
    18. }
    ```java package com.BigData.MapReduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException; import java.net.URI;

public class MapReduceDemo { public static class MyMapper extends Mapper { protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper.Context context) throws java.io.IOException ,InterruptedException { String line = value.toString(); String[] data = line.split(“\t”); FlowBean flow = new FlowBean(); flow.setUpFlow(Long.parseLong(data[data.length-3])); flow.setDownFlow(Long.parseLong(data[data.length-2])); flow.setSumFlow(Long.parseLong(data[data.length-3])+Long.parseLong(data[data.length-2])); context.write(new Text(data[1]),flow); } } // =======分割线========= // shuffle 进行合并,分区,分组,排序。相同的k2的数据会被同一个reduce拉取。 // 第二部分,写Reduce阶段 public static class MyReduce extends Reducer { //同样是有reduce函数 @Override protected void reduce(Text k2, Iterable v2s, Reducer.Context context) throws IOException, InterruptedException { for(FlowBean fb : v2s) context.write(k2,fb); } }

  1. public static void main(String[] args) throws Exception{
  2. //设置配置参数
  3. Configuration conf = new Configuration();
  4. FileSystem fs = FileSystem.get(new URI("hdfs://192.168.142.20:9000"),conf);
  5. if(fs.exists(new Path("/out")))
  6. fs.delete(new Path("/out"),true);
  7. //创建任务
  8. conf.set("fs.defaultFS","hdfs://192.168.142.20:9000");
  9. Path input = new Path("/data/phone_data.txt");
  10. Path output = new Path("/out");
  11. Job job = Job.getInstance(conf, MapReduceDemo.class.getSimpleName());
  12. //指定jar文件
  13. job.setJarByClass(MapReduceDemo.class);
  14. //指定输入路径,数据在hdfs上的输入路径,指定第一个参数是hdfs输入路径
  15. FileInputFormat.addInputPath(job,input);
  16. //指定map的类
  17. job.setMapperClass(MyMapper.class);
  18. //指定map输出的key和value的数据类型。
  19. job.setMapOutputKeyClass(Text.class);
  20. job.setMapOutputValueClass(FlowBean.class);
  21. job.setNumReduceTasks(5);
  22. job.setPartitionerClass(CustomerPartition.class);
  23. //指定reduce类以及输出数据类型。
  24. job.setReducerClass(MyReduce.class);
  25. job.setOutputKeyClass(Text.class);
  26. job.setOutputValueClass(FlowBean.class);
  27. //指定输出路径hdfs
  28. FileOutputFormat.setOutputPath(job,output);
  29. //提交任务,如果是true,会返回任务执行的进度信息等。
  30. job.waitForCompletion(true);
  31. }

}

``` 注意在main函数中配置job属性
62 job.setNumReduceTasks(5);
63 job.setPartitionerClass(CustomerPartition.class);
(4) 运行结果
image.png
image.png
image.png