原文出处:http:/// 『chenssy』 作为Executor框架中最核心的类,ThreadPoolExecutor代表着鼎鼎大名的线程池,它给了我们足够的理由来弄清楚它。 下面我们就通过源码来一步一步弄清楚它。 内部状态 线程有五种状态:新建,就绪,运行,阻塞,死亡,线程池同样有五种状态:Running, SHUTDOWN, STOP, TIDYING, TERMINATED。 变量ctl定义为AtomicInteger ,其功能非常强大,记录了“线程池中的任务数量”和“线程池的状态”两个信息。共32位,其中高3位表示”线程池状态”,低29位表示”线程池中的任务数量”。 RUNNING:处于RUNNING状态的线程池能够接受新任务,以及对新添加的任务进行处理。 SHUTDOWN:处于SHUTDOWN状态的线程池不可以接受新任务,但是可以对已添加的任务进行处理。 STOP:处于STOP状态的线程池不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。 TIDYING:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。 TERMINATED:线程池彻底终止的状态。 各个状态的转换如下: 创建线程池 我们可以通过ThreadPoolExecutor构造函数来创建一个线程池: 共有七个参数,每个参数含义如下: corePoolSize 线程池中核心线程的数量。当提交一个任务时,线程池会新建一个线程来执行任务,直到当前线程数等于corePoolSize。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。 maximumPoolSize 线程池中允许的最大线程数。线程池的阻塞队列满了之后,如果还有任务提交,如果当前的线程数小于maximumPoolSize,则会新建线程来执行任务。注意,如果使用的是无界队列,该参数也就没有什么效果了。 keepAliveTime 线程空闲的时间。线程的创建和销毁是需要代价的。线程执行完任务后不会立即销毁,而是继续存活一段时间:keepAliveTime。默认情况下,该参数只有在线程数大于corePoolSize时才会生效。 unit keepAliveTime的单位。TimeUnit workQueue 用来保存等待执行的任务的阻塞队列,等待的任务必须实现Runnable接口。我们可以选择如下几种:
threadFactory 用于设置创建线程的工厂。该对象可以通过Executors.defaultThreadFactory(),如下: public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } 返回的是DefaultThreadFactory对象,源码如下: ThreadFactory的左右就是提供创建线程的功能的线程工厂。他是通过newThread()方法提供创建线程的功能,newThread()方法创建的线程都是“非守护线程”而且“线程优先级都是Thread.NORM_PRIORITY”。 handler RejectedExecutionHandler,线程池的拒绝策略。所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。当向线程池中提交任务时,如果此时线程池中的线程已经饱和了,而且阻塞队列也已经满了,则线程池会选择一种拒绝策略来处理该任务。 线程池提供了四种拒绝策略:
当然我们也可以实现自己的拒绝策略,例如记录日志等等,实现RejectedExecutionHandler接口即可。 线程池 Executor框架提供了三种线程池,他们都可以通过工具类Executors来创建。 FixedThreadPool FixedThreadPool,可重用固定线程数的线程池,其定义如下: corePoolSize 和 maximumPoolSize都设置为创建FixedThreadPool时指定的参数nThreads,意味着当线程池满时且阻塞队列也已经满时,如果继续提交任务,则会直接走拒绝策略,该线程池不会再新建线程来执行任务,而是直接走拒绝策略。FixedThreadPool使用的是默认的拒绝策略,即AbortPolicy,则直接抛出异常。 keepAliveTime设置为0L,表示空闲的线程会立刻终止。 workQueue则是使用LinkedBlockingQueue,但是没有设置范围,那么则是最大值(Integer.MAX_VALUE),这基本就相当于一个无界队列了。使用该“无界队列”则会带来哪些影响呢?当线程池中的线程数量等于corePoolSize 时,如果继续提交任务,该任务会被添加到阻塞队列workQueue中,当阻塞队列也满了之后,则线程池会新建线程执行任务直到maximumPoolSize。由于FixedThreadPool使用的是“无界队列”LinkedBlockingQueue,那么maximumPoolSize参数无效,同时指定的拒绝策略AbortPolicy也将无效。而且该线程池也不会拒绝提交的任务,如果客户端提交任务的速度快于任务的执行,那么keepAliveTime也是一个无效参数。 其运行图如下(参考《Java并发编程的艺术》): SingleThreadExecutor SingleThreadExecutor是使用单个worker线程的Executor,定义如下: 作为单一worker线程的线程池,SingleThreadExecutor把corePool和maximumPoolSize均被设置为1,和FixedThreadPool一样使用的是无界队列LinkedBlockingQueue,所以带来的影响和FixedThreadPool一样。 CachedThreadPool CachedThreadPool是一个会根据需要创建新线程的线程池 ,他定义如下: CachedThreadPool的corePool为0,maximumPoolSize为Integer.MAX_VALUE,这就意味着所有的任务一提交就会加入到阻塞队列中。keepAliveTime这是为60L,unit设置为TimeUnit.SECONDS,意味着空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。阻塞队列采用的SynchronousQueue,而我们在【死磕Java并发】—-J.U.C之阻塞队列:SynchronousQueue中了解到SynchronousQueue是一个没有元素的阻塞队列,加上corePool = 0 ,maximumPoolSize = Integer.MAX_VALUE,这样就会存在一个问题,如果主线程提交任务的速度远远大于CachedThreadPool的处理速度,则CachedThreadPool会不断地创建新线程来执行任务,这样有可能会导致系统耗尽CPU和内存资源,所以在使用该线程池是,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题。 任务提交 线程池根据业务不同的需求提供了两种方式提交任务:Executor.execute()、ExecutorService.submit()。其中ExecutorService.submit()可以获取该任务执行的Future。 我们以Executor.execute()为例,来看看线程池的任务提交经历了那些过程。 定义: public interface Executor { void execute(Runnable command);} ThreadPoolExecutor提供实现: 执行流程如下:
在步骤2中如果加入阻塞队列成功了,则会进行一个Double Check的过程。Double Check过程的主要目的是判断加入到阻塞队里中的线程是否可以被执行。如果线程池不是RUNNING状态,则调用remove()方法从阻塞队列中删除该任务,然后调用reject()方法处理任务。否则需要确保还有线程执行。 addWorker 当线程中的当前线程数小于corePoolSize,则调用addWorker()创建新线程执行任务,当前线程数则是根据ctl变量来获取的,调用workerCountOf(ctl)获取低29位即可: private static int workerCountOf(int c) { return c & CAPACITY; } addWorker(Runnable firstTask, boolean core)方法用于创建线程执行任务,源码如下:
在这里需要好好理论addWorker中的参数,在execute()方法中,有三处调用了该方法:
在新建线程执行任务时,将讲Runnable包装成一个Worker,Woker为ThreadPoolExecutor的内部类 Woker内部类 Woker的源码如下: 从Worker的源码中我们可以看到Woker继承AQS,实现Runnable接口,所以可以认为Worker既是一个可以执行的任务,也可以达到获取锁释放锁的效果。这里继承AQS主要是为了方便线程的中断处理。这里注意两个地方:构造函数、run()。构造函数主要是做三件事:1.设置同步状态state为-1,同步状态大于0表示就已经获取了锁,2.设置将当前任务task设置为firstTask,3.利用Worker本身对象this和ThreadFactory创建线程对象。 当线程thread启动(调用start()方法)时,其实就是执行Worker的run()方法,内部调用runWorker()。 runWorker 运行流程
getTask() timed == true,调用poll()方法,如果在keepAliveTime时间内还没有获取task的话,则返回null,继续循环。timed == false,则调用take()方法,该方法为一个阻塞方法,没有任务时会一直阻塞挂起,直到有任务加入时对该线程唤醒,返回任务。 在runWorker()方法中,无论最终结果如何,都会执行processWorkerExit()方法对worker进行退出处理。 processWorkerExit() 首先completedAbruptly的值来判断是否需要对线程数-1处理,如果completedAbruptly == true,说明在任务运行过程中出现了异常,那么需要进行减1处理,否则不需要,因为减1处理在getTask()方法中处理了。然后从HashSet中移出该worker,过程需要获取mainlock。然后调用tryTerminate()方法处理,该方法是对最后一个线程退出做终止线程池动作。如果线程池没有终止,那么线程池需要保持一定数量的线程,则通过addWorker(null,false)新增一个空的线程。 addWorkerFailed() 在addWorker()方法中,如果线程t==null,或者在add过程出现异常,会导致workerStarted == false,那么在最后会调用addWorkerFailed()方法: 整个逻辑显得比较简单。 tryTerminate() 当线程池涉及到要移除worker时候都会调用tryTerminate(),该方法主要用于判断线程池中的线程是否已经全部移除了,如果是的话则关闭线程池。 在关闭线程池的过程中,如果线程池处于STOP状态或者处于SHUDOWN状态且阻塞队列为null,则线程池会调用interruptIdleWorkers()方法中断所有线程,注意ONLY_ONE== true,表示仅中断一个线程。 interruptIdleWorkers onlyOne==true仅终止一个线程,否则终止所有线程。 线程终止 线程池ThreadPoolExecutor提供了shutdown()和shutDownNow()用于关闭线程池。 shutdown():按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。 shutdownNow() :尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。 shutdown shutdownNow 与shutdown不同,shutdownNow会调用interruptWorkers()方法中断所有线程。 |
|
来自: Bladexu的文库 > 《技术文摘》