如:newFixedThreadPool
[java] view plain copy
public
static
ExecutorService newFixedThreadPool(int
nThreads) {return
new
ThreadPoolExecutor(nThreads, nThreads,- 0L, TimeUnit.MILLISECONDS,
new
LinkedBlockingQueue());- }
如:newCachedThreadPool[java] view plain copy
public
static
ExecutorService newCachedThreadPool() {return
new
ThreadPoolExecutor(0, Integer.MAX_VALUE,- 60L, TimeUnit.SECONDS,
new
SynchronousQueue());- }
如:newSingleThreadExecutor[java] view plain copy
public
static
ExecutorService newSingleThreadExecutor() {return
new
FinalizableDelegatedExecutorService- (
new
ThreadPoolExecutor(1, 1, - 0L, TimeUnit.MILLISECONDS,
new
LinkedBlockingQueue()));- }
可以发现,其实它们调用的都是同一个接口ThreadPoolExecutor方法,只不过传入参数不一样而已。下面就来看看这个神秘的ThreadPoolExecutor。
首先来看看它的一些基本参数:
[java] view plain copy
public
class
ThreadPoolExecutor extends
AbstractExecutorService {- //运行状态标志位
volatile
int
runState;static
final
int
RUNNING = 0;static
final
int
SHUTDOWN = 1;static
final
int
STOP = 2;static
final
int
TERMINATED = 3;- //线程缓冲队列,当线程池线程运行超过一定线程时并满足一定的条件,待运行的线程会放入到这个队列
private
final
BlockingQueue workQueue;- //重入锁,更新核心线程池大小、最大线程池大小时要加锁
private
final
ReentrantLock mainLock = new
ReentrantLock();- //重入锁状态
private
final
Condition termination = mainLock.newCondition();- //工作都set集合
private
final
HashSet workers = new
HashSet();- //线程执行完成后在线程池中的缓存时间
private
volatile
long
keepAliveTime;- //核心线程池大小
private
volatile
int
corePoolSize;- //最大线程池大小
private
volatile
int
maximumPoolSize;- //当前线程池在运行线程大小
private
volatile
int
poolSize;- //当缓冲队列也放不下线程时的拒绝策略
private
volatile
RejectedExecutionHandler handler;- //线程工厂,用来创建线程
private
volatile
ThreadFactory threadFactory;- //用来记录线程池中曾经出现过的最大线程数
private
int
largestPoolSize;- //用来记录已经执行完毕的任务个数
private
long
completedTaskCount;- ................
- }
初始化线程池大小 有以下四种方法:
从源码中可以看到其实最终都是调用了以下的方法:
[java] view plain copy
public
ThreadPoolExecutor(int
corePoolSize,int
maximumPoolSize,long
keepAliveTime,- TimeUnit unit,
- BlockingQueue workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
if
(corePoolSize < 0="">- maximumPoolSize <= 0="">
- maximumPoolSize < corepoolsize="">
- keepAliveTime <>
throw
new
IllegalArgumentException();if
(workQueue == null
|| threadFactory == null
|| handler == null
)throw
new
NullPointerException();this
.corePoolSize = corePoolSize;this
.maximumPoolSize = maximumPoolSize;this
.workQueue = workQueue;this
.keepAliveTime = unit.toNanos(keepAliveTime);this
.threadFactory = threadFactory;this
.handler = handler;- }
这里很简单,就是设置一下各个参数,并校验参数是否正确,然后抛出对应的异常。接下来我们来看看最重要的方法execute,其源码如下:
[java] view plain copy
public
void
execute(Runnable command) {if
(command == null
)throw
new
NullPointerException();if
(poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { // 判断1if
(runState == RUNNING && workQueue.offer(command)) { // 判断2if
(runState != RUNNING || poolSize == 0) // 判断3- ensureQueuedTaskHandled(command);
- }
else
if
(!addIfUnderMaximumPoolSize(command)) // 判断4- reject(command); // is shutdown or saturated
- }
- }
笔者在上面加了点注释。下面我们一个一个判断来看首先判断1
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
(1)当poolSize >= corePoolSize 不成立时,表明当前线程数小于核心线程数目,左边返回fasle.接着执行右边判断!addIfUnderCorePoolSize(command)
它做了如下操作
[java] view plain copy
private
boolean
addIfUnderCorePoolSize(Runnable firstTask) {- Thread t =
null
; final
ReentrantLock mainLock = this
.mainLock;//加锁- mainLock.lock();
try
{if
(poolSize < corepoolsize="" &&="" runstate="=">- t = addThread(firstTask);//加入线程
- }
finally
{ - mainLock.unlock();
- }
return
t != null
;- }
发现它又调用addTread[java] view plain copy
private
Thread addThread(Runnable firstTask) { //调用这个方法之前加锁- Worker w =
new
Worker(firstTask);//线程包装成一个work - Thread t = threadFactory.newThread(w);//线程工厂从work创建线程
boolean
workerStarted = false
;if
(t != null
) {if
(t.isAlive()) // 线程应该是未激活状态throw
new
IllegalThreadStateException();- w.thread = t;
- workers.add(w);//全局set添加一个work
int
nt = ++poolSize;//当前运行线程数目加1if
(nt > largestPoolSize)- largestPoolSize = nt;
try
{- t.start();//注意,这里线程执行了,但是其实真正调用的是Worker类的run方法!!!!!!!!!
- workerStarted =
true
; - }
finally
{if
(!workerStarted)- workers.remove(w);
- }
- }
return
t;- }
其实Work是真实去调用线程方法的地方,它是对Thread类的一个包装,每次Thread类调用其start方法时,就会调用到work的run方法。其代码如下,[java] view plain copy
private
void
runTask(Runnable task) { //真正发起线程方法的地方final
ReentrantLock runLock = this
.runLock;- runLock.lock();
try
{
[java] view plain copy
if
((runState >= STOP || //判断的判断- (Thread.interrupted() && runState >= STOP)) &&
- hasRun)
- thread.interrupt();
boolean
ran = false
;- beforeExecute(thread, task);//处理前
try
{- task.run();//执行真正的原始线程的run方法
- ran =
true
; - afterExecute(task,
null
);//处理后 - ++completedTasks;
- }
catch
(RuntimeException ex) { if
(!ran)- afterExecute(task, ex);
throw
ex;- }
- }
finally
{ - runLock.unlock();
- }
- }
- //这里执行线程的方法
public
void
run() {try
{- hasRun =
true
; - Runnable task = firstTask;
- firstTask =
null
; while
(task != null
|| (task = getTask()) != null
) {- runTask(task);
- task =
null
; - }
- }
finally
{ - workerDone(
this
); - }
- }
发现要执行一个线程真的很不容易,如果addIfUnderCorePoolSize返回true,刚表明成功添加一条线程,并调用了其start方法,那么整个调用到此结束。如果返回fasle.那么就进入判断2.(2)当poolSize >= corePoolSize成立时,整个判断返回true。接着执行判断2
判断2
[java] view plain copy
if
(runState == RUNNING && workQueue.offer(command)) { // 判断2
如果当前线程池在运行状态,并且将当前线程加入到缓冲队列中。workQueue的offer是一个非阻塞方法。如查缓冲队列满了的话,返回为false.否则返回true;如果上面两个都 为true,表明线程被成功添加到缓冲队列中,并且当前线程池在运行。进入判断3
判断3
[java] view plain copy
if
(runState != RUNNING || poolSize == 0)- ensureQueuedTaskHandled(command);
当线程被加入到线程池中,进入判断3.如果这时线程池没有在运行或者运行的线程为为0。那么就调用ensureQueuedTaskHandled,它做的其实是判断下是否在拒绝这个线程的执行。[java] view plain copy
private
void
ensureQueuedTaskHandled(Runnable command) {final
ReentrantLock mainLock = this
.mainLock;- mainLock.lock();
boolean
reject = false
;- Thread t =
null
; try
{int
state = runState;if
(state != RUNNING && workQueue.remove(command)) //线程池没有在运行,且缓冲队列中有这个线程- reject =
true
; else
if
(state < stop="">- poolSize < math.max(corepoolsize,="" 1)="">
- !workQueue.isEmpty())
- t = addThread(
null
); - }
finally
{ - mainLock.unlock();
- }
if
(reject)- reject(command); //根据拒绝策略处理线程
- }
判断4[java] view plain copy
else
if
(!addIfUnderMaximumPoolSize(command))- reject(command); // is shutdown or saturated
在判断2为false时执行,表明当前线程池没有在运行或者该线程加入缓冲队列中失败,那么就会尝试再启动下该线程,如果还是失败,那就根据拒绝策略来处理这个线程。其源码如下:[java] view plain copy
private
boolean
addIfUnderMaximumPoolSize(Runnable firstTask) {- Thread t =
null
; final
ReentrantLock mainLock = this
.mainLock;- mainLock.lock();
try
{if
(poolSize < maximumpoolsize="" &&="" runstate="=" running)="" 如果当前运行线程数目="" 小于最大线程池大小="" 并且="">- t = addThread(firstTask);
- }
finally
{ - mainLock.unlock();
- }
return
t != null
;- }
一般调用这个方法是发生在缓冲队列已满了,那么线程池会尝试直接启动该线程。当然,它要保存当前运行的poolSize一定要小于maximumPoolSize。否则,最后。还是会拒绝这个线程!以上大概就是整个线程池启动一条线程的整体过程。
总结:
ThreadPoolExecutor中,包含了一个任务缓存队列和若干个执行线程,任务缓存队列是一个大小固定的缓冲区队列,用来缓存待执行的任务,执行线程用来处理待执行的任务。每个待执行的任务,都必须实现Runnable接口,执行线程调用其run()方法,完成相应任务。
ThreadPoolExecutor对象初始化时,不创建任何执行线程,当有新任务进来时,才会创建执行线程。
构造ThreadPoolExecutor对象时,需要配置该对象的核心线程池大小和最大线程池大小:
当目前执行线程的总数小于核心线程大小时,所有新加入的任务,都在新线程中处理
当目前执行线程的总数大于或等于核心线程时,所有新加入的任务,都放入任务缓存队列中
当目前执行线程的总数大于或等于核心线程,并且缓存队列已满,同时此时线程总数小于线程池的最大大小,那么创建新线程,加入线程池中,协助处理新的任务。
当所有线程都在执行,线程池大小已经达到上限,并且缓存队列已满时,就rejectHandler拒绝新的任务