分享

CapacityScheduler调度算法

 taohongyong 2014-06-03

本文描述了Hadoop中的Capacity Scheduler的实现算法,Capacity Scheduler是由Yahoo贡献的,主要是解决HADOOP-3421中提出的,在调度器上完成HOD(Hadoop On Demand)功能,克服已有HOD的性能低效的缺点。它适合于多用户共享集群的环境的调度器。本文解析的计算能力调度器属于Hadoop 0.21.0。

一、计算能力调度器介绍

Capacity Scheduler支持以下特性:

  • (1) 计算能力保证。支持多个队列,某个作业可被提交到某一个队列中。每个队列会配置一定比例的计算资源,且所有提交到队列中的作业共享该队列中的资源。
  • (2) 灵活性。空闲资源会被分配给那些未达到资源使用上限的队列,当某个未达到资源的队列需要资源时,一旦出现空闲资源,便会分配给他们。
  • (3) 支持优先级。队列支持作业优先级调度(默认是FIFO)。
  • (4) 多重租赁。综合考虑多种约束防止单个作业、用户或者队列独占队列或者集群中的资源。
  • (5) 基于资源的调度。 支持资源密集型作业,允许作业使用的资源量高于默认值,进而可容纳不同资源需求的作业。不过,当前仅支持内存资源的调度。

二、Capacity Scheduler概述

Capacity Scheduling是由雅虎提出的作业调度算法,它提供了类似于Fair Scheduling算法的功能,但是在设计与实现上两者在很多方面存在着差别。

在Capacity Scheduling算法中,可以定义多个作业队列。当作业被提交时,它将被直接放入到一个队列中。每个队列都可以通过配置获得一定数量的TaskTracker资源用于处理Map操作和Reduce操作。调度算法将按照配置文件为队列分配相应的计算资源量。同Fair Scheduling算法近似,为了提高资源利用率,对于已经被分配但是尚处于空闲状态的资源,各个队列为分享它们。当没有能够按照设定数值获得足够资源量的队列中增加了作业压力时,之前那些曾经分配给它但又被别的队列所占用的资源会在完成当前任务后立即配给回它应属的队列。由此可以看出,Capacity Scheduling算法的思路是为各个队列中的作业模拟出具有指定计算能力的独立的Hadoop集群资源,而不像Fair Scheduling算法那样试图在所有作业之间实现公平的资源分享。

Capacity Scheduling算法在每个队列中采用的调度策略是带有优先级的FIFO算法,优先级高的作业可以在优先级低的作业之前访问队列资源。Capacity Scheduling算法不支持优先级抢占,一旦一个作业开始执行,那么在执行完成之前它所使用的资源是不会被有更高优先级的作业夺走的。另外,同属于同一用户的作业不能出现独占资源的情况,为了达到这一目的,Capacity Scheduling算法对队列中同一用户提交的作业能够获得的资源百分比进行了强制限定。

在Capacity Scheduling算法的具体实现中,最关键的也是如何挑选合适的作业去执行。当系统中出现了空闲的TaskTracker,算法会首先选择一个具有最多空闲空间的队列,这是通过计算队列中正在运行的任务数与其分得的计算资源之间的比值是否最低来判断的。一旦一个队列被选中,调度算法就会按照作业的优先级和提交时间顺序进行选择。在选择作业的时候,还需要关注作业所属的用户是否已经超出了他所能使用的资源限制,如果是的话那么相关的作业都不能被选中。

此外,Capacity Scheduling算法还能够有效地对Hadoop集群的内存资源进行管理,以支持内存密集型应用。如果一个作业内存资源需求较高,那么调度算法就要保证将该作业的相关任务指派到具有充足内存资源的TaskTracker上执行,以避免任务由于内存资源不足而无法执行。因此,在作业选择的过程中,Capacity Scheduling算法还需要检查空闲TaskTracker上的内存资源是否能够满足作业的内存需求。TaskTracker上的空闲资源量的数值可以通过TaskTracker的内存资源总量减去当前已经使用的内存偏移量得到,而后者是包含在TaskTracker向JobTracker发送的周期性心跳信息中。

三、Capacity Scheduler调度算法源码解析

下面对Capacity Scheduler调度算法的源码进行详细的拆解,Capacity Scheduler的整个类图如图3.1。

图3.1 Capacity Scheduler调度算法的整个类图

