分享

MapReduce的yarn框架整理

 春和秋荣 2019-10-11

简介

对于节点超过4000的大型集群,使用原先的MR框架已经棉铃扩展性瓶颈。yarn将原先的Jobtracker的功能划分出来,分别实现。原先的Jobtracker负责作业涤涝妒和任务进度见识,追踪任务,重启失败或者过慢的任务和进度任务。
yarn将两种角色划分为两个独立的守护进程:管理集群资源上使用资源管理器和管理集群上运行任务生命周期的应用管理器。

基本思路是:应用管理器向资源管理器申请资源(也就是容器,每个容器都有特定内存上限,可配置),然后在容器上跑程序,再由应用管理器监控容器。
在原先的框架上,jobtracker只有一个,但是yarn框架下,给每个MR的job(实例)生成了master应用,用以申请资源和监控。

yarn比MR原先的框架更具有一般性,不同的yarn应用可以在集群上共存,也可以在一个yarn激情on个上运行多个不同版本的MR,使得MR的升级过程更容易管理。

基础配置(hadoop2.5.2)

配置MapReduce为yarn框架

可以参考http:///mapreduce-nextgen/hadoop-yarn-configurations-mapreduce/

参数默认值yarn设置所在文件说明
mapreduce.framework.namelocalyarnmapred-site.xml设置了MR的运算框架,默认local表示本地模式,yarn表示使用yarn框架
mapreduce.job.reduces1自定mapred-default.xml设置reduce数量
yarn.resourcemanager.store.classorg.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore自定设置资源管理器存储的模式

设置uber任务参数

参数默认值yarn设置所在文件说明
mapreduce.job.ubertask.enablefalse建议开启mapred-default.xml是否开启uber任务模式,就是是否在yarn模式下,一个jvm运行多个任务
mapreduce.job.ubertask.maxmaps9自定mapred-default.xml一个容器内跑的最大map数量
mapreduce.job.ubertask.maxreduces1自定mapred-default.xml一个容器内跑的最大reduce数量
mapreduce.job.ubertask.maxbytes
自定mapred-default.xml

设置ApplicationMaster分配任务内存参数

参数默认值yarn设置所在文件说明
yarn.scheduler.minimum-allocation-mb1024自定义yarn-default.xml容器的最小内存
yarn.scheduler.maximum-allocation-mb8192自定义yarn-default.xml容器的最大内存
yarn.nodemanager.vmem-pmem-ratio2.1自定义yarn-default.xml物理内存与虚拟内存比值,默认2.1,即为使用1G物理内存可以使用2.1G虚拟内存
yarn.nodemanager.resource.memory-mb8192自定义yarn-default.xml一个节点可分给容器的最大物理内存数量

设置进度参数

参数默认值yarn设置所在文件说明
mapreduce.client.progressmonitor.pollinterval1000自定mapred-default.xml客户端以多少毛秒的间隔,向ApplicationMaster查询一次作业进度

设置失败处理参数

参数默认值yarn设置所在文件说明
mapreduce.task.timeout600000自定义mapred-default.xml任务超时时间,单位毫秒,如果设置为0就永远不会触发超时判定
mapreduce.map.maxattempts4自定义mapred-default.xmlApplicationMaster会重启失败map任务的次数
mapreduce.reduce.maxattempts4自定义mapred-default.xmlApplicationMaster会重启失败reduce任务的次数
yarn.resourcemanager.am.max-attempts2自定义yarn-default.xmlApplicationMaster与资源管理器连接失败后重新尝试的次数
yarn.resourcemanager.recovery.enabledfalse自定义yarn-default.xml老ApplicationMaster发生故障后,新ApplicationMaster是否要恢复故障应用程序所运行任务的状态
yarn.am.liveness-monitor.expiry-interval-ms600000自定义yarn-default.xmlNodeManager超时时间,单位毫秒
mapreduce.job.maxtaskfailures.per.tracke3自定义mapred-default.xml节点任务失败次数,超过这个次数,该节点被拉黑

作业运行机制

yarn包含的实体:
1)提交MapReduce作业的客户端
2)yarn资源管理器(ResourceManager),负责协调集群上计算资源分配
3)yarn节点管理器(NodeManager),是每一台机器框架的代理,是执行应用程序的容器,监控应用程序的资源使用情况 (CPU,内存,硬盘,网络 ) 并且向调度器(ResourceManager的一部分)汇报。
4)ApplicationMaster负责一个 Job 生命周期内的所有工作,向调度器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理任务的失败原因。它结合从 ResourceManager 获得的资源和 NodeManager 协同工作来运行和监控任务。注意每个Job都有一个ApplicationMaster,它可以运行在 ResourceManager 以外的机器上。注意是在容器中运行的。
5)容器:在nodeManager内部,可以理解为一个资源池,类似于jvm虚拟机, 包括本次任务需要的CPU, 内存等资源。会把程序的jar包也拷贝到容器内存中。

