IT徐胖子原创本文未授权请勿转载 1 文章概述 本系列文章已经分析了DUBBO线程模型实现原理,我们知道不同线程模型选择使用IO线程还是业务线程。 业务线程池采用什么线程池策略,例如处理任务方式,线程空闲多长时间被回收,采用什么拒绝策略需要在本文得到回答。 DUBBO提供多种线程池策略,选择线程池策略需要在配置文件指定threadpool属性 <dubbo:protocol name='dubbo' threadpool='fixed' threads='100' /> <dubbo:protocol name='dubbo' threadpool='cached' threads='100' /> <dubbo:protocol name='dubbo' threadpool='limited' threads='100' /> <dubbo:protocol name='dubbo' threadpool='eager' threads='100' /> 不同线程池策略创建不同线程池,不同线程池处理任务方式也不相同
2 线程池策略确认时机 public class AllDispatcher implements Dispatcher { public static final String NAME = 'all';
@Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new AllChannelHandler(handler, url); } }
public class AllChannelHandler extends WrappedChannelHandler { public AllChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } } 分析WrappedChannelHandler构造函数
如果配置threadpool属性扩展点加载器会从URL获取属性值加载对应线程池策略 @SPI('fixed') public interface ThreadPool { @Adaptive({Constants.THREADPOOL_KEY}) Executor getExecutor(URL url); } 3 源码分析 3.1 FixedThreadPool
3.2 CachedThreadPool public class CachedThreadPool implements ThreadPool {
@Override public Executor getExecutor(URL url) {
// 获取线程名称 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 核心线程数默认0 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// 最大线程数默认Int最大值 int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
// 队列容量默认0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 线程空闲多少时间被回收默认1分钟 int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
// 队列容量等于0使用阻塞队列SynchronousQueue // 队列容量小于0使用无界阻塞队列LinkedBlockingQueue // 队列容量大于0使用有界阻塞队列LinkedBlockingQueue return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } } 3.3 LimitedThreadPool
3.4 EagerThreadPool 通过自定义线程执行器可以自定义线程处理策略,我们进行一个对比。 ThreadPoolExecutor是普通线程执行器。当线程池核心线程达到阈值时新任务被放入队列。当队列已满开启新线程处理。当前线程数达到最大线程数时执行拒绝策略。 EagerThreadPoolExecutor是自定义线程执行器。当线程池核心线程达到阈值时,新任务不会放入队列而是开启新线程处理,要求当前线程数没有超过最大线程数。当前线程数达到最大线程数时任务放入队列。队列已满执行拒绝策略。 public class EagerThreadPool implements ThreadPool {
@Override public Executor getExecutor(URL url) {
// 线程名 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 核心线程数默认0 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// 最大线程数默认Int最大值 int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
// 队列容量默认0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 线程空闲多少时间被回收默认1分钟 int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
// 初始化自定义线程池和队列 TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues); EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, taskQueue, new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); taskQueue.setExecutor(executor); return executor; } }
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
// 提交任务个数 private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
@Override public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } // 任务数自增 submittedTaskCount.incrementAndGet(); try { // 调用父类方法执行线程任务 super.execute(command); } // 抛出拒绝异常 catch (RejectedExecutionException rx) { final TaskQueue queue = (TaskQueue) super.getQueue(); try { // 任务重新放入队列 if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) { // 任务重新放入队列失败抛出异常 submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException('Queue capacity is full.', rx); } } catch (InterruptedException x) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException(x); } } catch (Throwable t) { submittedTaskCount.decrementAndGet(); throw t; } } }
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
@Override public boolean offer(Runnable runnable) { if (executor == null) { throw new RejectedExecutionException('The task queue does not have executor'); }
// 当前线程数 int currentPoolThreadSize = executor.getPoolSize();
// 任务数 < 当前线程数表示存在空闲worker线程则任务放入队列等待worker线程处理 if (executor.getSubmittedTaskCount() < currentPoolThreadSize) { return super.offer(runnable); }
// 当前线程数 < 最大线程数返回false表示创建worker线程 if (currentPoolThreadSize < executor.getMaximumPoolSize()) { return false; } // 当前线程数 > 最大线程数任务放入队列 return super.offer(runnable); }
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if (executor.isShutdown()) { throw new RejectedExecutionException('Executor is shutdown!'); } // 任务重试放入队列 return super.offer(o, timeout, unit); } } 4 文章总结 本文分析了DUBBO线程池策略源码与实现原理,可以根据不同业务场景选择不同线程池策略。后续文章我们深入分析「DUBBO线程池打满问题」请继续关注。 |
|