3.1 TaskSchedulingMgr类

3.1.1 TaskSchedulingMgr类的成员和方法

TaskSchedulingMgr是管理任务调度的抽象类,包括如下成员:

protected CapacityTaskScheduler scheduler;
protected TaskType type = null;

其中,scheduler是调度器对象,在构造函数中传入;type代表任务类型,在TaskSchedulingMgr的实现类MapSchedulingMgr和ReduceSchedulingMgr中分别取值为TaskType.Map和TaskType.Reduce。

TaskSchedulingMgr提供了如下抽象方法:

obtainNewTask(TaskTrackerStatus taskTracker,JobInProgress job);
getClusterCapacity();
getTSC(QueueSchedulingContext qsc);
hasSpeculativeTask(JobInProgress job,TaskTrackerStatus tts);

抽象方法到它的实现类去讨论。

TaskSchedulingMgr的其它普通方法:

getOrderedQueues()

返回根据队列比较器排好序的队列,主要用来调试。

getOrderedJobQueues()

获取排好序的作业队列。

isUserOverLimit(JobInProgress j,QueueSchedulingContext qsc)

通过调用本地的getTSC()方法,获得TaskSchedulingContext类型的变量,提取TaskSchedulingContext中的参数,比较被占用的槽数是否小于实际可用的槽数,如是,则currentCapacity=实际可用的槽数;否则,currentCapacity=被占用的槽数+TaskDataView.getTaskDataView(type).getSlotsPerTask(j)。

TaskDataView类中的getTaskDataView()方法,该方法根据Task的类型是Map还是Reduce,分别获得一个TaskDataView类型的变量,并返回,然后通过job调用getSlotsPerTask()方法。

int limit = Math.max((int) (Math.ceil((double) currentCapacity/ (double) qsc.getNumJobsByUser().size())),(int) (Math.ceil((double) (qsc.getUlMin() * currentCapacity) / 100.0)));

用户的ID:String user = j.getProfile().getUser();

如果被该用户占用的槽数>= limit,写入日志,返回True。

getTaskFromQueue(TaskTracker taskTracker,QueueSchedulingContext qsi)

这个方法从一个队列中取出一个task,返回的TaskLookupResult对象包含:

private LookUpStatus lookUpStatus;
private Task task;

其中task才是关键信息,当然,task可能为空。具体的实现逻辑如图3.2。

  • 在队列中的选一个job,检查是否是RUNNING状态的;如果不是,重复1),否则继续;
  • 检查队列是否超出最大slot限制,或者用户的slot占有量是否超过限定,如果是则回到1),否则继续;
  • 检查tasktracker是否能满足作业的内存需求,如果满足则从作业中挑选一个任务,如代码:
  • Task t = obtainNewTask(taskTrackerStatus, j);
  • obtainNewTask()函数负责从job中未运行的任务挑选一个,该函数的具体细节容稍后讨论。 如果没能从job中找到适合的任务,则回到1),否则成功,返回结果,结束。

图3.2 从队列中挑选task

如果TaskTracker的内存不能满足作业的内存需求,那么调度器会判断该作业是否有待运行的任务或者保留的TaskTracker是否足够。如果是则回到(1),否则,将该tasktracker保留,退出函数。这里有一个机制:

CapacitySecheduler在调度过程中考虑作业的内存需求,但是,当TaskTracker内存无法满足Job的内存需求时,系统不会把它直接放弃该TaskTracker,而是将它保留给该作业,也就是将该作业的TaskTracker的slot登记到Job的名下,这样做是为了使该作业不至于饿死。

assignTasks(TaskTracker taskTracker)

为TaskTracker分配task,分两种情况,如图3.3。

图3.3 为TaskTracker分配task

第一种情况,看TaskTracker是否已经被预定了:

JobInProgress job = taskTracker.getJobForFallowSlot(type);

如果job不为空,这表示TaskTracker的slot被job预定了,这时做以下工作:

如果availableSlots大于等于每个task所需的slot数,那么TaskTracker释放预留的slot,并从job中挑选一个任务在TaskTracker上运行,退出。否则,重新预留TaskTracker的slot给job,并返回内存匹配失败的TaskLoogupResult对象。

