Executor框架包含的主要的类与接口如下: Executor:是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。 ThreadPoolExecutor:线程池的核心实现类,用来执行被提交的任务。 ScheduledThreadPoolExecutor:可以在给定的延迟后运行命令,或者定期执 行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。 Future和FutureTask:异步计算的结果 Runnable和Callable:都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。
主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一 个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。 然后可以把Runnable对象直接交给ExecutorService执行ExecutorService.execute(Runnable command);或者也可以把Runnable对象或Callable对象提交给ExecutorService执行(ExecutorService.submit(Runnable task)或ExecutorService.submit(Callable task)。 如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象FutureTask。由于FutureTask实现了Runnable,程序员也可 以创建FutureTask,然后直接交给ExecutorService执行。 最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。 Executor框架的成员Executor框架主要包括以下成员:ThreadPoolExecutor、ScheduledThreadPoolExecutor、 Future接口、Runnable接口、Callable接口和Executors。 ThreadPoolExecutor ThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建3种类型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。 FixedThreadPool FixedThreadPool被称为可重用固定线程数的线程池,下面是FixedThreadPool的源码: public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指 定的参数nThreads。 当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的 最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。FixedThreadPool的execute()方法的运行示意图如下: 说明如下: 如果当前运行的线程数小于corePoolSize,则创建新线程来执行任务 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue 当线程执行完1中的任务后,会在循环中反复从linkedBlockingQueue中获取任务来执行
FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列容量为Integer.MAX_VALUE),使用无界队列作为工作队列会对线程池带来如下影响: 当线程池中的线程数达到corePoolSize之后,新任务将在无界队列中等待,因此线程池中的线程永远不会超过corePoolSize 由于1, 使用无界队列时maximumPoolSize将是一个无效参数 由于1和2, 使用无界队列时keepAliveTime将是一个无效参数 由于使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或 shutdownNow())不会拒绝任务,不会调用RejectedExecutionHandler.rejectedExecution方法
SingleThreadExecutor SingleThreadExecutor是使用单个worker线程的Executor: public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1,其他参数与FixedThreadPool相同。SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工 作队列(队列的容量为Integer.MAX_VALUE);SingleThreadExecutor使用无界队列作为工作队列 对线程池带来的影响与FixedThreadPool相同 说明如下: 如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务 在线程池完成预热后(当前线程池中有一个运行的线程),将任务加入到LinkedBlockingQueue 线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue中获取任务来执行
CachedThreadPool CachedThreadPool是一个会根据需要创建新线程的线程池: public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
CacheThreadPool的corePoolSize的值为0,即corePool为空;maximumPoolSize是Integer的最大值,也就是maximumPool是无界的;keepAliveTime设置成60秒意味着CacheThreadPool中空闲的线程等待任务最长的时间是60秒,超过60秒该线程就会被终止 CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下, CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。 Callable/Future的使用以及原理线程池执行任务有两种方法,一种是execute,另一种是submit;这两个方法的区别如下: execute execute只可以接收一个Runnable的参数 execute如果出现异常会抛出 execute没有返回值
submit submit 可以接收Runnable和Callable两种类型的参数 对于submit方法传入一个callable参数,可以得到一个Future的返回值 submit 方法不会抛出异常,除非调用Future.get()方法
Callable/Future案例 public class CallableDemo implements Callable<String> { @Override public String call() throws Exception { return "Hello"; } public static void main(String[] args) throws InterruptedException, ExecutionException { CallableDemo callableDemo = new CallableDemo(); FutureTask futureTask = new FutureTask(callableDemo); new Thread(futureTask).start(); System.out.println(futureTask.get()); } }
为什么要使用回调呢?因为结果值是由另外一个线程计算的,当前线程不知道结果什么时候能够计算完成,所以传递一个函数给计算线程,当计算结束时候调用这个回调接口,回传结果值。这种方式在很多地方都有用到,比如Dubbo的异步调用、消息中间件的异步通信等等。 接下来看看Callable/Future是如何实现的,在刚刚的例子中我们用到了两个api,分别是Callable和FutureTask;Callable是一个函数式接口(一个有且仅有一个抽象方法,但是可以有多个非抽象方法的接口;可以被隐式转换为 lambda 表达式),里面只有一个call方法: @FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
FutureTask实现了RunnableFuture接口;RunnableFuture接口继承了Runnable和Future这两个接口: public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
Future接口表示一个任务的生命周期,并提供了相应方法来判断任务是否已经完成、取消以及获取任务结果等: public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
到这里我们可以知道,FutureTask就是Runnable和Future的结合,如果把Runnable比作生产者,Future就是消费者,那FutureTask就是被这两者共享的;生产者通过run方法计算结果,消费者通过get方法获取结果。 作为生产者消费者模式,有一个很重要的机制就是如果生产者的数据还没有产生,消费者会被阻塞;当生产者数据准备好了以后会唤醒消费者继续执行。那么在FutureTask里面是基于什么方法实现的呢? 通过源码我们可以看到,在FutureTask中通过变量state来维护状态,共用一下7中状态: public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state; private static final int NEW = 0; // 新建状态,表示这个FutureTask还没有开始运行 private static final int COMPLETING = 1; // 表示FutureTask任务已经完成,但是还有一些后续操作还没有完成,比如唤醒等待线程 private static final int NORMAL = 2; // 任务结束,正常完成,没有异常 private static final int EXCEPTIONAL = 3; // 因为发生了异常,任务结束 private static final int CANCELLED = 4; // 因为取消了任务,任务结束 private static final int INTERRUPTING = 5; // 任务结束,也是由于取消了任务,但是发起了中断运行任务线程的中断请求 private static final int INTERRUPTED = 6; // 任务结束,也是由于取消了任务,完成了中断运行任务线程的中断请求 .....// 省略部分代码 }
run方法 run方法其实很简单,就是调用Callable的call方法返回结果值result,根据是否发生异常调用set(result)或者setException(ex);不过因为FutureTask任务都是在多线程环境下运行,所以需要注意并发冲突问题。在run方法中,并没有使用synchronized代码块或者lock来解决并发问题,而是通过CAS乐观锁来实现并发,确保只有一个线程能运行FutureTask任务。 public void run() { // 如果状态不是New,或者设置runner的值失败,说明有其它线程已经调用了run方法并成功设置了runner的值,直接return // 确保了只有一个线程可以运行try代码块中的代码 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { // 只有c不为null且状态为New的时候,才可以执行 V result; boolean ran; try { result = c.call(); // 调用Callable的call方法,并得到返回值 ran = true; // 设置运行成功标识 } catch (Throwable ex) { result = null; ran = false; // 如果发生异常,设置运行成功标识位false setException(ex); // 设置异常结果 } if (ran) set(result); // 如果成功,把结果赋值给outCome变量 } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
get方法 get方法就是通过阻塞获取线程结果,主要做了两件事: 判断状态如果小于等于COMPLETING,表示FutureTask任务还没有执行完,则调用awaitDone方法等待 report返回结果值,或者抛出异常
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
awaitDone方法 如果当前的任务还没有被执行完,则将当前线程加入到等待队列中,等到run方法执行完之后会唤醒继续执行: private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; // 标识节点是否已经添加到等待队列 for (;;) { if (Thread.interrupted()) { // 如果当前线程中断标志位为true,则从列表中移除节点,并且抛出exception removeWaiter(q); throw new InterruptedException(); }
int s = state; if (s > COMPLETING) { // 状态大于COMPLETING,标识FutureTask已经结束 if (q != null) q.thread = null; // 将节点线程设置成null, 因为线程没有阻塞等待 return s; } else if (s == COMPLETING) // 任务已经完成,但是还有一些后续操作没有完成,当前线程让出执行权 Thread.yield(); else if (q == null) // 标识state为New,需要将当前线程阻塞等待, 创建waitNode节点 q = new WaitNode(); else if (!queued) // 通过CAS将q添加到等到队列,如果失败则queued为false,下次循环会继续添加,知道成功 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { // timed为true表示要设置超时 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); // 让当前线程等待nanos时间 } else LockSupport.park(this); } }
report方法 report方法就是根据传入的状态值判断是返回结果还是抛出异常: private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) // 大于等于CANCELLED,表示手动去掉了FutureTask throw new CancellationException(); throw new ExecutionException((Throwable)x); }
|