分享

ThreadPoolExecutor的线程调度及其中的问题

 景昕的花园 2023-10-10 发布于北京

问题现象

在我们的系统中,使用了这样的配置来开启异步操作:
<task:annotation-driven executor="executor"    scheduler="scheduler" /><task:executor id="executor" pool-size="16-128"    keep-alive="60" rejection-policy="CALLER_RUNS"     queue-capacity="1000" />

客户端开启异步代码如下:
@Async()public Future<Result4Calculate> calculateByLendId(int lendrequestId) {    // 标记1    // 调用REST服务;监控调用时间。  }  // 获取Future后的处理如下try {     keplerOverdue = summay4Overdue.get(5, TimeUnit.SECONDS);     // 后续处理catch (Exception e) {     // 标记2     // 异常报警 
然而在这种配置下,客户端在标记1处监控到的调用时间普遍在4s以内(平均时间不到1s,个别峰值请求会突破5s,全天超过5s的请求不到10个。然而,在标记2处捕获到的超时异常却非常多,一天接近700+
问题出在哪儿?

原因分析

上述问题相关代码的调用时序如下图所示。

其中,rest client 与rest server间的交互时间可以明确监控到,用时超过5s的非常少。但是,get方法却经常抛出超时异常。经过初步分析,问题出现在ThreadPoolTaskExecutor的任务调度过程中。

任务调度逻辑

使用<task:executor>注解得到的bean是ThreadPoolTaskExecutor的实例。这个类本身并不做调度,而是将调度工作委托给了ThreadPoolExecutor。后者的任务调度代码如下:
/** * Executes the given task sometime in the future.  The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of *         {@code RejectedExecutionHandler}, if the task *         cannot be accepted for execution * @throws NullPointerException if {@code command} is null */public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    /*     * Proceed in 3 steps:     *     * 1. If fewer than corePoolSize threads are running, try to     * start a new thread with the given command as its first     * task.  The call to addWorker atomically checks runState and     * workerCount, and so prevents false alarms that would add     * threads when it shouldn't, by returning false.     *     * 2. If a task can be successfully queued, then we still need     * to double-check whether we should have added a thread     * (because existing ones died since last checking) or that     * the pool shut down since entry into this method. So we     * recheck state and if necessary roll back the enqueuing if     * stopped, or start a new thread if there are none.     *     * 3. If we cannot queue task, then we try to add a new     * thread.  If it fails, we know we are shut down or saturated     * and so reject the task.     */    int c = ctl.get();    if (workerCountOf(c) < corePoolSize) {        if (addWorker(command, true))            return;        c = ctl.get();    }    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();        if (! isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    else if (!addWorker(command, false))        reject(command);}
通过其中的注释,我们可以知道它的核心调度逻辑如下(省略了一些检查等方法):
  1. 如果正在运行的线程数量小于corePoolSize(最小线程数),则尝试启动一个新线程,并将当入参command作为该线程的第一个task。否则进入步骤二。

  2. 如果没有按步骤1执行,那么尝试把入参command放入workQueue中。如果能成功入队,做后续处理;否则,进入步骤三。

  3. 如果没有按步骤2执行,那么将尝试创建一个新线程,然后做后续处理。


简单的说,当向ThreadPoolExecutor提交一个任务时,它会优先交给线程池中的现有线程;如果暂时没有可用的线程,那么它会将任务放到队列中;一般只有在队列满了的时候(导致无法成功入队),才会创建新线程来处理队列中的任务。

顺带一说,任务入队后,在某些条件下也会创建新线程。但新线程不会立即执行当前任务,而是从队列中获取一个任务并开始执行。
参见:http://www./cn/articles/java-threadPool

汇总分析

综上所述,我们可以确定以下信息:
  • 据THREAD的配置,ThreadPoolExecutor中的corePoolSize = 16。

  • 在异步调度过程中,线程池数量没有增长(最多是16个)。

    这一点是通过日志中的线程名称确认的。日志中,异步线程的id从executor-1、executor-2一直到executor-16,但17及以上的都没有出现过。

  • 当并发数超过16时,ThreadPoolExecutor会按照步骤二进行任务调度,即把任务放入队列中,但没有及时创建新线程来执行这个任务

    这一点是推测。在后面的测试中会验证这一点。

  • 队列中的任务出现积压、时间累积,导致某一个任务超时后,后续大量任务都超时。但是超时并没有阻止任务执行;任务仍然会继续通过rest client调用rest server,并被监控代码记录下时间。

任务在队列中积压、累积,是引发一天数百次异常、报警的原因。而监控代码并未监控到任务调度的时间,因此都没有出现超时。

模拟重现

模拟当线程池中工作线程数达到CorePoolSize、且任务数未达到queue-capacity时的情形。
线程池配置如下,其中corePoolSize配置为2,queue-capacity配置为1000.
<!-- 调用rest接口时,使用此异步执行器。避免占用全局的线程池 --><task:executor id="keplerRestExecutor" pool-size="2-128"    keep-alive="60" rejection-policy="CALLER_RUNS"     queue-capacity="1000" />
测试代码如下:
@Testpublic void test_multi_thread() {    System.out.println("start");    for (int i = 0; i < 10; i++) {        new Thread(            () -> {                long start = System.currentTimeMillis();                System.out.println("committed.");                Future<Result4Calculate> result =                   BizOverdueCalculateServiceTest.                  this.bizOverdueCalculateService.                  calculateByLendId(1231);                System.out.println("to get. cost:"                    + (System.currentTimeMillis() - start));                start = System.currentTimeMillis();                try {                    result.get(5, TimeUnit.SECONDS);                } catch (InterruptedException | ExecutionException                        | TimeoutException e) {                    e.printStackTrace();                }                System.out.println("getted. cost:"                    + (System.currentTimeMillis() - start));            }, "thread_" + i).start();    }    System.out.println("all started");    try {        Thread.sleep(10001);    } catch (InterruptedException e) {        e.printStackTrace();    }}
其中bizOverdueCalculateService.calculateByLendId(1231)中的代码如下:
/** * 按进件id试算。 * <p> * 结清日期默认为系统当前日期 * * @param lendrequestId * @return */@Async("keplerRestExecutor")public Future<Result4Calculate> calculateByLendId(int lendrequestId) {    long start = System.currentTimeMillis();    Future<Result4Calculate> f =       this.calculateByLendId(lendrequestId,        new Date());    System.out.println(Thread.currentThread().getName() + ", active count:"        + this.keplerRestExecutor.getActiveCount() + ",  queue size :"        + this.keplerRestExecutor.getThreadPoolExecutor().getQueue().size()        + " rest cost: " + (System.currentTimeMillis() - start));    return f;}
根据上述分析,预计:
  • 测试类并发的发起10个rest调用任务后,只有两个任务会被线程池中的工作线程立即执行,其它八个任务都进入队列。

  • 线程池中始终只有两个工作线程。

  • 队列中每个任务的执行时间都不超时,但执行过若干个任务后,后续任务全部超时。

实际输出如下:
  • 全部提交后,只有两个线程在执行,其它8个任务全部在队列中:active count:2, queue size :8。

  • 线程池中始终只有keplerRestExecutor-1、keplerRestExecutor-2两个线程。active count也始终为2。

  • 任务的实际执行时间(rest cost)都在1s上下。但从第9(每次测试,这个数字会略有不同)个任务开始,result.get(5, TimeUnit.SECONDS)方法出现超时。

测试输出如下:
startcommitted.committed.committed.committed.committed.all startedcommitted.committed.committed.committed.committed.to get. cost:37to get. cost:33to get. cost:33to get. cost:37to get. cost:37to get. cost:33to get. cost:37to get. cost:33to get. cost:37to get. cost:35keplerRestExecutor-1, active count:2, queue size :8 rest cost: 1437getted. cost:1444keplerRestExecutor-2, active count:2, queue size :7 rest cost: 1437getted. cost:1444keplerRestExecutor-1, active count:2, queue size :6 rest cost: 1155getted. cost:2599keplerRestExecutor-2, active count:2, queue size :5 rest cost: 1155getted. cost:2600keplerRestExecutor-1, active count:2, queue size :4 rest cost: 1140getted. cost:3739keplerRestExecutor-2, active count:2, queue size :3 rest cost: 1140getted. cost:3740keplerRestExecutor-1, active count:2, queue size :2 rest cost: 1176getted. cost:4915keplerRestExecutor-2, active count:2, queue size :1 rest cost: 1176getted. cost:4916java.util.concurrent.TimeoutExceptiongetted. cost:5001getted. cost:5001at java.util.concurrent.FutureTask.get(FutureTask.java:205)at cn.youcredit.thread.common.service.portfolio.BizOverdueCalculateServiceTest.lambda$0(BizOverdueCalculateServiceTest.java:99)at cn.youcredit.thread.common.service.portfolio.BizOverdueCalculateServiceTest$$Lambda$21/648122621.run(Unknown Source)at java.lang.Thread.run(Thread.java:745)java.util.concurrent.TimeoutExceptionat java.util.concurrent.FutureTask.get(FutureTask.java:205)at cn.youcredit.thread.common.service.portfolio.BizOverdueCalculateServiceTest.lambda$0(BizOverdueCalculateServiceTest.java:99)at cn.youcredit.thread.common.service.portfolio.BizOverdueCalculateServiceTest$$Lambda$21/648122621.run(Unknown Source)at java.lang.Thread.run(Thread.java:745)keplerRestExecutor-1, active count:2, queue size :0 rest cost: 1175keplerRestExecutor-2, active count:1, queue size :0 rest cost: 1175

rejection-policy的几种配置

rejection-policy是指当线程池中的工作线程和任务队列都达到饱和状态时、或者线程池已经关闭的情况下,线程池处理新的任务的规则。

配置值

基本含义

备注

ABORT

直接中断任务的提交动作

在主线程中抛出任务被拒绝异常。

CALLER_RUNS

由主线程来执行新的任务

异步操作会转换为同步操作

DISCARD

直接丢弃当前任务

在对当前任务进行get时,会出现超时异常。

DISCARD_OLDEST

丢弃任务队列中第一个任务

在关闭队列的情况下,会陷入“尝试丢弃队首任务-尝试入队-尝试丢弃-尝试入队” 的死循环中。

解决方案

针对线程数和队列大小,考虑方案有三:
  • 提高初始线程数。

提高步并发的初始线程数(如将16-128调整为32-128)。以此减少新任务进入队列的几率。
但是这个方案只是降低队列积压的风险,并不解决问题。

  • 关闭队列。

将队列大小调整为0,以此保证每一个新任务都有一个新线程来执行。
这个方案的问题在于,并发压力大时,可能导致线程不够用。此时的异步调用会根据rejection-policy="CALLER_RUNS"的配置而变为同步调用。

  • 线程池。

使用优先创建新线程(而非优先入队列)的线程池。
改动最大的方案。

目前考虑:底层系统并发压力并不大;根据ELK的统计,最高并发大约也就30+rps。可以考虑在指定专用ThreadPoolTaskExecutor的前提下,关闭队列。

此外,rejection-policy的配置,考虑方案有二:
  • 设定为CALLER_RUNS。

这种方式可以保证任务得到执行;但有可能会阻塞主线程。并且阻塞时间视REST调用时间而定。

  • 设定为DISCARD。

这种方式实际上也会阻塞主线程,但是最长会阻塞5s。

目前考虑:试算服务试运行期间设定为DISCARD,以免主线程阻塞时间过长。逾期试算服务完成性能优化、并且服务稳定之后,设定为CALLER_RUNS,以确保试算任务得到执行。

关闭队列方案测试代码

复用模拟重现中的测试代码,修改线程池配置如下
<!-- 调用kepler的rest接口时,使用此异步执行器。避免占用全局的线程池 --><task:executor id="keplerRestExecutor" pool-size="2-128"    keep-alive="60" rejection-policy="CALLER_RUNS"    queue-capacity="0" /><task:scheduler id="scheduler" pool-size="32" />
再次执行上测试代码。预计:
  • 任务一经提交,就会创建10个工作线程来分别执行。

  • 队列大小始终为0.

  • 不会出现超时。

  • 可能会出现后续任务中,active count 小于10的情况。



关闭队列后的日志输出

startcommitted.committed.committed.committed.committed.all startedcommitted.committed.committed.committed.committed.to get. cost:3to get. cost:7to get. cost:7to get. cost:3to get. cost:7to get. cost:7to get. cost:7to get. cost:7to get. cost:4to get. cost:4keplerRestExecutor-7, active count:10, queue size :0 rest cost: 2177getted. cost:2182keplerRestExecutor-4, active count:9, queue size :0 rest cost: 2182getted. cost:2187keplerRestExecutor-9, active count:8, queue size :0 rest cost: 2185getted. cost:2190keplerRestExecutor-1, active count:7, queue size :0 rest cost: 2190getted. cost:2196keplerRestExecutor-3, active count:6, queue size :0 rest cost: 2191getted. cost:2196keplerRestExecutor-2, active count:5, queue size :0 rest cost: 2191keplerRestExecutor-5, active count:4, queue size :0 rest cost: 2191getted. cost:2196getted. cost:2196keplerRestExecutor-10, active count:3, queue size :0 rest cost: 2192getted. cost:2197keplerRestExecutor-8, active count:2, queue size :0 rest cost: 2192getted. cost:2197keplerRestExecutor-6, active count:1, queue size :0 rest cost: 2193getted. cost:2198

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多