分布图:
这里写图片描述

流程图:
这里写图片描述

1.作业提交

主要包含了流程图中的1-4步骤,就是图中job实例运行的步骤
在main函数中运行中运行ToolRunner.run(new TestDriver, args);
TestDriver是自定义的,继承自类Configured和Tool,实现了run接口。

public class TestDriver extends Configured implements Tool{
public int run(String[] arg0) throws Exception {
    if (arg0.length != 2) {
        System.err.printf("Usage: %s [generic option] <input> <output>", getClass().getSimpleName());
        ToolRunner.printGenericCommandUsage(System.err);
        return -1;
    }
    Job job = new Job(getConf(), "Max element");
    job.setJarByClass(getClass());

    FileInputFormat.addInputPath(job, new Path(arg0[0]));
    FileOutputFormat.setOutputPath(job, new Path(arg0[1]));

    job.setMapperClass(TestMapper.class);
    job.setCombinerClass(TestReducer.class);
    job.setReducerClass(TestReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    //等待作业完成,然后输出结果
    return job.waitForCompletion(true) ? 0 :1;
}

在run函数的最后会调用job的waitForCompletion()方法,这个方法是提交作业中最重要的一个函数,整个提交作业都需要这个函数实现。

public boolean waitForCompletion(boolean verbose
                              ) throws IOException, InterruptedException,
                                        ClassNotFoundException {
if (state == JobState.DEFINE) {
  submit();//提交作业
}
if (verbose) {
  jobClient.monitorAndPrintJob(conf, info);
} else {
    info.waitForCompletion();//轮询查询作业进度
}
return isSuccessful();

}

waitForCompletion()方法中会调用job的submint()方法首先会去资源管理器申请一个作业id,然后创建一个内部的JobSummiter实例,然后调用submintJobInternal()方法提交作业。最后waitForCompletion()会阻塞客户端进程每秒轮询作业进度。成功显示作业计时器,失败把错误返回控制台。

class JobSubmitter {  
/*客户端提交作业给resourcemanager*/  
JobStatus submitJobInternal(Job job, Cluster cluster)   
    throws ClassNotFoundException, InterruptedException, IOException {  
    //客户端检查作业输出说明  
    checkSpecs(job);  
    //拷贝作业资源到HDFS  
    Configuration conf = job.getConfiguration();  
    addMRFrameworkToDistributedCache(conf);  
    copyAndConfigureFiles(job, submitJobDir);  
    // 计算输入分片Create the splits for the job  
    int maps = writeSplits(job, submitJobDir);  
    conf.setInt(MRJobConfig.NUM_MAPS, maps);  
    // job资源文件拷贝当前作业目录  
    writeConf(conf, submitJobFile);  
    //ResourceManager上的submitAppliction()方法  
    status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());  
}     

}

步骤1

MR程序在客户端run job新建一个job实例,会在建立实例的时候带入config相关参数(执行jar文件的时候设置的-config命令,优先级更高)。然后设置MR指定相关类,最后调用waitForCompletion(),这个函数会完成剩下的提交作业的步骤。

步骤2

客户端向资源管理器(ResourceManager)申请一个新的作业id。在submint()方法中实现的。

步骤3

这一步主要是提交作业前的准备工作。在submitJobInternal()方法中实现
1)检查作业输出,如果没有指定输出目录或者输出目录已经存在,作业就不提交,错误抛回MR程序。
2)计算作业的输入分片。如果无法计算,比如输入路径不存在,作业就不提交。
3)将作业资源(包括jar,配置信息和分片信息)复制到HDFS上面。

步骤4

客户端调用submintApplication()方法资源管理器发送消息,提交作业提交。资源管理器将job放入对应的job队列中。

2.作业初始化

作业初始化主要负责MR程序在各个点运行之前的工作。完成了流程图中的5-7步骤。

步骤5

NodeManager从ResourceManager队列中领取任务。

步骤6

ResourceManager根据job和NodeManager情况, 计算出资源大小,然后在NodeManager上创建容器。

步骤7

