分享

客户端MapReduce提交到YARN过程(一)

 看风景D人 2016-10-11

在Mapreduce v1中是使用JobClient来和JobTracker交互完成Job的提交,用户先创建一个Job,通过JobConf设置好参数,通过JobClient提交并监控Job的进展,在JobClient中有一个内部成员变量JobSubmissionProtocol,JobTracker实现了该接口,通过该协议客户端和JobTracker通信完成作业的提交
  1. public void init(JobConf conf) throws IOException {  
  2.   String tracker = conf.get("mapred.job.tracker", "local");  
  3.   tasklogtimeout = conf.getInt(  
  4.     TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);  
  5.   this.ugi = UserGroupInformation.getCurrentUser();  
  6.   //如果mapred.job.tracker设置成local,则创建本地LocalJobRunner,否则创建RPC代理  
  7.   if ("local".equals(tracker)) {  
  8.     conf.setNumMapTasks(1);  
  9.     this.jobSubmitClient = new LocalJobRunner(conf);  
  10.   } else {  
  11.     this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);  
  12.   }          
  13. }  
按顺序调用: 
Job.waitForCompletion()
Job.submit()
jobClient.submitJobInternal()
jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials()) 
完成作业提交

而YARN的作业提交procotol是ClientRMProtocol提交MRv2作业时,首先会生成集群信息类cluster,里面有一个frameworkLoader内部变量会从配置文件中加载ClientProtocolProvider的实现类,这里?分别是LocalClientProtocolProvider和?YarnClientProtocolProvider?。Cluster类在initialize中,会遍历frameworkLoader,由ClientProtocolProvider来生成具体的ClientProtocol?,比如在YarnClientProtocolProvider中就会判断JobConf中的?mapreduce.framework.name是否为?yarn,如果是的话则会生成YARNRunner
YarnClientProtocolProvider的create方法:
  1. @Override  
  2. public ClientProtocol create(Configuration conf) throws IOException {  
  3.   if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {  
  4.     return new YARNRunner(conf);  
  5.   }  
  6.   return null;  
  7. }  

ClientProtocol目前有两个实现?YARNRunner?和LocalJobRunner,LocalJobRunner(mapreduce.framework.name为local?)主要是在本地执行mapreduce,可以方便对程序进行调试。YARNRunner是将作业提交到YARN上??。
YARNRunner初始化会和ResourceManager建立RPC链接(默认是8032端口),真正和RM通信的协议是ClientRMProtocol?,客户端和RM交互的所有操作都会通过YARNRunner的成员变量rmClient(?ClientRMProtocol?)提交出去,比如killApplication, getNodeReports, getJobCounters等等
  1. public synchronized void start() {  
  2.   YarnRPC rpc = YarnRPC.create(getConfig());  
  3.   this.rmClient = (ClientRMProtocol) rpc.getProxy(  
  4.       ClientRMProtocol.class, rmAddress, getConfig());  
  5.   if (LOG.isDebugEnabled()) {  
  6.     LOG.debug("Connecting to ResourceManager at " + rmAddress);  
  7.   }  
  8.   super.start();  
  9. }  

Cluster类初始化完成后,就要生成Application了,先和RM通信申请一个Application(getNewApplication?),得到一个GetNewApplicationResponse,里面封装了ApplicationID,和RM能提供的最小、最大Resource Capacity
  1. public interface GetNewApplicationResponse {  
  2.   public abstract ApplicationId getApplicationId();  
  3.   public Resource getMinimumResourceCapability();  
  4.   public Resource getMaximumResourceCapability();  
  5.   public void setMaximumResourceCapability(Resource capability);   
  6. }  
Resource定义了一组集群计算资源,目前只把memory和cpu纳入进来,这边的cpu指virtual core,也就是一个物理core可以被认为抽象成多个virtual core,而非一对一对应关系
  1. public abstract class Resource implements Comparable<Resource> {  
  2.   public abstract int getMemory();  
  3.   public abstract void setMemory(int memory);  
  4.   public abstract int getVirtualCores();  
  5.   public abstract void setVirtualCores(int vCores);  
  6. }  

然后需要构造ApplicationSubmissionContext,其中包含了启动MR AM的信息,?比如提交的job在HDFS的staging目录路径(job.xml, ?job.split, job.splitmetainfo, libjars, files, archives等?),用户ugi信息,Secure Tokens。完成context构造后,调用resMgrDelegate.submitApplication(appContext)?
YARNRunner的submitJob方法:
  1. @Override  
  2. public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)  
  3. throws IOException, InterruptedException {  
  4.   // Construct necessary information to start the MR AM  
  5.   ApplicationSubmissionContext appContext =  
  6.     createApplicationSubmissionContext(conf, jobSubmitDir, ts);  
  7.   
  8.   // Submit to ResourceManager  
  9.   ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);  
  10.   
  11.   ApplicationReport appMaster = resMgrDelegate.getApplicationReport(applicationId);  
  12.   String diagnostics = (appMaster == null ?  
  13.           "application report is null" : appMaster.getDiagnostics());  
  14.   if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED  
  15.       || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {  
  16.     throw new IOException("Failed to run job : " +  
  17.       diagnostics);  
  18.   }  
  19.   return clientCache.getClient(jobId).getJobStatus(jobId);  
  20. }  

最后通过getJobStatus方法获得Job状态信息?
  1. org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =  
  2.   TypeConverter.toYarn(oldJobID);  
  3. GetJobReportRequest request =  
  4.     recordFactory.newRecordInstance(GetJobReportRequest.class);  
  5. request.setJobId(jobId);  
  6. JobReport report = ((GetJobReportResponse) invoke("getJobReport",  
  7.     GetJobReportRequest.class, request)).getJobReport();  

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多