在上一篇我们的一个案例中,我们对出现单词的各个统计进行了mapreduce的入门学习,其中重要的两个步骤就是对map阶段和reduce阶段进行了代码的编写。但是在shuffle阶段使用了默认的分区、排序、规约、分组。在我们工作当中应该更多的是将不同的分区交给不同的reduce,从而产生不同的结果。
在MapReduce中,通过我们指定的分区,会将同一个分区的数据发送到同一个reduce当中去处理。例如:为了数据的统计,可以把一批类似的数据发送到同一个reduce当中,在同一个reduce当中统计相同类型的数据,就可以实现类似的数据分区和统计等。其实就死相同的类型数据,有共性的数据送到一起出来,reduce当中默认的分区只有一个。

单词统计任务拓展

还是跟上篇案例类似,假设有个a.txt文件300M,在单词统计的基础上要求对单词长度>=5放到一个结果集,<5放到另外一个结果集。此时的reduceTask就不是下图这个了,而是存在两个reduceTask,也就是说同一个分区的数据会放到同一个reduce,同一个reduce会产生一个文件
image.pngimage.png

代码实现

新增PartitionerOwn类

要实现分区必须要重写Partitioner,然后自定义我们的分区规则

  1. package com.dongnaoedu.network.hadoop.mapreduce.partition;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Partitioner;
  5. public class PartitionerOwn extends Partitioner<Text, LongWritable>{
  6. /**
  7. * text:表示k2
  8. * longWritable:表示v2
  9. * i:reduce个数
  10. */
  11. @Override
  12. public int getPartition(Text text,LongWritable longWritable,int i) {
  13. // 如果单词的长度>=5进入第一个分区==>第一个reduceTask==>reduce编号是0
  14. if(text.toString().length() >= 5){
  15. return 0;
  16. }else{
  17. // 如果长度<5,进入第二个分区==>第二个reduceTask==>reduce编号是1
  18. return 1;
  19. }
  20. }
  21. }

主方法设置分区代码

  1. package com.dongnaoedu.network.hadoop.mapreduce.partition;
  2. import com.dongnaoedu.network.hadoop.mapreduce.WordCountMapper;
  3. import com.dongnaoedu.network.hadoop.mapreduce.WordCountReduce;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.conf.Configured;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  12. import org.apache.hadoop.util.Tool;
  13. import org.apache.hadoop.util.ToolRunner;
  14. public class JobMain extends Configured implements Tool{
  15. @Override
  16. public int run(String[] strings) throws Exception{
  17. Job job = Job.getInstance(super.getConf(), "mapReduce_wordCount");
  18. //打包放在集群下运行,需要做一个配置
  19. job.setJarByClass(JobMain.class);
  20. // 第一步:设置读取文件的类:k1 v1
  21. job.setInputFormatClass(TextInputFormat.class);
  22. TextInputFormat.addInputPath(job, new Path("hdfs://node01:8082/wordcount"));
  23. // 第二步:设置mapper类
  24. job.setMapperClass(WordCountMapper.class);
  25. // 设置map阶段输出类型
  26. job.setMapOutputKeyClass(Text.class);
  27. job.setMapOutputValueClass(LongWritable.class);
  28. // 第三 四 五 六步采用默认方式(分区 排序 规约 分组)
  29. // 设置我们的分区类
  30. job.setPartitionerClass(PartitionerOwn.class);
  31. // 第七步:设置Reduce类
  32. job.setReducerClass(WordCountReduce.class);
  33. job.setOutputKeyClass(Text.class);
  34. job.setOutputValueClass(LongWritable.class);
  35. // 设置reduce的个数j
  36. job.setNumReduceTasks(2);
  37. // 第八步:设置输出类
  38. job.setOutputFormatClass(TextOutputFormat.class);
  39. // 设置输出路径
  40. TextOutputFormat.setOutputPath(job, new Path("hafs://node01:8082/wordcount_cout"));
  41. boolean b = job.waitForCompletion(true);
  42. return b ? 0:1;
  43. }
  44. public static void main(String[] args) throws Exception{
  45. Configuration configuration = new Configuration();
  46. // 启动一个任务 run=0成功
  47. int run = ToolRunner.run(configuration, new JobMain(), args);
  48. System.exit(run);
  49. }
  50. }

运行结果

查看我们的hdfs结果,会发现有两个文件,编号为0的就是我们长度>=5(第一个分区),编号为1的就是我们长度<5的(第二个分区)
image.png