job提交流程(由客户端提交job)
说明:上传的job.jab就是自己自定义的jar包,用来在yarn实现MAP和Reduce
提交内容:
源码:
为什么要提交配置信息:job提交给yarn,生成appmaster,appmaster需要配置文件conf(job.xml)才能知道干什么
1.job.waitForCompletion(true) //开始提交Job
1.1 state == JobState.DEFINE //当前Job状态的判断,如果是runing状态则不提交(意思是当job在提交过程还没结束又再提交了一次是不行的)
1.2 submit(); //提交Job
1.2.1 setUseNewAPI(); //设置使用新的API
1.2.2建立连接
connect(); // 创建提交对象
// 1.2.2.1创建提交Job的代理
return new Cluster(getConfiguration());
// 1.2.2.1.1判断是本地yarn还是远程(如果是本地会创建LocalJobRunner对象,
如果是集群创建YarnRunner)
initialize(jobTrackAddr, conf);
1.2.3 提交job
submitter.submitJobInternal(Job.this, cluster)
//1.2.3.1 checkSpecs(job) //①检查是否设置了输出路径 ②输出路径是否存在
//1.2.3.2 创建给集群提交数据的Stag路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//1.2.3.3 获取jobid ,并创建Job最终路径
JobID jobId = submitClient.getNewJobID();
//1.2.3.4 拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);
//如果是本地不会上传jar包,如果是集群会上传jar包到HDFS上
rUploader.uploadFiles(job, jobSubmitDir);
//1.2.3.5计算切片,生成切片规划文件并写到对应的路径
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
//1.2.3.6向Stag路径写XML配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
//1.2.3.7提交Job,返回提交状态(本地:submitClient对象是LocalJobRunner,
集群:submitClient对象是YarnRunner)
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
作业提交全过程详解:
(1)作业提交
第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。
第2步:Client向RM申请一个作业id。
第3步:RM给Client返回该job资源的提交路径和作业id。
第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。
第5步:Client提交完资源后,向RM申请运行MrAppMaster。
(2)作业初始化
第6步:当RM收到Client的请求后,将该job添加到容量调度器(FIFO调度队列)中。
第7步:某一个空闲的NM领取到该Job。
第8步:该NM创建Container,并产生MRAppmaster。
第9步:下载Client提交的资源到本地。
(3)任务分配
第10步:MrAppMaster向RM申请运行多个MapTask任务资源。
第11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(4)任务运行
第12步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
第13步:MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
第14步:ReduceTask向MapTask获取相应分区的数据。
第15步:程序运行完毕后,MR会向RM申请注销自己。
//以下是监控功能
(5)进度和状态更新
YARN中的任务将其进度和状态(包括counter)返回给应用管理器(appmaster), 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。
(6)作业完成
除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。