分享

「DUBBO系列」线程池策略详解

 鹰兔牛熊眼 2020-06-17

IT徐胖子原创本文未授权请勿转载

1 文章概述

本系列文章已经分析了DUBBO线程模型实现原理,我们知道不同线程模型选择使用IO线程还是业务线程。

业务线程池采用什么线程池策略,例如处理任务方式,线程空闲多长时间被回收,采用什么拒绝策略需要在本文得到回答。

DUBBO提供多种线程池策略,选择线程池策略需要在配置文件指定threadpool属性

<dubbo:protocol name='dubbo' threadpool='fixed' threads='100' /><dubbo:protocol name='dubbo' threadpool='cached' threads='100' /><dubbo:protocol name='dubbo' threadpool='limited' threads='100' /><dubbo:protocol name='dubbo' threadpool='eager' threads='100' />

不同线程池策略创建不同线程池,不同线程池处理任务方式也不相同

fixed固定线程数量
cached线程空闲一分钟会被回收,当新请求到来时会创建新线程
limited线程个数随着任务增加而增加,但不会超过最大阈值。空闲线程不会被回收
eager当所有核心线程数都处于忙碌状态时,优先创建新线程执行任务

2 线程池策略确认时机

public class AllDispatcher implements Dispatcher { public static final String NAME = 'all';
@Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new AllChannelHandler(handler, url); }}
public class AllChannelHandler extends WrappedChannelHandler { public AllChannelHandler(ChannelHandler handler, URL url) { super(handler, url); }}

分析WrappedChannelHandler构造函数

public class WrappedChannelHandler implements ChannelHandlerDelegate {    public WrappedChannelHandler(ChannelHandler handler, URL url) {        this.handler = handler;        this.url = url;        // 获取线程池自适应扩展点默认FixedThreadPool        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {            componentKey = Constants.CONSUMER_SIDE;        }        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);    }}

如果配置threadpool属性扩展点加载器会从URL获取属性值加载对应线程池策略

@SPI('fixed')public interface ThreadPool { @Adaptive({Constants.THREADPOOL_KEY}) Executor getExecutor(URL url);}

3 源码分析

3.1 FixedThreadPool

public class FixedThreadPool implements ThreadPool {
@Override public Executor getExecutor(URL url) {
// 线程名称 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 线程个数默认200 int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
// 队列容量默认0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 队列容量等于0使用阻塞队列SynchronousQueue // 队列容量小于0使用无界阻塞队列LinkedBlockingQueue // 队列容量大于0使用有界阻塞队列LinkedBlockingQueue return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }}

3.2 CachedThreadPool

public class CachedThreadPool implements ThreadPool {
@Override public Executor getExecutor(URL url) {
// 获取线程名称 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 核心线程数默认0 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// 最大线程数默认Int最大值 int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
// 队列容量默认0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 线程空闲多少时间被回收默认1分钟 int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
// 队列容量等于0使用阻塞队列SynchronousQueue // 队列容量小于0使用无界阻塞队列LinkedBlockingQueue // 队列容量大于0使用有界阻塞队列LinkedBlockingQueue return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }}

3.3 LimitedThreadPool

public class LimitedThreadPool implements ThreadPool {
@Override public Executor getExecutor(URL url) {
// 获取线程名称 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 核心线程数默认0 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// 最大线程数默认200 int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
// 队列容量默认0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 队列容量等于0使用阻塞队列SynchronousQueue // 队列容量小于0使用无界阻塞队列LinkedBlockingQueue // 队列容量大于0使用有界阻塞队列LinkedBlockingQueue // keepalive时间设置Long.MAX_VALUE表示不回收空闲线程 return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }}

3.4 EagerThreadPool

通过自定义线程执行器可以自定义线程处理策略,我们进行一个对比。

ThreadPoolExecutor是普通线程执行器。当线程池核心线程达到阈值时新任务被放入队列。当队列已满开启新线程处理。当前线程数达到最大线程数时执行拒绝策略。

EagerThreadPoolExecutor是自定义线程执行器。当线程池核心线程达到阈值时,新任务不会放入队列而是开启新线程处理,要求当前线程数没有超过最大线程数。当前线程数达到最大线程数时任务放入队列。队列已满执行拒绝策略。

public class EagerThreadPool implements ThreadPool {
@Override public Executor getExecutor(URL url) {
// 线程名 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 核心线程数默认0 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// 最大线程数默认Int最大值 int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
// 队列容量默认0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 线程空闲多少时间被回收默认1分钟 int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
// 初始化自定义线程池和队列 TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues); EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, taskQueue, new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); taskQueue.setExecutor(executor); return executor; }}
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
// 提交任务个数 private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
@Override public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } // 任务数自增 submittedTaskCount.incrementAndGet(); try { // 调用父类方法执行线程任务 super.execute(command); } // 抛出拒绝异常 catch (RejectedExecutionException rx) { final TaskQueue queue = (TaskQueue) super.getQueue(); try { // 任务重新放入队列 if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) { // 任务重新放入队列失败抛出异常 submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException('Queue capacity is full.', rx); } } catch (InterruptedException x) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException(x); } } catch (Throwable t) { submittedTaskCount.decrementAndGet(); throw t; } }}
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
@Override public boolean offer(Runnable runnable) { if (executor == null) { throw new RejectedExecutionException('The task queue does not have executor'); }
// 当前线程数 int currentPoolThreadSize = executor.getPoolSize();
// 任务数 < 当前线程数表示存在空闲worker线程则任务放入队列等待worker线程处理 if (executor.getSubmittedTaskCount() < currentPoolThreadSize) { return super.offer(runnable); }
// 当前线程数 < 最大线程数返回false表示创建worker线程 if (currentPoolThreadSize < executor.getMaximumPoolSize()) { return false; } // 当前线程数 > 最大线程数任务放入队列 return super.offer(runnable); }
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if (executor.isShutdown()) { throw new RejectedExecutionException('Executor is shutdown!'); } // 任务重试放入队列 return super.offer(o, timeout, unit); }}

4 文章总结

本文分析了DUBBO线程池策略源码与实现原理,可以根据不同业务场景选择不同线程池策略。后续文章我们深入分析「DUBBO线程池打满问题」请继续关注。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多