一、问题描述
在本地运行一个MR程序进行自测,发现一直卡在 reduce-> copy 阶段,不能往下走。
二、解决历程
看到这个问题,想到:其他工程中的类都是可以,为什么这个工程不行,怀疑是pom包引的版本不对的问题。但是为了找到证据,开始看本地调用过程。一步一步debug看:
整个调用流程如下:
// MR main方法:
boolean success = job.waitForCompletion(true);
// Hadoop源码:
Job extend Thread:
- job.waitForCompletion
- submit();
JobSubmitter:
- submitter.submitJobInternal(Job.this, cluster);
- status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
LocalJobRunner:
- submitJob
- Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
- this.start(); // 由于Job类继承了Thread,因此直接查看Job的run方法
//以上是提交过程,以下是真正的run过程
Job.run
- TaskSplitMetaInfo[] taskSplitMetaInfos = SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir); // 拆分源信息
- List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos, jobId, mapOutputFiles); // 封装MapRunnable
- ExecutorService mapService = createMapExecutor(taskRunnables.size()); // 创建 Map 执行线程池
- for (Runnable r : taskRunnables) {
mapService.submit(r);
} // 提交任务
- mapService.shutdown();
- LOG.info("Map task executor complete.");
// 以上 map阶段完成,下面开始 Reduce 阶段
- TaskAttemptID reduceId = new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
- ReduceTask reduce = new ReduceTask(systemJobFile.toString(), reduceId, 0, mapIds.size(), 1); // 创建 reduce 任务
- Path reduceIn = localOutputFile.getInputFileForWrite(mapId.getTaskID(), localFs.getFileStatus(mapOut).getLen()); // 根据 Map out 创建 reduce in 目录
- reduce.run(localConf, this); // reduce run
ReduceTask
- run
- TaskReporter reporter = startReporter(umbilical); // 开启一个线程进行进度的汇报
- Class combinerClass = conf.getCombinerClass();
- CombineOutputCollector combineCollector = (null != combinerClass) ? new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null; // 获取 Combine 类
- Class<? extends ShuffleConsumerPlugin> clazz =job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class); // 创建 shuffle
- LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
- ShuffleConsumerPlugin.Context shuffleContext = new ShuffleConsumerPlugin.Context( ... ) // 创建 shuffleContext 上下文
- Shuffle implement ShuffleConsumerPlugin
- shuffle.init // 在此处进行 shuffleConsumerPlugin 对象的创建
- shuffle.run
- final EventFetcher<K,V> eventFetcher = new EventFetcher<K,V>(reduceId, umbilical, scheduler, this, maxEventsToFetch); // 获取 MapEvent
- eventFetcher.start();
- EventFetcher.run()
- getMapCompletionEvents
- Fetcher<K,V>[] fetchers = new Fetcher[numFetchers]; // 拉取 map 输出到本地目录
- for 循环, fetcher.start
- fetcher.run
- host = scheduler.getHost();
- host = scheduler.getHost(); // 调用 ShuffleSchedulerImpl 的getHost
- ShuffleSchedulerImpl.getHost
// 原因是因为pendingHosts一直为空,所以一直在wait,打印:reduce -> copy
- while(pendingHosts.isEmpty()) { wait(); }
- 内部类: Referee 给 pendingHosts 赋值
查到此,就需要往前回看,看pendingHosts为什么为空,在哪里赋值的
/*
思路:
1、看 pendingHosts 在哪里定义的
2、看 pendingHosts 都有哪里给赋值
3、看赋值的方法被谁调用了
/
1、pendingHosts的定义以及add方法如下图所示
2、分别看了下调用的地方,怀疑是最后一个地方,如图:
发现在ReduceTask.init的时候,进行 shuffleConsumerPlugin 对象的创建;发现这个host的赋值在 wait (reduce -> copy) 之后,所以出现了疑问;
就对比了另外一个项目中,跑的正常的MR,发现 shuffle.run 的时候,isLocal是true,但是出现问题时,isLocal是false;于是继续追这个 isLocal
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();
}
}
回溯 localMapFiles 在哪里设置的,发现在有问题的MR中 setLocalMapFiles 方法没有调用,正常的MR中 setLocalMapFiles 是有调用的;
最后确定是版本问题,再通过引入正确的版本解决;