第二种情况,如果TaskTracker没有被预留,那么,从备选的队列集合中寻找适合的队列,并从队列中挑选适合的task,选出来的task必须是三种类型中的一种:TASK_FOUND,NO_TASK_FOUND或者TASK_FAILING_MEMORY_REQUIREMENT(挑选的任务内存需求得不到满足)。挑选过程的执行代码是:

TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsc);
TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();

如果以上两种都不是,则换一个队列,继续找。如果最终还没有找到,返回任务查找失败。

printQSCs()

打印作业队列信息,用于调试。

hasSpeculativeTask(TaskInProgress[] tips, TaskTrackerStatus tts)

检查TaskTracker是否已经被某个作业预留了。

3.1.2 TaskSchedulingMgr的内部类

1) 内部类QueueComparator

类QueueComparator实现了接口Comparator,对AbstractQueue对象进行排序,排序算法定义了compare()方法,比较的就是AbstractQueue对象。首先调用AbstractQueue对象中的getQueueSchedulingContext()方法,返回的类型为QueueSchedulingContext,然后调用了本地方法getTSC(),返回的是TaskSchedulingContext。

调用TaskSchedulingContext中的getCapacity方法,判断返回的值是否是0,若为0则值为1.0f,否则为:(double) t1.getNumSlotsOccupied() / (double) t1.getCapacity()

2) 内部类MapQueueComparator

该类继承类上面的类QueueComparator,只定义了一个方法,传入的类型是QueueSchedulingContext,通过调用该类中的getMapTSC()方法,返回的类型是TaskSchedulingContext。

3) 内部类ReduceQueueComparator

该类同样继承了QueueComparator,也只定义了一个方法,通过调用QueueSchedulingContext类中的getReduceTSC()方法,返回的类型是TaskSchedulingContext。

4) MapSchedulingMgr/ReduceSchedulingMgr

两个类都继承自TaskSchedulingMgr,实现了抽象方法。

A) 类MapSchedulingMgr

我们先讨论抽象方法在类MapSchedulingMgr中的实现。

obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)

这个函数是针对TaskTracker,从job中挑选一个task。

下面几个函数都是从简单的读取相应的变量,比较简单所以不讨论。

getClusterCapacity()读取集群的容量,即slot数

getTSC(QueueSchedulingContext qsi) 返回TaskSchedulingContext对象

hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) 作业在taskTracker上是否有预留任务。

B) 类ReduceSchedulingMgr

它的实现与MapSchedulingMgr一样,代码片段如下:

obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)ClusterStatus clusterStatus = scheduler.taskTrackerManager.getClusterStatus();
int numTaskTrackers = clusterStatus.getTaskTrackers();
return job.obtainNewReduceTask(taskTracker, numTaskTrackers,scheduler.taskTrackerManager.getNumberOfUniqueHosts());

3.2 CapacityTaskScheduler类

CapacityTaskScheduler类是计算能力调度算法的主类,它主要实现了计算能力的算法,类图如图3.4。

图3.4 CapacityTaskScheduler类图

3.2.1 核心成员变量

(1) TaskSchedulingMgr mapScheduler = new MapSchedulingMgr(this)

Map任务的调度器

(2) TaskSchedulingMgr reduceScheduler = new ReduceSchedulingMgr(this)

Reduce任务的调度器

(3) MemoryMatcher memoryMatcher = new MemoryMatcher();

用与内存匹配

(4) JobQueuesManager jobQueuesManager

队列管理

3.2.2 CapacityTaskScheduler类实现了TaskScheduler类的方法  

CapacityTaskScheduler类实现了TaskScheduler类,实现核心方法为:

(1) start()方法

这是MapReduce调度器的入口,在JobTracker的开始位置就是调用此方法,函数offerService()中有一句 taskScheduler.start(); taskScheduler是通过反射完成实例化

代码如下:

Class<? extends TaskScheduler> schedulerClass=conf.getClass("mapred.jobtracker.taskScheduler",JobQueueTaskScheduler.class, TaskScheduler.class);
taskScheduler =  (TaskScheduler)ReflectionUtils.newInstance(schedulerClass, conf);

回到正题,start()的主要任务是:初始化配置信息,初始化队列,添加作业监听器(JobQueueManager)。start()方法的执行流程如图3.5。

图3.5 start()方法的执行流程    start()方法的执行步骤:

