一、问题描述
在本地运行一个MR程序进行自测,发现一直卡在 reduce-> copy 阶段,不能往下走。
二、解决历程
看到这个问题,想到:其他工程中的类都是可以,为什么这个工程不行,怀疑是pom包引的版本不对的问题。但是为了找到证据,开始看本地调用过程。一步一步debug看:
整个调用流程如下:
// MR main方法:boolean success = job.waitForCompletion(true);// Hadoop源码:Job extend Thread:- job.waitForCompletion- submit();JobSubmitter:- submitter.submitJobInternal(Job.this, cluster);- status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());LocalJobRunner:- submitJob- Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);- this.start(); // 由于Job类继承了Thread,因此直接查看Job的run方法//以上是提交过程,以下是真正的run过程Job.run- TaskSplitMetaInfo[] taskSplitMetaInfos = SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir); // 拆分源信息- List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos, jobId, mapOutputFiles); // 封装MapRunnable- ExecutorService mapService = createMapExecutor(taskRunnables.size()); // 创建 Map 执行线程池- for (Runnable r : taskRunnables) {mapService.submit(r);} // 提交任务- mapService.shutdown();- LOG.info("Map task executor complete.");// 以上 map阶段完成,下面开始 Reduce 阶段- TaskAttemptID reduceId = new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);- ReduceTask reduce = new ReduceTask(systemJobFile.toString(), reduceId, 0, mapIds.size(), 1); // 创建 reduce 任务- Path reduceIn = localOutputFile.getInputFileForWrite(mapId.getTaskID(), localFs.getFileStatus(mapOut).getLen()); // 根据 Map out 创建 reduce in 目录- reduce.run(localConf, this); // reduce runReduceTask- run- TaskReporter reporter = startReporter(umbilical); // 开启一个线程进行进度的汇报- Class combinerClass = conf.getCombinerClass();- CombineOutputCollector combineCollector = (null != combinerClass) ? new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null; // 获取 Combine 类- Class<? extends ShuffleConsumerPlugin> clazz =job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class); // 创建 shuffle- LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);- ShuffleConsumerPlugin.Context shuffleContext = new ShuffleConsumerPlugin.Context( ... ) // 创建 shuffleContext 上下文- Shuffle implement ShuffleConsumerPlugin- shuffle.init // 在此处进行 shuffleConsumerPlugin 对象的创建- shuffle.run- final EventFetcher<K,V> eventFetcher = new EventFetcher<K,V>(reduceId, umbilical, scheduler, this, maxEventsToFetch); // 获取 MapEvent- eventFetcher.start();- EventFetcher.run()- getMapCompletionEvents- Fetcher<K,V>[] fetchers = new Fetcher[numFetchers]; // 拉取 map 输出到本地目录- for 循环, fetcher.start- fetcher.run- host = scheduler.getHost();- host = scheduler.getHost(); // 调用 ShuffleSchedulerImpl 的getHost- ShuffleSchedulerImpl.getHost// 原因是因为pendingHosts一直为空,所以一直在wait,打印:reduce -> copy- while(pendingHosts.isEmpty()) { wait(); }- 内部类: Referee 给 pendingHosts 赋值
查到此,就需要往前回看,看pendingHosts为什么为空,在哪里赋值的
/*
思路:
1、看 pendingHosts 在哪里定义的
2、看 pendingHosts 都有哪里给赋值
3、看赋值的方法被谁调用了
/
1、pendingHosts的定义以及add方法如下图所示
2、分别看了下调用的地方,怀疑是最后一个地方,如图:

发现在ReduceTask.init的时候,进行 shuffleConsumerPlugin 对象的创建;发现这个host的赋值在 wait (reduce -> copy) 之后,所以出现了疑问;
就对比了另外一个项目中,跑的正常的MR,发现 shuffle.run 的时候,isLocal是true,但是出现问题时,isLocal是false;于是继续追这个 isLocal
boolean isLocal = localMapFiles != null;final int numFetchers = isLocal ? 1 :jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];if (isLocal) {fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,merger, reporter, metrics, this, reduceTask.getShuffleSecret(),localMapFiles);fetchers[0].start();} else {for (int i=0; i < numFetchers; ++i) {fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,reporter, metrics, this,reduceTask.getShuffleSecret());fetchers[i].start();}}
回溯 localMapFiles 在哪里设置的,发现在有问题的MR中 setLocalMapFiles 方法没有调用,正常的MR中 setLocalMapFiles 是有调用的;
最后确定是版本问题,再通过引入正确的版本解决;
