Figure: Job submission flow. Labels in the figure: cluster member proxy, YARNRunner, LocalJobRunner.
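
Which of the two proxies named above is created depends on the configuration property mapreduce.framework.name, whose default value is "local". The sketch below is a simplified stand-in for that decision, not Hadoop's own code: in Hadoop the choice is made by ClientProtocolProvider implementations that Cluster loads during initialization. The class RunnerSelectionSketch, the method selectRunner, and the plain Map used in place of a Configuration object are all hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class RunnerSelectionSketch {

    // Hypothetical helper: returns the name of the proxy that would be chosen
    // for a given configuration. A plain Map stands in for Hadoop's Configuration.
    static String selectRunner(Map<String, String> conf) {
        // Hadoop's default for mapreduce.framework.name is "local".
        String framework = conf.getOrDefault("mapreduce.framework.name", "local");
        switch (framework) {
            case "local":
                return "LocalJobRunner";   // job runs inside the client JVM
            case "yarn":
                return "YARNRunner";       // job is submitted to a YARN cluster
            default:
                // Assumption of this sketch: reject anything else outright.
                // Hadoop itself reports an IOException when no provider matches.
                throw new IllegalArgumentException("No runner for framework: " + framework);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(selectRunner(conf));          // property unset
        conf.put("mapreduce.framework.name", "yarn");
        System.out.println(selectRunner(conf));          // property set to yarn
    }
}
```

With the property unset the sketch picks LocalJobRunner, which matches what happens when a job is launched from an IDE without cluster configuration on the classpath.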

Job source code walkthrough

  waitForCompletion();
    submit();
      // 1 Establish the connection
      connect();
        // 1) Create the proxy used to submit the Job
        new Cluster(getConfiguration());
          // (1) Determine whether the job runs locally or on a remote YARN cluster
          initialize(jobTrackAddr, conf);
      // 2 Submit the job
      submitter.submitJobInternal(Job.this, cluster);
        // 1) Create the staging path used to submit data to the cluster
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
        // 2) Get the job ID and create the job path
        JobID jobId = submitClient.getNewJobID();
        // 3) Copy the jar to the cluster (skipped when running locally)
        copyAndConfigureFiles(job, submitJobDir);
          rUploader.uploadFiles(job, jobSubmitDir);
        // 4) Compute the splits and generate the split plan file
        writeSplits(job, submitJobDir);
          maps = writeNewSplits(job, jobSubmitDir);
            input.getSplits(job);
        // 5) Write the XML configuration file to the staging path
        writeConf(conf, submitJobFile);
          conf.writeXml(out);
        // 6) Submit the job and return the submission status
        status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
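
Step 4 ends in input.getSplits(job), and the number of splits it returns becomes the number of map tasks. For file-based input formats, the split size is computed as max(minSize, min(maxSize, blockSize)), and a file keeps being cut while the remaining bytes exceed 1.1 times the split size. The following is a minimal sketch of that arithmetic for a single splittable file. The class SplitPlanSketch and the method planSplits are hypothetical names; block locations, non-splittable files, and empty files are left out.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitPlanSketch {

    // Slack factor: the last split may be up to 10% larger than splitSize.
    static final double SPLIT_SLOP = 1.1;

    // Split size rule used by file-based input formats.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Hypothetical helper: returns the byte length of each split for one file.
    static List<Long> planSplits(long fileLength, long splitSize) {
        List<Long> lengths = new ArrayList<>();
        long remaining = fileLength;
        // Cut full-size splits while more than 1.1 x splitSize is left.
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            lengths.add(splitSize);
            remaining -= splitSize;
        }
        // Whatever is left (at most 1.1 x splitSize) forms the final split.
        if (remaining != 0) {
            lengths.add(remaining);
        }
        return lengths;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Assumed values: 128 MB block size, no min/max overrides.
        long splitSize = computeSplitSize(128 * mb, 1L, Long.MAX_VALUE);
        System.out.println(planSplits(300 * mb, splitSize).size()); // 128 + 128 + 44 MB
        System.out.println(planSplits(130 * mb, splitSize).size()); // within the 1.1 slack
    }
}
```

Under these assumed values a 300 MB file yields three splits, while a 130 MB file stays in a single split because it is less than 1.1 times the split size.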