步骤一、初始化内存相关的信息,主要是初始化如下四个参数memSizeForMapSlotOnJT:每个map槽使用的内存大小;

  • memSizeForReduceSlotOnJT:每个reduce槽使用的内存大小;
  • limitMaxMemForMapTasks:map任务的最大内存限制;
  • limitMaxMemForReduceTasks:reduce任务的最大内存限制。

步骤二、从配置文件map-site.xml读取队列信息;

步骤三、从配置设置capacity-scheduler.xml初始化队列;

步骤四、完整性检查:确保应至少有一个队列;

步骤五、创建一个新的queue-hierarchy builder和尝试加载完整的层次结构的队列;

步骤六、创建JobQueue对象。 通知JobQueuesManager ,以便它可以跟踪running/waiting作业。 JobQueuesManager仍然是不作为监听器添加到JobTracker ,所以没有必要同步。

步骤七、由于创建/更改了QueueSchedulingContext对象。把它们放到队列信息中,以确保用户界面能够即时查看 。

步骤八、队列已经准备完成。现在注册jobQueuesManager与JobTracker ,以监听作业变动;

步骤九、开启初始化作业的线程,见JobInitializationPoller类的方法;

步骤十、start()方法执行完毕。

(2) terminate()方法

终止调度器,步骤如下:

步骤一、判断作业调度器是否启动, 如启动则执行步骤二;

步骤二、判断作业队列管理类是否为空,如果不为空,则清空作业队列;

步骤三、终止初始化作业的线程。

步骤四、terminate()方法执行完毕。

(3) assignTasks(TaskTracker taskTracker)方法

声明如下:

public synchronized List<Task> assignTasks(TaskTracker taskTracker)

用一句话讲就是:通过一定的算法为给定的TaskTracker分配计算任务,返回结果就是计算任务集合,一般情况下是一个Map任务和一个Reduce任务。

那么对于CapacityScheduler来说采用的算法策略是:当某个tasktracker上出现空闲slot时,调度器依次选择一个queue、(选中的queue中的)job、(选中的job中的)task,并将该slot分配给该task,如图3.6。这些过程已经在上面的TaskScheduling及其实现类的相关方法中讨论过了。

图3.6 计算能力调度算法策略

(4) getJobs(String queueName)

声明:

public synchronized Collection<JobInProgress> getJobs(String queueName)

根据队列名获取队列中的作业集合。

3.2.3 CapacityTaskScheduler类实现了TaskScheduler类的内部类QueueRefresher

这是一个重要的内部类,该类所实现的方法只有具有管理员权限的用户可以操作,由你们提出的作业队列的动态配置能否实现全在于该类是如何设计的,下面我们先来看看CapacityScheduler作业调度器是如何实现该内部类的。

CapacityScheduler作业调度器重写了该内部类的refreshQueues()方法,refreshQueues()方法的执行步骤如图3.7。

图3.7 refreshQueues()方法的执行步骤

步骤一、判断作业调度器是否已经启动,如果未启动,则不能刷新队列;

步骤二、跟start()方法一样,它也调用了initializeQueues() 方法;

步骤三、判断根队列是否为空,若为空,则不能初始化根队列为空的队列;

步骤四、完整性检查:确保应至少有一个队列;

步骤五、创建一个新的queue-hierarchy builder和尝试加载完整的层次结构的队列;

步骤六、创建根队列newRootAbstractQueue;

步骤七、以newRootAbstractQueue为根队列,创建一个完整的层次结构(下面的步骤是如何创建完整的层次结构);

步骤八、检查子队列是否有更多的子队列,若有则遍历子队列;若无,则执行步骤十二;

步骤九、检查当前被遍历到的子队列是否还有更多的子队列。若有,则生成一个新的ContainerQueue,然后递归创建层次结构;

步骤十、更新totalCapacity;

步骤十一、递归创建子层次结构;

步骤十二、如果这不是一个JobQueue,创建一个JobQueue(加载了配置文件中的一些有关队列的信息);

步骤十三、检查每个层次的totalCapacity,对子队列的totalCapacity不应超过100 ;

步骤十四、向层次结构提交更改请求前,请对调度器加锁;

步骤十五、复制前做一些验证,如果队列的是否支持优先级改变了,则抛出异常,当前调度器还未支持是否支持优先级的改变;

步骤十六、首先递归更新子队列;

步骤十七、现在,复制根队列本身的配置;

