回复“面试”获取最新资料 回复“加群”邀您进技术交流群 何为线程池?顾名思义,线程池就是存放一定量线程的容器,当有待执行任务的时候直接从线程池中取出线程执行任务,任务执行完成之后将线程回放至线程池中。线程池的优点:降低了线程频繁创建、销毁的开销,提高系统的响应速度,方便统一管理创建的线程。 java.util.concurrent.ThreadPoolExecutor 线程池(ThreadPoolExecutor)提供 4 个默认的构造方法,固定参数 5 个。如下图: 
核心参数解释如下:
核心线程数量:corePoolSize 最大线程数量:maximumPoolSize 非核心线程空闲等待时间:keepAliveTime 等待时间单位:timeUnit 等待阻塞队列:blockingQueue<Runnable> (可选)线程工厂创建线程:threadFactory (可选)线程池拒绝策略:rejectedExecutionHandler
线程池原理
corePoolSize
线程池中默认存活的线程数量。不同的线程池对于核心线程数量有不同的要求,也与 allowCoreThreadTimeout 参数有关。当 allowCoreThreadTimeout= true 时,核心线程没有任务且存活时间超过空闲等待时间后终止。 maximumPoolSize当 currentThreadNumber >= corePoolSize ,且任务队列已满时,线程池会创建新线程来处理任务;当 currentThreadNumber =maxPoolSize ,且任务队列已满时,线程池会拒绝处理任务而抛出异常。 keepAliveTime线程池中空闲线程允许存活的时间,超过配置时间将会被终止。blockingQueue:线程池缓存队列存放待处理的线程任务。 ArrayBlockingQueue:指定大小的等待队列(FIFO);有界队列,创建时需要指定队列的大小。 LinkedBlockingQueue:基于链表的等待队列(FIFO);无界队列,创建时不指定队列大小,默认为 Integer.MAX_VALUE。 PriorityBlockingQueue:带有优先级的等待队列 SynchronizedQueue:不存放对象的等待队列;同步移交队列,直接新建一个线程来执行新来的任务。
threadFactoryrejectedExecutionHandler当线程池的任务缓存队列已满并且线程池中的线程数目达到 maximumPoolSize 时,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:AbortPolicy(默认策略):丢弃任务并抛出 RejectedExecutionException。 CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。 DiscardOldestPolicy:丢弃队列中最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务(舍弃最老的请求,即将队列头部任务舍弃)。 DiscardPolicy:不做任何处理,直接丢弃任务。
怎么创建线程池?线程池的创建主要有 2 种方式 Executors 执行器创建线程池是在 ThreadPoolExecutor 构造方法上进行简单的封装,特殊场景根据需要自行创建。可以把Executors理解成一个工厂类 。阿里开发规范中是建议使用ThreadPoolExecutor 来创建线程池。 // ThreadPoolExecutor 最基础的构造方法 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 核心线程、最大线程数量必须大于 0,且 最大线程数量 大于等于 核心线程数量,空闲等待时间大于 0 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
常见线程池有以下 4 种: newFixedThreadPool:固定大小的线程池 newSingleThreadExecutor:单个线程线程池 newCachedThreadPool:缓存线程池 newScheduledThreadPool:调度线程池
newFixedThreadPool固定大小的线程池线程数量不存在变化,待处理的任务过多时会存放到缓存队列中。线程池会维护一定数量的线程,当创建的线程过多时阻塞队列中等待执行的线程数量会大量堆积,系统资源不足时容易发生 OOM。// 创建线程池 ExecutorService threadPool = Executors.newFixedThreadPool(5); public static ExecutorService newFixedThreadPool(int nThreads) { // 固定大小是指: 核心线程数量 = 最大线程数量 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
直接使用线程池构造方法创建固定线程池,线程池中有且仅有固定数量的线程去执行待执行的任务。当“待执行的任务数量 > maximumPoolSize + blockingQueue.size()”时,会抛出异常 java.util.concurrent.RejectedExecutionException。 // 阻塞队列大小为 5 LinkedBlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>(5); // 创建线程数量为 3 的固定大小线程池 ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, blockingQueue); // 待提交执行任务数量为 10,允许执行任务的数量为 8 for (int i = 0; i< 10; i++) { poolExecutor.submit( () -> { System.out.println(Thread.currentThread().getName()); }); }
newSingleThreadExecutor单线程线程池有且仅有一个线程,若有多余的任务提交到线程池中则会被暂存到阻塞队列,待线程空闲时再去执行,当线程在执行过程中失败而终止时,会创建个新的线程去执行缓存队列中的任务。ExecutorService threadPool = Executors.newSingleThreadExecutor(); public static ExecutorService newSingleThreadExecutor() { // 核心线程数、最大线程数都为 1,阻塞队列大小为 Integer.MAX_VALUE return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
newCachedThreadPool缓存线程池初始时不存在线程,根据需要创建线程,当线程池中不存在空闲可用线程时会创建新的线程,线程池中超过 60s 且未使用的线程将被终止并删除。因此,合理的空闲等待时间,线程池可以维护一定数量的线程有利于提高性能。ExecutorService threadPool = Executors.newCachedThreadPool(); public static ExecutorService newCachedThreadPool() { // 线程池核心线程数为 0,最大为 Integer.MAX_VALUE,空闲等待时间未 60s return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newScheduledThreadPool调度线程池,可以以固定的频率执行任务或者固定的延时执行任务。ExecutorService threadPool = Executors.newScheduledThreadPool(3); public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { // 设置核心线程数量,默认最大线程数量为 Integer.MAX_VALUE return new ScheduledThreadPoolExecutor(corePoolSize); }
public ScheduledThreadPoolExecutor(int corePoolSize) { // ThreadPoolExecutor 父类构造方法 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
scheduleAtFixedRate:固定频率执行任务,即任务执行的频率保持不变。 // command:需要执行的任务;initialDelay:初始执行延迟时间;period:后续任务执行延迟时间;unit:延迟时间单位 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
// 创建核心线程为 1 的调度线程池,初始延迟 3s,后续任务执行延迟 1s【周期性的操作,period = 0,所有的任务都将在初始延迟后执行,没有周期性可言】 ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1); threadPool.scheduleAtFixedRate( ()-> { try { // 假设任务执行时间为 3s,当 workTime <= period 时,延迟 period 时间后执行;当 workTime > period ,延迟 workTime 时间后执行 Thread.sleep(3000); System.out.println("scheduleAtFixedRate " + Thread.currentThread().getName() + " " + DateFormat.getTimeInstance().format(new Date()) ); } catch (InterruptedException e) { e.printStackTrace(); } }, 3, 1, TimeUnit.SECONDS);
scheduleWithFixedDelay:固定的延时执行任务,指上一次执行成功之后和下一次开始执行的之前的时间。 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
// 创建核心线程为 1 的调度线程池,初始延迟 3s,后续任务将在上一个任务执行成功 1 是后执行; ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1); threadPool.scheduleWithFixedDelay( ()-> { try { Thread.sleep(1000); System.out.println("scheduleWithFixedDelay " + Thread.currentThread().getName() + " " + DateFormat.getTimeInstance().format(new Date()) ); } catch (InterruptedException e) { e.printStackTrace(); } }, 3, 1, TimeUnit.SECONDS);
线程池有哪些状态通过获取线程池状态,可以判断线程池是否是运行状态、可否添加新的任务以及优雅地关闭线程池等。
RUNNING: 线程池的初始化状态,可以添加待执行的任务。 SHUTDOWN:线程池处于待关闭状态,不接收新任务仅处理已经接收的任务。 STOP:线程池立即关闭,不接收新的任务,放弃缓存队列中的任务并且中断正在处理的任务。 TIDYING:线程池自主整理状态,调用 terminated() 方法进行线程池整理。 TERMINATED:线程池终止状态。
判断线程池状态常用方法总结如下。 shutdown 和 shutdownNow 的联系和区别: isShutdown、isTerminated、isTerminating 的区别: isShutDown():调用 shutdown() 或 shutdownNow() 方法后返回 true。 isTerminated():线程池中不存在任何需要执行的任务时,返回 true。 isTerminating() 线程池调用 shutdown() 或 shutdownNow() 后但是没有完全终止返回 true。
常用方法线程池任务执行 execute()和 submit() 方法。 Executor 框架成员及关系图如下: 
线程池监控使用线程池可以提高系统并发时的吞吐量,提高系统性能。但是使用不当会造成系统资源占用过高,线程池缓存队列堆积大量待执行任务、缓存线程池中存在大量的耗时任务等会造成内存溢出、系统访问缓慢等问题。因此监控线程池就显得极为重要,下面根据源码解读常用方法进行线程池的监控。核心线程数量: // 线程池维持线程的最小存活数量与 allowCoreThreadTimeOut 参数有关 public int getCorePoolSize() { return corePoolSize; }
线程池最大的线程数量: public int getMaximumPoolSize() { return maximumPoolSize; }
线程池最多创建的线程数量: public int getLargestPoolSize() { // 获取主线程锁,获取调用此方法时,线程池中曾创建的最大的线程数量 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { return largestPoolSize; } finally { mainLock.unlock(); } }
// 添加待执行任务方法 private boolean addWorker(Runnable firstTask, boolean core){ ... if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 添加待执行的任务 workers.add(w); // 获取线程池中存在的工作线程的大小 int s = workers.size(); if (s > largestPoolSize) // 赋值 largestPoolSize = s; workerAdded = true; } ... }
线程池当前存在的线程数量: public int getPoolSize() { // 获取主线程锁,获取调用此方法时,线程池中存在线程数量 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Remove rare and surprising possibility of // isTerminated() && getPoolSize() > 0 return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size(); } finally { mainLock.unlock(); } }
// 最近运行状态 private static boolean runStateAtLeast(int c, int s) { return c >= s; }
线程池已完成任务数量: // Worker 类属性获取线程任务计数器 volatile long completedTasks;
public long getCompletedTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; // 遍历线程池中的线程 for (Worker w : workers) n += w.completedTasks; return n; } finally { mainLock.unlock(); } }
线程池存在的任务总量: public long getTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) { n += w.completedTasks; // w.islocked() 获取当前任务的状态,调用 isHeldExclusively() 判断 if (w.isLocked()) ++n; } return n + workQueue.size(); } finally { mainLock.unlock(); } }
// Work 内部类 public void lock() { acquire(1); } // 加锁 public boolean tryLock() { return tryAcquire(1); }// 尝试获取锁 public void unlock() { release(1); }// 释放锁
public boolean isLocked() { return isHeldExclusively(); } // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } 好了,以上便是今天对线程池源码相关的分享。
码字不易。老田期待你来个:点赞...转发...
|