Job.waitForCompletion()
Job.submit()
jobClient.submitJobInternal()
jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials())
Job submission is complete.
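In YARN, by contrast, the job submission protocol is ClientRMProtocol. When an MRv2 job is submitted, a Cluster object holding the cluster information is created first. Its internal frameworkLoader field (a java.util.ServiceLoader) loads the available ClientProtocolProvider implementations, here LocalClientProtocolProvider and YarnClientProtocolProvider. In initialize(), Cluster iterates over frameworkLoader and asks each ClientProtocolProvider to create a concrete ClientProtocol. YarnClientProtocolProvider, for example, checks whether mapreduce.framework.name in the JobConf is yarn; if so, it creates a YARNRunner, otherwise it returns null so that the next provider can be tried.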
The create method of YarnClientProtocolProvider:
- @Override
- public ClientProtocol create(Configuration conf) throws IOException {
- if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
- return new YARNRunner(conf);
- }
- return null;
- }
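ClientProtocol currently has two implementations, YARNRunner and LocalJobRunner. LocalJobRunner (selected when mapreduce.framework.name is local) runs MapReduce in the local process, which makes programs easy to debug. YARNRunner submits the job to YARN.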
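To make the provider selection concrete, here is a minimal Java sketch of the loop that Cluster.initialize performs, under simplifying assumptions: SketchProtocol, SketchProtocolProvider, LocalProviderSketch, YarnProviderSketch and ClusterSketch are hypothetical names, a plain list replaces the ServiceLoader-backed frameworkLoader, and a Map replaces Configuration. It only models the "first provider that returns non-null wins" rule; treating a missing mapreduce.framework.name as local is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ClientProtocol: only a name, to show which runner was picked.
interface SketchProtocol {
    String name();
}

// Hypothetical stand-in for ClientProtocolProvider.
interface SketchProtocolProvider {
    // Returns a protocol if this provider handles the configured framework, otherwise null.
    SketchProtocol create(Map<String, String> conf);
}

class LocalProviderSketch implements SketchProtocolProvider {
    @Override
    public SketchProtocol create(Map<String, String> conf) {
        // Assumption of this sketch: an unset framework name means "local".
        String framework = conf.getOrDefault("mapreduce.framework.name", "local");
        if ("local".equals(framework)) {
            return () -> "LocalJobRunner";
        }
        return null;
    }
}

class YarnProviderSketch implements SketchProtocolProvider {
    @Override
    public SketchProtocol create(Map<String, String> conf) {
        // Same test as YarnClientProtocolProvider.create: only claim the job for "yarn".
        if ("yarn".equals(conf.get("mapreduce.framework.name"))) {
            return () -> "YARNRunner";
        }
        return null;
    }
}

class ClusterSketch {
    // A plain list stands in for the ServiceLoader-backed frameworkLoader.
    private final List<SketchProtocolProvider> providers = new ArrayList<>();
    private SketchProtocol client;

    ClusterSketch(Map<String, String> conf) {
        providers.add(new LocalProviderSketch());
        providers.add(new YarnProviderSketch());
        initialize(conf);
    }

    // Ask each provider in turn; the first non-null protocol wins.
    private void initialize(Map<String, String> conf) {
        for (SketchProtocolProvider provider : providers) {
            SketchProtocol candidate = provider.create(conf);
            if (candidate != null) {
                client = candidate;
                return;
            }
        }
        throw new IllegalStateException(
            "No provider accepted mapreduce.framework.name=" + conf.get("mapreduce.framework.name"));
    }

    SketchProtocol getClient() {
        return client;
    }
}
```

With mapreduce.framework.name=yarn the YARN provider claims the job; with no setting the local provider does; an unknown value such as classic makes this sketch throw, because every provider returns null.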
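When YARNRunner is initialized, it establishes an RPC connection to the ResourceManager (port 8032 by default). The protocol actually used to talk to the RM is ClientRMProtocol: client-RM operations such as killApplication and getNodeReports all go out through the rmClient field (a ClientRMProtocol proxy), which YARNRunner reaches through its resMgrDelegate. The proxy is created in start():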
- public synchronized void start() {
- YarnRPC rpc = YarnRPC.create(getConfig());
- this.rmClient = (ClientRMProtocol) rpc.getProxy(
- ClientRMProtocol.class, rmAddress, getConfig());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connecting to ResourceManager at " + rmAddress);
- }
- super.start();
- }
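rpc.getProxy returns an object that implements ClientRMProtocol but forwards each call to the RM. The Java sketch below imitates that shape with java.lang.reflect.Proxy: every method of the interface lands in a single InvocationHandler, which in a real RPC client would serialize the call and send it to rmAddress. RmProtocolSketch, RpcProxySketch, the host name and the canned answers are hypothetical; no network traffic happens here.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical protocol interface standing in for ClientRMProtocol.
interface RmProtocolSketch {
    String killApplication(String applicationId);
    int getClusterNodeCount();
}

class RpcProxySketch {
    // Records each call the proxy "sends", so the routing is visible.
    static final List<String> wireLog = new ArrayList<>();

    // Stand-in for YarnRPC.getProxy(protocol, address, conf): one handler receives every call.
    static <T> T getProxy(Class<T> protocol, String address) {
        InvocationHandler handler = (proxy, method, args) -> {
            wireLog.add(address + " <- " + method.getName());
            // A real RPC client would serialize the request and wait for the server's reply;
            // this sketch answers locally with canned values.
            switch (method.getName()) {
                case "killApplication":
                    return "killed " + args[0];
                case "getClusterNodeCount":
                    return 3;
                default:
                    throw new UnsupportedOperationException(method.getName());
            }
        };
        return protocol.cast(Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class<?>[] {protocol}, handler));
    }
}
```

The design point is that callers code against the plain interface, and where the calls go is decided once, when the proxy is created. This sketch's handler does not special-case Object methods such as toString, so calling them on the proxy would throw.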
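Once Cluster has been initialized, the Application itself is created. The client first asks the RM for a new application (getNewApplication) and receives a GetNewApplicationResponse, which wraps the ApplicationId along with the minimum and maximum resource capability the RM can offer: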
- public interface GetNewApplicationResponse {
- public abstract ApplicationId getApplicationId();
- public Resource getMinimumResourceCapability();
- public Resource getMaximumResourceCapability();
- public void setMaximumResourceCapability(Resource capability);
- }
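The round trip can be modeled in a few lines of Java. This is a hypothetical sketch, not YARN code: MockResourceManager hands out ids built from a fixed cluster timestamp plus an increasing sequence number, rendered in the application_<timestamp>_<NNNN> form that ApplicationId prints, and NewAppResponseSketch keeps the min/max capability as plain memory values. withinCapability is an illustrative client-side check, not a method of GetNewApplicationResponse.

```java
import java.util.Locale;

// Hypothetical, memory-only stand-in for GetNewApplicationResponse.
final class NewAppResponseSketch {
    final long clusterTimestamp;
    final int sequence;
    final int minMemoryMb;
    final int maxMemoryMb;

    NewAppResponseSketch(long clusterTimestamp, int sequence, int minMemoryMb, int maxMemoryMb) {
        this.clusterTimestamp = clusterTimestamp;
        this.sequence = sequence;
        this.minMemoryMb = minMemoryMb;
        this.maxMemoryMb = maxMemoryMb;
    }

    // Same textual shape as ApplicationId: application_<clusterTimestamp>_<4-digit sequence>.
    String applicationId() {
        return String.format(Locale.ROOT, "application_%d_%04d", clusterTimestamp, sequence);
    }

    // Illustrative check: does a memory ask lie inside the advertised range?
    boolean withinCapability(int requestedMb) {
        return requestedMb >= minMemoryMb && requestedMb <= maxMemoryMb;
    }
}

// Hypothetical RM that issues sequential ids under one cluster timestamp.
final class MockResourceManager {
    private final long clusterTimestamp; // in YARN this is the RM's start time
    private final int minMemoryMb;
    private final int maxMemoryMb;
    private int nextSequence = 1;

    MockResourceManager(long clusterTimestamp, int minMemoryMb, int maxMemoryMb) {
        this.clusterTimestamp = clusterTimestamp;
        this.minMemoryMb = minMemoryMb;
        this.maxMemoryMb = maxMemoryMb;
    }

    synchronized NewAppResponseSketch getNewApplication() {
        return new NewAppResponseSketch(clusterTimestamp, nextSequence++, minMemoryMb, maxMemoryMb);
    }
}
```

For example, with a sample range of 1024 to 8192 MB, a 1536 MB ask lies inside it while 512 MB and 16384 MB do not.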
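Resource describes a set of cluster compute resources. For now only memory and CPU are included. CPU here means virtual cores: a physical core can be abstracted into several virtual cores, so the mapping is not one-to-one.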
- public abstract class Resource implements Comparable<Resource> {
- public abstract int getMemory();
- public abstract void setMemory(int memory);
- public abstract int getVirtualCores();
- public abstract void setVirtualCores(int vCores);
- }
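The sketch below is a hypothetical Java value class with the same two dimensions. It shows how virtual cores decouple scheduling from hardware (nodeCapacity multiplies physical cores by an assumed vcoresPerCore ratio) and how a request is checked against a capacity dimension by dimension. The compareTo ordering (memory first, then virtual cores) is a choice made for this sketch and is not claimed to match Hadoop's Resource.compareTo.

```java
// Hypothetical value class with the same two dimensions as Resource.
final class ResourceSketch implements Comparable<ResourceSketch> {
    final int memoryMb;
    final int virtualCores;

    ResourceSketch(int memoryMb, int virtualCores) {
        this.memoryMb = memoryMb;
        this.virtualCores = virtualCores;
    }

    // A node's advertised capacity when every physical core is exposed as
    // vcoresPerCore virtual cores (the ratio is an assumption for illustration).
    static ResourceSketch nodeCapacity(int memoryMb, int physicalCores, int vcoresPerCore) {
        return new ResourceSketch(memoryMb, physicalCores * vcoresPerCore);
    }

    // True if this request fits inside the given capacity in both dimensions.
    boolean fitsIn(ResourceSketch capacity) {
        return memoryMb <= capacity.memoryMb && virtualCores <= capacity.virtualCores;
    }

    // Sketch-only total order: memory first, then virtual cores.
    @Override
    public int compareTo(ResourceSketch other) {
        int byMemory = Integer.compare(memoryMb, other.memoryMb);
        return byMemory != 0 ? byMemory : Integer.compare(virtualCores, other.virtualCores);
    }
}
```

With a ratio of 2, a node with 4 physical cores and 8192 MB advertises 8 virtual cores, so a 2048 MB / 6 vcore request fits while a 2048 MB / 9 vcore request does not. fitsIn is only a partial order (of two resources, neither may fit in the other), which is why it is kept separate from the total order in compareTo.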
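Next an ApplicationSubmissionContext is built. It carries the information needed to launch the MR AM: the HDFS staging directory of the submitted job (job.xml, job.split, job.splitmetainfo, libjars, files, archives, and so on), the user's UGI information, and the security tokens. Once the context is ready, resMgrDelegate.submitApplication(appContext) is called.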
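A rough picture of what goes into the context, as a hypothetical Java sketch: SubmissionContextSketch and build are invented names, resources are plain path strings rather than YARN LocalResource records, and the tokens are an opaque byte array. Only the file names listed above are taken from the text; the staging path is whatever the caller passes in.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for ApplicationSubmissionContext.
final class SubmissionContextSketch {
    final String applicationId;
    final String user;
    final List<String> amResources; // staging-dir files the AM container will need
    final byte[] tokens;            // serialized credentials, opaque in this sketch

    private SubmissionContextSketch(String applicationId, String user,
                                    List<String> amResources, byte[] tokens) {
        this.applicationId = applicationId;
        this.user = user;
        this.amResources = amResources;
        this.tokens = tokens;
    }

    static SubmissionContextSketch build(String applicationId, String user, String jobSubmitDir,
                                         List<String> extraResources, byte[] tokens) {
        List<String> resources = new ArrayList<>();
        // Core job files written into the staging directory at submission time.
        for (String name : new String[] {"job.xml", "job.split", "job.splitmetainfo"}) {
            resources.add(jobSubmitDir + "/" + name);
        }
        // libjars, files and archives uploaded by the client, if any.
        resources.addAll(extraResources);
        return new SubmissionContextSketch(applicationId, user,
            Collections.unmodifiableList(resources), tokens.clone());
    }
}
```

For a made-up staging directory such as /staging/alice/job_1357000000000_0001, the context lists job.xml, job.split and job.splitmetainfo under that directory, followed by any libjars, files or archives passed in.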
YARNRunner's submitJob method:
- @Override
- public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
- throws IOException, InterruptedException {
- // Construct necessary information to start the MR AM
- ApplicationSubmissionContext appContext =
- createApplicationSubmissionContext(conf, jobSubmitDir, ts);
-
- // Submit to ResourceManager
- ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
-
- ApplicationReport appMaster = resMgrDelegate.getApplicationReport(applicationId);
- String diagnostics = (appMaster == null ?
- "application report is null" : appMaster.getDiagnostics());
- if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
- || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
- throw new IOException("Failed to run job : " +
- diagnostics);
- }
- return clientCache.getClient(jobId).getJobStatus(jobId);
- }
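The failure check after submission can be isolated and run on its own. In this hypothetical Java sketch, AppStateSketch is a local enum modeled loosely on YarnApplicationState and checkReport stands in for the if-block in submitJob, with a null state playing the role of a null ApplicationReport; the messages copy the ones in the code above.

```java
import java.io.IOException;

// Hypothetical enum modeled loosely on YarnApplicationState.
enum AppStateSketch { NEW, SUBMITTED, RUNNING, FINISHED, FAILED, KILLED }

final class SubmitCheckSketch {
    // Mirrors the check in submitJob: a null state stands for a null ApplicationReport.
    static void checkReport(AppStateSketch state, String diagnostics) throws IOException {
        if (state == null) {
            throw new IOException("Failed to run job : application report is null");
        }
        if (state == AppStateSketch.FAILED || state == AppStateSketch.KILLED) {
            throw new IOException("Failed to run job : " + diagnostics);
        }
        // Any other state: the caller moves on to fetch the job status.
    }
}
```

Only FAILED and KILLED are rejected; any other state, FINISHED included, lets submitJob go on to fetch the job status.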
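Finally, getJobStatus fetches the job status. clientCache.getClient(jobId) returns a ClientServiceDelegate, whose getJobStatus converts the old-style JobID with TypeConverter.toYarn, builds a GetJobReportRequest and invokes getJobReport: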
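- // Excerpt from ClientServiceDelegate.getJobStatus(JobID oldJobID)
- org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
- TypeConverter.toYarn(oldJobID);
- GetJobReportRequest request =
- recordFactory.newRecordInstance(GetJobReportRequest.class);
- request.setJobId(jobId);
- // invoke() calls the named MRClientProtocol method on the AM (or history server) proxy via reflection
- JobReport report = ((GetJobReportResponse) invoke("getJobReport",
- GetJobReportRequest.class, request)).getJobReport();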
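TypeConverter.toYarn maps the MapReduce JobID onto YARN's record type. The two ids share the cluster timestamp and the sequence number, so job_<timestamp>_<NNNN> corresponds to application_<timestamp>_<NNNN>. The hypothetical Java helper below shows that correspondence on the string forms only; it is not TypeConverter's implementation, which works on typed records.

```java
import java.util.Locale;

// Hypothetical helper: maps a job id string to the matching application id string.
final class JobIdSketch {
    static String jobIdToAppId(String jobId) {
        String[] parts = jobId.split("_");
        if (parts.length != 3 || !parts[0].equals("job")) {
            throw new IllegalArgumentException("not a job id: " + jobId);
        }
        long clusterTimestamp = Long.parseLong(parts[1]); // shared with the application id
        int sequence = Integer.parseInt(parts[2]);        // shared with the application id
        return String.format(Locale.ROOT, "application_%d_%04d", clusterTimestamp, sequence);
    }
}
```

For instance, job_1357000000000_0001 maps to application_1357000000000_0001. %04d only pads, so sequence numbers above 9999 keep all their digits.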