步骤十八、调用JobInitializationPoller类的refreshQueueInfo()方法——刷新与初始化缓存有关的调度配置。缓存的配置,目前只能由主线程在初始化中使用。因此,主线程的迭代自动检查任何更新。

步骤十九、refreshQueues()方法执行完毕。

对于步骤十五,改变当前队列是否支持优先级这一点,我们是这样理解的,队列中的作业是存储在一个Map数组中的,Map数组的比较器是可以改变的,但是改变了之后,由于Map之间是直接复制的,即使改变了该方案,Map的排列方式并未发生改变,且如果突然改变了是否支持优先级这一点。如果有新的作业提交,Map的队列会造成混乱(如果有新的成果该处会及时更新)。

3.3 MemoryMatcher类

3.3.1 MemoryMatcher类的成员变量

类包含的主要成员变量如下:

memSizeForMapSlotOnJT = JobConf.DISABLED_MEMORY_LIMIT;

对应于mapreduce.cluster.mapmemory.mb 设置的值,JobTracker上的Map槽的内存大小。

memSizeForReduceSlotOnJT = JobConf.DISABLED_MEMORY_LIMIT;

对应于mapreduce.cluster.reducememory.mb设置的值,JobTracker上的Reduce槽的内存大小。

limitMaxMemForMapTasks = JobConf.DISABLED_MEMORY_LIMIT;

Map任务的最大内存限制。

limitMaxMemForReduceTasks = JobConf.DISABLED_MEMORY_LIMIT;

Reduce任务的最大内存限制。

3.3.2 MemoryMatcher类的方法

getMemReservedForTasks(TaskTrackerStatus taskTracker,TaskType taskType)

获取给定的TaskTracker的内存使用情况。计算方法是依据TaskTracker的上报信息(TaskStatus)来计算。 matchesMemoryRequirements(JobInProgress job, TaskType taskType,TaskTrackerStatus taskTracker) 这是MemoryMatcher的核心函数,主要判断TaskTracker是否能满足指定Job的内存需求。

isSchedulingBasedOnMemEnabled

调度算法是否支持内存的调度。

3.4 JobInitializationPoller类

关于作业的初始化,初始化的逻辑是由一个主线程和几个工作线程构成,其中,主线程周期性的从scheduler中“拖”一部分作业,把这些作业赋给工作线程,让工作线程去完成。“拖”选作业的原则是作业被调度的可能性较大,作业被调度的可能性主要:用户的作业限制,队列的容量限制。同时,高优先级的作业总是被优先初始化。

作业的初始化工作需要占用JobTracker的内存,因此有必要限定每个队列的初始化作业的数量。

3.4.1 JobInitializationPoller类的成员变量

类包含的主要成员变量如下:

  • (1) private JobQueuesManager jobQueueManager;作业管理器
  • (2) private long sleepInterval;线程间隔时间
  • (3) private int poolSize;初始化规模数
  • (4) private HashMap<JobID, JobInProgress> initializedJobs;已经初始化的作业集合
  • (5) private volatile boolean running; 正在运行与否
  • (6) private TaskTrackerManager ttm;
  • (7) private HashMap<String, JobInitializationThread> threadsToQueueMap;队列与为之服务的工作线程的映射关系。

在讨论函数前,先把它的内部类搞定。

3.4.2 内部类

类JobInitializationThread

类JobInitializationThread的成员变量

  • (1) private JobInProgress initializingJob;正在初始化的作业
  • (2) private volatile boolean startIniting;是否开始初始化
  • (3) private AtomicInteger currentJobCount = new AtomicInteger(0); 初始化作业数,它是一个原子类型。

下面从执行流程来分析JobInitializationThread。

首先肯定是run()函数,它只调用了initializeJobs(),这个函数的主要工作是:

循环执行:获取每个队列的首个任务进行初始化,初始化工作由TastTrackerManager的initJob()函数完成。 这就是JobInitializationThread的核心。其它函数比较简单,在此不讨论了。

3.4.3 JobInitializationPoller类的方法

(1) init(Set<String> queues, CapacitySchedulerConf capacityConf)

初始化函数,相当重要,它直接被CapacityTaskScheduler的start()函数调用,如图3.8。

图3.8 初始化函数执行流程

  • 首先,确定两个变量sleepInterval(线程执行的睡眠时间),poolSize(取初始化线程数(由配置文件的”mapred.capacity-scheduler.init-worker-threads”决定,默认值是5)和队列大小中的较小值)的大小。
  • 其次,为队列分配初始化线程(工作线程),见函数assignThreadsToQueues()。
  • 最后,启动所有工作线程。

