前面提到ExecutorService是对Executor的扩展,增加了submit()等提交和调用任务的方法,也增加了对Executor生命周期的定义。本篇将结合shutdown()、shutdownNow()、awaitTermination()和tryTerminate()几个方法的实现对ThreadPoolExecutor生命周期相关的逻辑做分析整理。
0. ExecutorService的生命周期方法和ThreadPoolExecutor的生命周期实现
ExecutorService中,和生命周期相关的,声明了5个方法:
- awaitTermination() 阻塞等待shutdown请求后所有线程终止,会有时间参数,超时和中断也会令方法调用结束
- isShutdown() 通过ctl属性判断当前的状态是否不是RUNNING状态
- isTerminated() 通过ctl属性判断当前的状态是否为TERMINATED状态
- shutdown() 关闭Executor,不再接受提交任务
- shutdownNow() 关闭Executor,不再接受提交任务,并且不再执行入队列中的任务
前面的文章也已经介绍过,在ThreadPoolExecutor的实现中,定义了五个生命周期状态,标识了整个线程池对象所处的状态阶段,用来实现生命周期相关的方法。
1. ThreadPoolExecutor的shutdown()
我们先来看看shutdown()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 | public void shutdown() {
final ReentrantLock mainLock = this .mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
|
其实逻辑比较简单,尝试将状态切换到SHUTDOWN,这样就不会再接收新的任务提交。对空闲线程进行中断调用。最后检查线程池线程是否为0,并尝试切换到TERMINATED状态。
2. ThreadPoolExecutor的shutdownNow()
再来看看shutdownNow()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | public List shutdownNow() {
List tasks;
final ReentrantLock mainLock = this .mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
|
主要所做的事情就是切换ThreadPoolExecutor到STOP状态,中断所有worker,并将任务队列中的任务取出来,不再执行。最后尝试修改状态到TERMINATED。
3. shutdown()和shutdownNow()的区别
shutdown()新的任务不会再被提交到线程池,但之前的都会依旧执行,通过中断方式停止空闲的(根据没有获取锁来确定)线程。
shutdownNow()则向所有正在执行的线程发出中断信号以尝试终止线程,并将工作队列中的任务以列表方式的结果返回。
两者区别:
- 是一个要将线程池推到SHUTDOWN状态,一个将推到STOP状态
- 并且对运行的线程处理方式不同,shutdown()只中断空闲线程,而shutdownNow()会尝试中断所有活动线程
- 还有就是对队列中的任务处理,shutdown()队列中已有任务会继续执行,而shutdownNow()会直接取出不被执行
相同的是都在最后尝试将线程池推到TERMINATED状态。
4. ThreadPoolExecutor的awaitTermination()
阻塞等待shutdown请求后所有线程终止,会有时间参数,超时和中断也会令方法调用结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | public boolean awaitTermination( long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this .mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true ;
if (nanos <= 0 )
return false ;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
|
实际所做的就是Condition的定时await调用。用于状态依赖的线程阻塞。
5. tryTerminate()
tryTerminate()的意义就在于尝试进入终止状态,当ctl中worker数字为0时执行terminated()方法,否则等锁中断一个空闲的Worker。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return ;
if (workerCountOf(c) != 0 ) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return ;
}
final ReentrantLock mainLock = this .mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0 ))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0 ));
termination.signalAll();
}
return ;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
|
其中interruptIdleWorkers()方法这里就不列代码了,空闲的worker主要是通过worker的tryLock()来确认的,因为执行任务的worker互斥地锁定对象。
中断worker导致线程退出,最终还会循环尝试终止其它的空闲线程,直到整个ThreadPoolExecutor最后终结。
6. ThreadPoolExecutor生命周期的扩展点
在生命周期上,ThreadPoolExecutor为扩展的类提供了一些扩展点,这是很好的设计,对扩展开放。
其中声明了如下protected的方法:
- beforeExecute() 在每个任务执行前做的处理
- afterExecute() 在每个任务执行后做的处理
- terminated() 在ThreadPoolExecutor到达TERMINATED状态前所做的处理
- finalize() 有默认实现,直接调用shutdown(),以保证线程池对象回收
- onShutdown() 在shutdown()方法执行到最后时调用,在java.util.concurrent.ScheduledThreadPoolExecutor类实现中用到了这个扩展点,做一些任务队列的清理操作。
下篇Java并发的文章会再介绍线程池饱和时的丢弃处理,更多关于ThreadPoolExecutor的分析和扩展还望大家整理指教。
|