ResourceManager在一台机器上(一般在NodeManager)创建ApplicationMaster节点。ApplicationMaster是一个java应用程序,主类是MRAppMaster。这个jar程序是运行在容器内的,生命周期为该job的运行时间,一旦作业完成,就会像其他容器那样被ResourceManager回收。
ApplicationMaster会对作业初始化:
1)创建多个对象对作业进度进行监控。
2)到之前存放作业资源的HDFS中获取对应的分片信息确定map数量和信息,并根据mapreduce.job.reduces属性确定reduce的数量。
3)根据得到的资源信息,ApplicationMaster会决定构成MR作业的各个任务要如何组成。如果作业很小就放在一个容器中运行(这一点是MR原来的框架所无法做到的),这种作业一般成为uber任务。

3.任务分配

主要是由ApplicationMaster在各个节点上分配容器运行map和reduce任务。

步骤8

ApplicationMaster为了map、reduce任务向ResourceManager容器。然后容器会在各个节点上运行对应的MR任务,如果没有设置uber模式,那么ApplicationMaster会为每一个MR任务申请容器。
在默认情况下map和reduce任务分配的内存是默认的1024MB,可以通过配置参数来设置任务内存数。

相关参数:
yarn.scheduler.minimum-allocation-mb
yarn.scheduler.maximum-allocation-mb
yarn.nodemanager.vmem-pmem-ratio
yarn.nodemanager.resource.memory.mb

Map Memory
mapreduce.map.java.opts
mapreduce.map.memory.mb
Reduce Memory

Reduce Memory
mapreduce.reduce.java.opts
mapreduce.reduce.memory.mb

这部分参数配置信息可以参考http://www.jianshu.com/p/e8d93817f547(比较重要的)

4.任务执行

步骤9

ApplicationMaster通过与NodeManager通信来启动容器。在任务运行前,首先会将任务需要的资源本地化,包括作业的配置,jar文件和所有来自分布式缓存的文件,最后运行MR任务。

步骤10

Map/Reduce任务完成, 然后向ResourceManager注销MRAppMaster进程。到此为止任务执行完毕。

5.进度和状态更新

MR在容器中跑的任务,通过umbilical接口向ApplicationMaster汇报进度和状态(包含计时器)。同事客户端每秒钟(通过mapreduce.client.progressmonitor.pollinterval设置)向ApplicationMaster查询一次进度。

6.作业完成

客户端除了向ApplicationMaster查询进度之外,每5秒钟通过调用waitForComletion()检查作业是否完成(可以通过mapreduce.client.completion.pollinterval设置)。之后ApplicationMaster会清理工作状态,并保存作业信息。

失败处理

这部分主要讲,yarn框架中,MR程序在各个阶段失败所做的处理。顺序和讲流程的顺序相反,从下往上讲起。这部分主要注意一些任务失败的参数设置,可能会影响计算效率。

1. 容器内任务运行失败

一般任务失败分为两种情况:
1)容器内运行的map或者reduce任务失败后,会将异常和退出反馈给ApplicationMaster。
2)ApplicationMaster长时间没有收到任务的进度回报,就会注意到挂起的任务,任务后标为失败(超时时间mapreduce.task.timeout)。

这里要注意的是,ApplicationMaster会重启失败任务。如果一个任务失败超过4次就不会再重试了(次数由参数mapreduce.map.maxattempts和mapreduce.reduce.maxattempts决定)
有时候并不希望几个任务失败就结束整个作业,这时候还可以设置失败任务最大百分比,
mapred.max.map.failures.percent和mapred.max.map.failures.percent(这两个参数没在官网找到)

2.ApplicationMaster运行失败

分为两种情况
1)资源管理器和ApplicationMaster通信失败后,会进行多次尝试,多次失败后,ApplicationMaster会被标记为失败。yarn.resourcemanager.am.max-attempts参数设置。
2)ApplicationMaster会向资源管理器发送周期行心跳,如果心跳信息超时,那么就会被标记为失败。

ApplicationMaster被资源管理器判定为失败之后,资源管理器会在一个新的容器内开始一个新的ApplicationMaster,新的ApplicationMaster在默认情况下会重新运行所有的任务。不过可以通过设置yarn.resourcemanager.recovery.enabled为true,使得新的ApplicationMaster恢复故障应用程序运行任务的状态。
客户端会向ApplicationMaster定时询问进度,一旦新的ApplicationMaster产生,客户端会和资源管理器询问新的ApplicationMaster的地址。

3.NodeManager运行失败

NodeManager也会定时向资源管理器发送心跳信息。一点超时就会诶资源管理器溢出可用节点资源管理器池。超时时间yarn.am.liveness-monitor.expiry-interval-ms参数决定。
如果一个节点上,应用程序失败次数过高,节点可能会被管理器拉黑。由ApplicationMaster管理黑名单,失败次数mapreduce.job.maxtaskfailures.per.tracke参数设定。

4.资源管理器失败

资源管理器失败是比较严重的问题,需要通过程序员手动重启。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多