(2) assignThreadsToQueues()

服务于每个队列的初始化线程的分配工作,由这个函数搞定。 首先创建poolSize个线程,每个线程服务${队列数/poolSize}个队列。当然,这样分配会发现有些队列没有分配线程,因此,再按照roundrobin算法(从poolSize个线程中挑选)为队列分配线程。

(3) run()

循环:

  • 1.清除已经初始化的作业列表;
  • 2.选择即将初始化的作业(调用selectJobsToInitialize()),如图3.9;

图3.9 选择即将初始化的作业

selectJobsToInitialize(),主要干活的是接下来的函数getJobsToInitialize(String):选择作业的核心是这个函数,参数是队列名称。循环迭代队列的每一个作业:检查作业是否已经被初始化;检查队列最大初始化作业数的限制是否满足;用户限制是否满足;确认该作业还没有被kill掉。最后,将作业加入初始化队列。

四、作业队列管理

CapacityScheduler作业调度器定义了一个抽象的父类AbstractQueue来管理队列,同时用继承AbstractQueue的两个子类来管理队列的详细信息,最后用一个JobQueuesManager来管理所有队列。图5.1为CapacityScheduler的作业队列管理类。

图4.1 CapacityScheduler的作业队列管理类

4.1 QueueSchedulingContext类

CapacityScheduler调度器定义了一个基类来管理队列的信息QueueSchedulingContext,该基类中主要包含了如下信息

  • queueName——队列的名称;
  • mapCapacity—— 队列的map任务在集群中的最大能力;
  • reduceCapacity——队列的reduce任务在集群中的最大能力;
  • capacityPercent——集群中在此队列中对作业有效的槽数量的百分比;
  • maxCapacityPercent——它的作用是当集群中有未在配置文件中指定能力百分比的队列时,这些队列会根据剩下的能力百分比是否能为它们分配一个百分比而设定的值;例如,在配置文件中我们为一个队列设定了能力百分比为70%,同时还有两个队列未在配置文件中指定能力百分比,则剩下的30%/未配置的队列数,此处为15%,将15%跟maxCapacityPercent比较,若它的值大于15%,则将剩下的30%的能力指定给某一个队列;
  • numJobsByUser——队列中拥有作业的用户数;
  • ulMin——用户数限制的最小值;
  • supportsPriorities ——队列是否支持优先级;
  • numOfWaitingJobs ——等待作业的数量;
  • prevMapCapacity ——MapCapacity的状态;
  • prevReduceCapacity ——ReduceCapacity的状态;
  • mapTSC——TaskSchedulingContext对象;
  • reduceTSC——TaskSchedulingContext对象。
  • TaskSchedulingContext是为了分别针对不同任务类型而设计的,它包含了如下信息:
  • capacity——实际能力,这取决于集群中还有多少槽是可用的;
  • numRunningTasks——正在运行的task数;
  • numSlotsOccupied——被占用的槽数;
  • maxCapacity——实际可扩展的能力,这取决于集群中还有多少槽是可用的;
  • numSlotsOccupiedByUser——用户占用的槽数。

4.2 AbstractQueue类

AbstractQueue类是一个抽象类,它是队列层次的父类,所有队列继承这个类。

即使所有的队列类继承这个类,也只定义了2种类别的队列。一种是ContainerQueue类,即复合队列;另一种是JobQueue,即分级队列。通常情况下,ContainerQueue由JobQueue组成。JobQueue包括实际的作业列表,即runningJob,WaitingJob等。这样做是为了确保所有与作业相关的数据是在一个地方。

4.2.1 AbstractQueue类的方法

  • (1) update(int mapClusterCapacity, int reduceClusterCapacity)
  • 此处其实就是调用QueueSchedulingContext类中的updateContext()方法,来更新QueueSchedulingContext类中的成员变量。
  • (2) getDescendentJobQueues()
  • (3) getDescendantContainerQueues();
  • (4) sort(Comparator queueComparator)
  • (5) getChildren()
  • (6) addChild(AbstractQueue queue)
  • (7) distributeUnConfiguredCapacity()
  • (2)~(7)均为抽象方法,到实现类中再讨论。
  • (8) validateAndCopyQueueContexts(AbstractQueue sourceQueue) 该方法主要是验证且复制队列信息。在复制前,先对队列进行验证,主要是为了提示用户新队列是否支持优先级。更新时,首先递归更新子队列。然后,复制根队列本身的配置。

