知识点连接第一天 如有疑惑移步我的首页

我的首页

补充知识点

2 partition分区 - 图1
Shuffle 主要包括 四个步骤 (分区 排序 规约 分组)

Partition 分区

分区书写步骤

分区流程

2 partition分区 - 图2

创建分区类

  1. public static class MyPartition extends Partitioner<Text, IntWritable>{
  2. /**
  3. * 用于创建 分区规则
  4. * 继承 org.apache.hadoop.mapreduce.Partitioner;
  5. * @param text : map阶段之后 reduce阶段之前的k
  6. * @param intWritable : map阶段之后 reduce阶段之前的v
  7. * @param numPartitions
  8. * @return 返回int类型
  9. */
  10. /**
  11. * 重写getPartition方法
  12. */
  13. @Override
  14. public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
  15. // 指定规则 例如 如果k是hadoop 那么 返回0 就是0分区 否则 1分区
  16. if(text.toString().equals("hadoop")){
  17. return 0;
  18. }else{
  19. return 1;
  20. }
  21. }
  22. }

让分区类能够被运用 配置

  1. // 这个配置设置谁是分区类
  2. job.setPartitionerClass(MyPartition.class);
  3. // 设置分区reduce个数 你返回了几个变量分区数就是几 比如我0,1 就是两个分区
  4. job.setNumReduceTasks(2);

完整代码 不同项目

  1. package MapReduce_one;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.LongWritable;
  5. import org.apache.hadoop.io.NullWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.Mapper;
  9. import org.apache.hadoop.mapreduce.Partitioner;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. import java.io.IOException;
  14. public class JobMainTwo {
  15. // main
  16. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  17. System.setProperty("hadoop.home.dir", "H:\\winutils\\winutils-master\\hadoop-2.7.1");
  18. Configuration configuration = new Configuration();
  19. Job job = Job.getInstance(configuration, "text");
  20. job.setPartitionerClass(JobMainTwo.MyPartition.class);
  21. // 设置Reduce个数
  22. job.setReducerClass(JobMainTwo.ReduceP.class);
  23. job.setOutputKeyClass(Text.class);
  24. job.setOutputValueClass(NullWritable.class);
  25. job.setNumReduceTasks(2);
  26. job.setJarByClass(JobMainTwo.class);
  27. job.setMapperClass(JobMainTwo.MapP.class);
  28. job.setMapOutputValueClass(NullWritable.class);
  29. job.setMapOutputKeyClass(Text.class);
  30. FileInputFormat.addInputPath(job,new Path("/user/root/baoxian1117.csv"));
  31. FileOutputFormat.setOutputPath(job,new Path("/user/root/result3"));
  32. System.exit(job.waitForCompletion(true)?0:1);
  33. }
  34. // map
  35. public static class MapP extends Mapper<LongWritable, Text, Text, NullWritable>{
  36. @Override
  37. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
  38. context.write(value, NullWritable.get());
  39. }
  40. }
  41. // partition
  42. public static class MyPartition extends Partitioner<Text, NullWritable>{
  43. @Override
  44. public int getPartition(Text text, NullWritable nullWritable, int numPartitions) {
  45. String[] split = text.toString().split(",");
  46. if (Integer.parseInt(split[0])>200){
  47. return 0;
  48. }else {
  49. return 1;
  50. }
  51. }
  52. }
  53. // reduce
  54. public static class ReduceP extends Reducer<Text, NullWritable, Text, NullWritable>{
  55. @Override
  56. protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
  57. context.write(key, NullWritable.get());
  58. }
  59. }
  60. }

问题和注意事项

  1. 在本地运行配置得环境可能不支持分区操作 如果可以当然不可以 如果不可以 可以尝试先选择去yarn集群跑一下任务看是否能够正常分区
  2. 注意设置分区数目