分享

多线程技术原理

 昵称11935121 2018-08-19

如:newFixedThreadPool

[java] view plain copy

  1. public

    static

    ExecutorService newFixedThreadPool(

    int

    nThreads) {
  2. return

    new

    ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new

    LinkedBlockingQueue());
  5. }

如:newCachedThreadPool[java] view plain copy

  1. public

    static

    ExecutorService newCachedThreadPool() {
  2. return

    new

    ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new

    SynchronousQueue());
  5. }

如:newSingleThreadExecutor[java] view plain copy

  1. public

    static

    ExecutorService newSingleThreadExecutor() {
  2. return

    new

    FinalizableDelegatedExecutorService
  3. (

    new

    ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new

    LinkedBlockingQueue()));
  6. }

可以发现,其实它们调用的都是同一个接口ThreadPoolExecutor方法,只不过传入参数不一样而已。下面就来看看这个神秘的ThreadPoolExecutor。

首先来看看它的一些基本参数:

[java] view plain copy

  1. public

    class

    ThreadPoolExecutor

    extends

    AbstractExecutorService {
  2. //运行状态标志位
  3. volatile

    int

    runState;
  4. static

    final

    int

    RUNNING = 0;
  5. static

    final

    int

    SHUTDOWN = 1;
  6. static

    final

    int

    STOP = 2;
  7. static

    final

    int

    TERMINATED = 3;
  8. //线程缓冲队列,当线程池线程运行超过一定线程时并满足一定的条件,待运行的线程会放入到这个队列
  9. private

    final

    BlockingQueue workQueue;
  10. //重入锁,更新核心线程池大小、最大线程池大小时要加锁
  11. private

    final

    ReentrantLock mainLock =

    new

    ReentrantLock();
  12. //重入锁状态
  13. private

    final

    Condition termination = mainLock.newCondition();
  14. //工作都set集合
  15. private

    final

    HashSet workers =

    new

    HashSet();
  16. //线程执行完成后在线程池中的缓存时间
  17. private

    volatile

    long

    keepAliveTime;
  18. //核心线程池大小
  19. private

    volatile

    int

    corePoolSize;
  20. //最大线程池大小
  21. private

    volatile

    int

    maximumPoolSize;
  22. //当前线程池在运行线程大小
  23. private

    volatile

    int

    poolSize;
  24. //当缓冲队列也放不下线程时的拒绝策略
  25. private

    volatile

    RejectedExecutionHandler handler;
  26. //线程工厂,用来创建线程
  27. private

    volatile

    ThreadFactory threadFactory;
  28. //用来记录线程池中曾经出现过的最大线程数
  29. private

    int

    largestPoolSize;
  30. //用来记录已经执行完毕的任务个数
  31. private

    long

    completedTaskCount;
  32. ................
  33. }

初始化线程池大小 有以下四种方法:

从源码中可以看到其实最终都是调用了以下的方法:

