分享

java线程池的实现原理

 Y丶z 2018-07-29

java线程池的实现原理

简单使用架构分析线程池的实现原理一些重要的属性线程池的状态线程池状态间的转换workQueueworkerscorePoolSizemaximumPoolSizehandlerkeepAliveTimeallowCoreThreadTimeOutthreadFactorylargestPoolSizecompletedTaskCount构造函数提交任务执行缓存策略和排队策略任务拒绝策略线程池的关闭线程池容量的动态调整参考资料

简单使用

public class ThreadDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService es = Executors.newFixedThreadPool(2);
        Future<Integer> result = es.submit(new SubThread());
        System.out.println("线程执行结果:"+result.get());

    }
}


class SubThread implements Callable<Integer>{

    @Override
    public Integer call() throws Exception {
        int result = 0;
        for (int i = 0; i < 100; i++) {
            result+=i;
        }

        return result;
    }
}

架构分析

图片来源于http://www.cnblogs.com/qlky/p/7390274.html
图片来源于http://www.cnblogs.com/qlky/p/7390274.html

线程池的实现原理

线程池就是ThreadPoolExecutor类,下面也是主要介绍它的实现原理。

一些重要的属性

线程池的状态

    private static final int COUNT_BITS = Integer.SIZE - 3;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

Integer.SIZE为32,所以COUNT_BITS为29。以RUNNING = -1 << COUNT_BITS举例来说,-1的补码左移位29为11100000000000000000000000000000,也就是说,线程的状态存储在int的高3位。

下面具体介绍下这几种状态:

  1. RUNNING:
    移位后:11100000000000000000000000000000。
    该状态下线程池能接受新任务,并且对已经添加的任务进行处理。

2、SHUTDOWN:

    移位后:00000000000000000000000000000000。
    该状态下线程池不再接受新任务,但仍然可以执行队列中的任务。
  1. STOP:
    移位后:00100000000000000000000000000000
    该状态下线程池不再接受新任务,并且会中断正在执行的任务。
  1. TIDYING
    移位后:01000000000000000000000000000000
    该状态下所有的任务都已经终止,workerCount的值为0,当线程池处于TIDYING状态时,会执行钩子函数terminated()。

5.TERMINATED

    移位后:01100000000000000000000000000000
    当线程池执行完terminated()状态即TERMINATED

线程池状态间的转换

 * RUNNING -> SHUTDOWN :调用shutdown()方法
 * (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()方法
 * SHUTDOWN -> TIDYING:当队列和线程池都为空
 * STOP -> TIDYING:当线程池为空的时候
 * TIDYING -> TERMINATED:当调用terminated()完成

workQueue

    /*任务缓存队列,用来存放等待执行的任务
     */

    private final BlockingQueue<Runnable> workQueue;

workers

    //工作集
    private final HashSet<Worker> workers = new HashSet<Worker>();

corePoolSize

//核心线程池的大小
private volatile int   corePoolSize; 

具体是指:线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列。核心线程会一直存在,除非设置了allowCoreThreadTimeOut为true。

maximumPoolSize

private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数

handler

private volatile RejectedExecutionHandler handler; //任务拒绝策略

当提交的任务超过线程池的maximumPoolSize,那么就采用handler的方式来处理。

共有以下4种策略:

1、CallerRunsPolicy:如果发现线程池还在运行,就直接运行这个线程
2、DiscardOldestPolicy:在线程池的等待队列中,将头取出一个抛弃,然后将当前线程放进去。
3、DiscardPolicy:直接抛弃任务。
4、AbortPolicy:java默认,抛出一个异常:RejectedExecutionException。

keepAliveTime

private volatile long  keepAliveTime;    //线程存活时间   

空闲线程的存活时间。如果当前的线程总量没有超过corePoolSize,就算空闲,线程也一直存在,除非设置allowCoreThreadTimeOut为true。若当前的线程总量超过corePoolSize,而且线程空闲,则经过keepAliveTime的时间,会将线程销毁。

allowCoreThreadTimeOut

private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间

当设置为true的时候,空闲的核心线程也超时。

threadFactory

private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程

largestPoolSize

private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数

completedTaskCount

private long completedTaskCount;   //用来记录已经执行完毕的任务个数

构造函数

java共提供4种构造函数:

    //第一种
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue
{
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    //第二种
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory
{
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    //第三种
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler
{
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    //第四种
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler
{
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

可以看到,前三种都是调用第四种来实现的,我们具体来看下第四种。

先看一下各参数代表的含义:

corePoolSize:核心线程数量
maximumPoolSize:线程池中最大线程数量
keepAliveTime:空闲线程存活时间
unit:空闲线程存活时间单位
workQueue:任务等待队列,当运行的线程到达corePoolSize的时候就会把新的任务加到workQueue
handler : 任务拒绝策略,当执行的线程和等待的任务都已经满了,再提交的任务就会用此handler来处理。

剩下的逻辑就是先校验参数的合法性,若合法的话,就将参数赋给成员变量。

提交任务执行

我们创建任务,然后调用线程池的submit方法来执行,那么看一下submit的逻辑。

根据参数的不同,submit提供了三种重载函数,如下:

    public Future<?> submit(Runnable task) {
        if (task == nullthrow new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == nullthrow new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */

    public <T> Future<T> submit(Callable<T> task) {
        if (task == nullthrow new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

可以看到submit的底层调用的还是execute()方法,下面再来看一下execute()

缓存策略和排队策略

任务拒绝策略

线程池的关闭

线程池容量的动态调整

参考资料

Java多线程线程池(4)--线程池的五种状态

Java - "JUC线程池" 架构

小学徒进阶系列—揭开ThreadPoolExecutor神秘的面纱

深入理解java线程池—ThreadPoolExecutor

深入理解Java之线程池

JAVA线程池原理详解一

Java线程池架构原理和源码解析(ThreadPoolExecutor)

Java多线程研究05-ThreadPoolExecutor中workQueue、threadFactory和handle

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多