一、问题描述

在本地运行一个MR程序进行自测,发现一直卡在 reduce-> copy 阶段,不能往下走。
image.png

二、解决历程

看到这个问题,想到:其他工程中的类都是可以,为什么这个工程不行,怀疑是pom包引的版本不对的问题。但是为了找到证据,开始看本地调用过程。一步一步debug看:

整个调用流程如下:

  1. // MR main方法:
  2. boolean success = job.waitForCompletion(true);
  3. // Hadoop源码:
  4. Job extend Thread:
  5. - job.waitForCompletion
  6. - submit();
  7. JobSubmitter:
  8. - submitter.submitJobInternal(Job.this, cluster);
  9. - status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
  10. LocalJobRunner:
  11. - submitJob
  12. - Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
  13. - this.start(); // 由于Job类继承了Thread,因此直接查看Job的run方法
  14. //以上是提交过程,以下是真正的run过程
  15. Job.run
  16. - TaskSplitMetaInfo[] taskSplitMetaInfos = SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir); // 拆分源信息
  17. - List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos, jobId, mapOutputFiles); // 封装MapRunnable
  18. - ExecutorService mapService = createMapExecutor(taskRunnables.size()); // 创建 Map 执行线程池
  19. - for (Runnable r : taskRunnables) {
  20. mapService.submit(r);
  21. } // 提交任务
  22. - mapService.shutdown();
  23. - LOG.info("Map task executor complete.");
  24. // 以上 map阶段完成,下面开始 Reduce 阶段
  25. - TaskAttemptID reduceId = new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
  26. - ReduceTask reduce = new ReduceTask(systemJobFile.toString(), reduceId, 0, mapIds.size(), 1); // 创建 reduce 任务
  27. - Path reduceIn = localOutputFile.getInputFileForWrite(mapId.getTaskID(), localFs.getFileStatus(mapOut).getLen()); // 根据 Map out 创建 reduce in 目录
  28. - reduce.run(localConf, this); // reduce run
  29. ReduceTask
  30. - run
  31. - TaskReporter reporter = startReporter(umbilical); // 开启一个线程进行进度的汇报
  32. - Class combinerClass = conf.getCombinerClass();
  33. - CombineOutputCollector combineCollector = (null != combinerClass) ? new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null; // 获取 Combine 类
  34. - Class<? extends ShuffleConsumerPlugin> clazz =job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class); // 创建 shuffle
  35. - LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
  36. - ShuffleConsumerPlugin.Context shuffleContext = new ShuffleConsumerPlugin.Context( ... ) // 创建 shuffleContext 上下文
  37. - Shuffle implement ShuffleConsumerPlugin
  38. - shuffle.init // 在此处进行 shuffleConsumerPlugin 对象的创建
  39. - shuffle.run
  40. - final EventFetcher<K,V> eventFetcher = new EventFetcher<K,V>(reduceId, umbilical, scheduler, this, maxEventsToFetch); // 获取 MapEvent
  41. - eventFetcher.start();
  42. - EventFetcher.run()
  43. - getMapCompletionEvents
  44. - Fetcher<K,V>[] fetchers = new Fetcher[numFetchers]; // 拉取 map 输出到本地目录
  45. - for 循环, fetcher.start
  46. - fetcher.run
  47. - host = scheduler.getHost();
  48. - host = scheduler.getHost(); // 调用 ShuffleSchedulerImpl 的getHost
  49. - ShuffleSchedulerImpl.getHost
  50. // 原因是因为pendingHosts一直为空,所以一直在wait,打印:reduce -> copy
  51. - while(pendingHosts.isEmpty()) { wait(); }
  52. - 内部类: Referee pendingHosts 赋值

查到此,就需要往前回看,看pendingHosts为什么为空,在哪里赋值的
/*
思路:
1、看 pendingHosts 在哪里定义的
2、看 pendingHosts 都有哪里给赋值
3、看赋值的方法被谁调用了
/
1、pendingHosts的定义以及add方法如下图所示
image.png
2、分别看了下调用的地方,怀疑是最后一个地方,如图:
image.png
image.png
发现在ReduceTask.init的时候,进行 shuffleConsumerPlugin 对象的创建;发现这个host的赋值在 wait (reduce -> copy) 之后,所以出现了疑问;

就对比了另外一个项目中,跑的正常的MR,发现 shuffle.run 的时候,isLocal是true,但是出现问题时,isLocal是false;于是继续追这个 isLocal

  1. boolean isLocal = localMapFiles != null;
  2. final int numFetchers = isLocal ? 1 :
  3. jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
  4. Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
  5. if (isLocal) {
  6. fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
  7. merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
  8. localMapFiles);
  9. fetchers[0].start();
  10. } else {
  11. for (int i=0; i < numFetchers; ++i) {
  12. fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
  13. reporter, metrics, this,
  14. reduceTask.getShuffleSecret());
  15. fetchers[i].start();
  16. }
  17. }

回溯 localMapFiles 在哪里设置的,发现在有问题的MR中 setLocalMapFiles 方法没有调用,正常的MR中 setLocalMapFiles 是有调用的;
最后确定是版本问题,再通过引入正确的版本解决;