4.2.2 内部类

AbstractQueueComparator

构造比较器,比较队列的名称。

4.3 JobQueue类

JobQueue就是维持一个队列的基本信息,包括了如下信息:

  • comparator——队列的比较器,主要就是是否支持优先级;若不支持优先级,则根据作业的提交时间先后来对队列进行排序,如果作业的提交时间是一样的,则根据分配给作业的ID排序;
  • waitingJobs——队列中处于等待状态的作业数;
  • runningJobs——队列中处于运行状态的作业数;

(1) JobQueue类的主要方法:

update()方法:此方法是更新当前队列

QueueSchedulingContext和TaskSchedulingContext中处于RUNNING状态job的信息。主要做工作的是updateStatsOnRunningJob() 方法,updateStatsOnRunningJob() 方法首先判断作业的状态是否是RUNNING状态的,如果不是,则结束该方法;若是,则继续执行。

更新numRunningTasks和numSlotsOccupied和numSlotsOccupiedByUser。

(2) getWaitingJobs():获取正在等待的作业数。

Collections.unmodifiableCollection(new LinkedList(waitingJobs.values()));

(3) getRunningJobs():获取处于RUNNING状态的作业列表。

  • Collections.unmodifiableCollection(new LinkedList<JobInProgress>(runningJobs.values()))
  • addRunningJob(JobInProgress job):添加处于RUNNING状态的作业。
  • runningJobs.put(new JobSchedulingInfo(job), job);
  • removeRunningJob(JobSchedulingInfo jobInfo):移除处于RUNNING状态的作业。
  • runningJobs.remove(jobInfo)
  • removeWaitingJob(JobSchedulingInfo schedInfo):移除处于WAITING状态的作业。
  • addWaitingJob(JobInProgress job):添加处于WAITING状态的作业。
  • waitingJobs.put(new JobSchedulingInfo(job), job);
  • getWaitingJobCount():获取处于WAITING状态的作业数。
  • waitingJobs.size();
  • jobAdded(JobInProgress job):在队列中加入一个作业。它首先加入处于WAITING状态的Map函数中,然后获取该作业所属用户在队列中的作业数。如果之前没有该用户的作业,则设置该用户所属的作业数为1。然后,计算运行一个单一的map/reduce任务所需要的槽数。
  • jobCompleted(JobInProgress job):当一个作业完成的时候,更新相关参数,比如该作业所属于的用户在队列中占有的作业数等等。
  • reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo):当作业的优先级或者提交时间改变时,重新排序队列。
  • jobUpdated(JobChangeEvent event):如果作业的状态改变了,首先如果作业的优先级或者提交时间改变了,则重新排序队列。否则如果作业的运行状态改变了,如果状态变为SUCCEEDED或者FAILED或者KILLED,执行jobCompleted(job, oldJobStateInfo);方法;否则,如果状态变为RUNNING,则执行addRunningJob(job)方法。

4.3 ContainerQueue类

继承AbstractQueue,队列结构的复合类。

private List<AbstractQueue> children;

存储子队列的数组;

ContainerQueue类实现父类AbstractQueue的如下方法:

updateChildrenContext()

为子队列设置正常的能力值,然后更新子队列。

该方法遍历存储子队列的数组,为每个子队列更新计算能力信息,同时更新对象TaskSchedulingContext中的信息。

4.4 队列管理类JobQueuesManager

它利用一个Map函数jobQueues维护JobQueue对象,即维护一个队列。提供了如下方法:

addQueue(JobQueue queue)
jobQueues.put(queue.getName(), queue);

根据队列名称,增加一个队列。

jobAdded(JobInProgress job)

向某一个队列加入一个作业,它是根据作业所属队列名称而加入指定的队列。

JobQueue qi = getJobQueue(job.getProfile().getQueueName());
qi.jobAdded(job);
jobUpdated(JobChangeEvent event)

根据作业的改变状态,更新作业所在的队列。

getComparator(String queue)

获取队列的调度规则,即本队列是否支持优先级。

getJobQueue(JobInProgress jip)

根据队列名称获取队列。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多