[java] view plain copy

  1. public

    ThreadPoolExecutor(

    int

    corePoolSize,
  2. int

    maximumPoolSize,
  3. long

    keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. if

    (corePoolSize < 0="">
  9. maximumPoolSize <= 0="">
  10. maximumPoolSize < corepoolsize="">
  11. keepAliveTime <>
  12. throw

    new

    IllegalArgumentException();
  13. if

    (workQueue ==

    null

    || threadFactory ==

    null

    || handler ==

    null

    )
  14. throw

    new

    NullPointerException();
  15. this

    .corePoolSize = corePoolSize;
  16. this

    .maximumPoolSize = maximumPoolSize;
  17. this

    .workQueue = workQueue;
  18. this

    .keepAliveTime = unit.toNanos(keepAliveTime);
  19. this

    .threadFactory = threadFactory;
  20. this

    .handler = handler;
  21. }

这里很简单,就是设置一下各个参数,并校验参数是否正确,然后抛出对应的异常。接下来我们来看看最重要的方法execute,其源码如下:

[java] view plain copy

  1. public

    void

    execute(Runnable command) {
  2. if

    (command ==

    null

    )
  3. throw

    new

    NullPointerException();
  4. if

    (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { // 判断1
  5. if

    (runState == RUNNING && workQueue.offer(command)) { // 判断2
  6. if

    (runState != RUNNING || poolSize == 0) // 判断3
  7. ensureQueuedTaskHandled(command);
  8. }
  9. else

    if

    (!addIfUnderMaximumPoolSize(command)) // 判断4
  10. reject(command); // is shutdown or saturated
  11. }
  12. }

笔者在上面加了点注释。下面我们一个一个判断来看首先判断1

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

(1)当poolSize >= corePoolSize 不成立时,表明当前线程数小于核心线程数目,左边返回fasle.接着执行右边判断!addIfUnderCorePoolSize(command)

它做了如下操作

[java] view plain copy

  1. private

    boolean

    addIfUnderCorePoolSize(Runnable firstTask) {
  2. Thread t =

    null

    ;
  3. final

    ReentrantLock mainLock =

    this

    .mainLock;//加锁
  4. mainLock.lock();
  5. try

    {
  6. if

    (poolSize < corepoolsize="" &&="" runstate="=">
  7. t = addThread(firstTask);//加入线程
  8. }

    finally

    {
  9. mainLock.unlock();
  10. }
  11. return

    t !=

    null

    ;
  12. }

发现它又调用addTread[java] view plain copy

  1. private

    Thread addThread(Runnable firstTask) { //调用这个方法之前加锁
  2. Worker w =

    new

    Worker(firstTask);//线程包装成一个work
  3. Thread t = threadFactory.newThread(w);//线程工厂从work创建线程
  4. boolean

    workerStarted =

    false

    ;
  5. if

    (t !=

    null

    ) {
  6. if

    (t.isAlive()) // 线程应该是未激活状态
  7. throw

    new

    IllegalThreadStateException();
  8. w.thread = t;
  9. workers.add(w);//全局set添加一个work
  10. int

    nt = ++poolSize;//当前运行线程数目加1
  11. if

    (nt > largestPoolSize)
  12. largestPoolSize = nt;
  13. try

    {
  14. t.start();//注意,这里线程执行了,但是其实真正调用的是Worker类的run方法!!!!!!!!!
  15. workerStarted =

    true

    ;
  16. }
  17. finally

    {
  18. if

    (!workerStarted)
  19. workers.remove(w);
  20. }
  21. }
  22. return

    t;
  23. }

其实Work是真实去调用线程方法的地方,它是对Thread类的一个包装,每次Thread类调用其start方法时,就会调用到work的run方法。其代码如下,[java] view plain copy

  1. private

    void

    runTask(Runnable task) { //真正发起线程方法的地方
  2. final

    ReentrantLock runLock =

    this

    .runLock;
  3. runLock.lock();
  4. try

    {

[java] view plain copy

  1. if

    ((runState >= STOP || //判断的判断
  2. (Thread.interrupted() && runState >= STOP)) &&
  3. hasRun)
  4. thread.interrupt();
  5. boolean

    ran =

    false

    ;
  6. beforeExecute(thread, task);//处理前
  7. try

    {
  8. task.run();//执行真正的原始线程的run方法
  9. ran =

    true

    ;
  10. afterExecute(task,

    null

    );//处理后
  11. ++completedTasks;
  12. }

    catch

    (RuntimeException ex) {
  13. if

    (!ran)
  14. afterExecute(task, ex);
  15. throw

    ex;
  16. }
  17. }

    finally

    {
  18. runLock.unlock();
  19. }
  20. }
  21. //这里执行线程的方法
  22. public

    void

    run() {
  23. try

    {
  24. hasRun =

    true

    ;
  25. Runnable task = firstTask;
  26. firstTask =

    null

    ;
  27. while

    (task !=

    null

    || (task = getTask()) !=

    null

    ) {
  28. runTask(task);
  29. task =

    null

    ;
  30. }
  31. }

    finally

    {
  32. workerDone(

    this

    );
  33. }
  34. }

发现要执行一个线程真的很不容易,如果addIfUnderCorePoolSize返回true,刚表明成功添加一条线程,并调用了其start方法,那么整个调用到此结束。如果返回fasle.那么就进入判断2.(2)当poolSize >= corePoolSize成立时,整个判断返回true。接着执行判断2

判断2

[java] view plain copy

  1. if

    (runState == RUNNING && workQueue.offer(command)) { // 判断2

如果当前线程池在运行状态,并且将当前线程加入到缓冲队列中。workQueue的offer是一个非阻塞方法。如查缓冲队列满了的话,返回为false.否则返回true;如果上面两个都 为true,表明线程被成功添加到缓冲队列中,并且当前线程池在运行。进入判断3

判断3

[java] view plain copy

  1. if

    (runState != RUNNING || poolSize == 0)
  2. ensureQueuedTaskHandled(command);

当线程被加入到线程池中,进入判断3.如果这时线程池没有在运行或者运行的线程为为0。那么就调用ensureQueuedTaskHandled,它做的其实是判断下是否在拒绝这个线程的执行。[java] view plain copy

  1. private

    void

    ensureQueuedTaskHandled(Runnable command) {
  2. final

    ReentrantLock mainLock =

    this

    .mainLock;
  3. mainLock.lock();
  4. boolean

    reject =

    false

    ;
  5. Thread t =

    null

    ;
  6. try

    {
  7. int

    state = runState;
  8. if

    (state != RUNNING && workQueue.remove(command)) //线程池没有在运行,且缓冲队列中有这个线程
  9. reject =

    true

    ;
  10. else

    if

    (state < stop="">
  11. poolSize < math.max(corepoolsize,="" 1)="">
  12. !workQueue.isEmpty())
  13. t = addThread(

    null

    );
  14. }

    finally

    {
  15. mainLock.unlock();
  16. }
  17. if

    (reject)
  18. reject(command); //根据拒绝策略处理线程
  19. }

判断4[java] view plain copy

  1. else

    if

    (!addIfUnderMaximumPoolSize(command))
  2. reject(command); // is shutdown or saturated

在判断2为false时执行,表明当前线程池没有在运行或者该线程加入缓冲队列中失败,那么就会尝试再启动下该线程,如果还是失败,那就根据拒绝策略来处理这个线程。其源码如下:[java] view plain copy

  1. private

    boolean

    addIfUnderMaximumPoolSize(Runnable firstTask) {
  2. Thread t =

    null

    ;
  3. final

    ReentrantLock mainLock =

    this

    .mainLock;
  4. mainLock.lock();
  5. try

    {
  6. if

    (poolSize < maximumpoolsize="" &&="" runstate="=" running)="" 如果当前运行线程数目="" 小于最大线程池大小="" 并且="">
  7. t = addThread(firstTask);
  8. }

    finally

    {
  9. mainLock.unlock();
  10. }
  11. return

    t !=

    null

    ;
  12. }

一般调用这个方法是发生在缓冲队列已满了,那么线程池会尝试直接启动该线程。当然,它要保存当前运行的poolSize一定要小于maximumPoolSize。否则,最后。还是会拒绝这个线程!以上大概就是整个线程池启动一条线程的整体过程。

总结:

ThreadPoolExecutor中,包含了一个任务缓存队列和若干个执行线程,任务缓存队列是一个大小固定的缓冲区队列,用来缓存待执行的任务,执行线程用来处理待执行的任务。每个待执行的任务,都必须实现Runnable接口,执行线程调用其run()方法,完成相应任务。

ThreadPoolExecutor对象初始化时,不创建任何执行线程,当有新任务进来时,才会创建执行线程。

构造ThreadPoolExecutor对象时,需要配置该对象的核心线程池大小和最大线程池大小:

当目前执行线程的总数小于核心线程大小时,所有新加入的任务,都在新线程中处理

当目前执行线程的总数大于或等于核心线程时,所有新加入的任务,都放入任务缓存队列中

当目前执行线程的总数大于或等于核心线程,并且缓存队列已满,同时此时线程总数小于线程池的最大大小,那么创建新线程,加入线程池中,协助处理新的任务。

当所有线程都在执行,线程池大小已经达到上限,并且缓存队列已满时,就rejectHandler拒绝新的任务

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多