分享

Executor框架

 新用户73336046 2023-11-20 发布于浙江

Executor框架包含的主要的类与接口如下:

  1. Executor:是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。

  2. ThreadPoolExecutor:线程池的核心实现类,用来执行被提交的任务。

  3. ScheduledThreadPoolExecutor:可以在给定的延迟后运行命令,或者定期执 行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。

  4. Future和FutureTask:异步计算的结果

  5. Runnable和Callable:都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。

主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一 个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。

然后可以把Runnable对象直接交给ExecutorService执行ExecutorService.execute(Runnable command);或者也可以把Runnable对象或Callable对象提交给ExecutorService执行(ExecutorService.submit(Runnable task)或ExecutorService.submit(Callable task)。

如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象FutureTask。由于FutureTask实现了Runnable,程序员也可 以创建FutureTask,然后直接交给ExecutorService执行。

最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行
FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

Executor框架的成员

Executor框架主要包括以下成员:ThreadPoolExecutor、ScheduledThreadPoolExecutor、 Future接口、Runnable接口、Callable接口和Executors。

ThreadPoolExecutor
ThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建3种类型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。

FixedThreadPool
FixedThreadPool被称为可重用固定线程数的线程池,下面是FixedThreadPool的源码:

public static ExecutorService newFixedThreadPool(int nThreads) {  return new ThreadPoolExecutor(nThreads, nThreads,                  0L, TimeUnit.MILLISECONDS,                  new LinkedBlockingQueue<Runnable>());}

FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指 定的参数nThreads。
当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的 最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。FixedThreadPool的execute()方法的运行示意图如下:

说明如下:

  1. 如果当前运行的线程数小于corePoolSize,则创建新线程来执行任务

  2. 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue

  3. 当线程执行完1中的任务后,会在循环中反复从linkedBlockingQueue中获取任务来执行

FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列容量为Integer.MAX_VALUE),使用无界队列作为工作队列会对线程池带来如下影响:

  1. 当线程池中的线程数达到corePoolSize之后,新任务将在无界队列中等待,因此线程池中的线程永远不会超过corePoolSize

  2. 由于1, 使用无界队列时maximumPoolSize将是一个无效参数

  3. 由于1和2, 使用无界队列时keepAliveTime将是一个无效参数

  4. 由于使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或 shutdownNow())不会拒绝任务,不会调用RejectedExecutionHandler.rejectedExecution方法


SingleThreadExecutor
SingleThreadExecutor是使用单个worker线程的Executor:

public static ExecutorService newSingleThreadExecutor() {  return new FinalizableDelegatedExecutorService    (new ThreadPoolExecutor(1, 1,                0L, TimeUnit.MILLISECONDS,                new LinkedBlockingQueue<Runnable>()));}

SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1,其他参数与FixedThreadPool相同。SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工 作队列(队列的容量为Integer.MAX_VALUE);SingleThreadExecutor使用无界队列作为工作队列 对线程池带来的影响与FixedThreadPool相同

说明如下:

  1. 如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务

  2. 在线程池完成预热后(当前线程池中有一个运行的线程),将任务加入到LinkedBlockingQueue

  3. 线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue中获取任务来执行


CachedThreadPool
CachedThreadPool是一个会根据需要创建新线程的线程池:

public static ExecutorService newCachedThreadPool() {  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                  60L, TimeUnit.SECONDS,                  new SynchronousQueue<Runnable>());}

CacheThreadPool的corePoolSize的值为0,即corePool为空;maximumPoolSize是Integer的最大值,也就是maximumPool是无界的;keepAliveTime设置成60秒意味着CacheThreadPool中空闲的线程等待任务最长的时间是60秒,超过60秒该线程就会被终止

CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,
CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

Callable/Future的使用以及原理

线程池执行任务有两种方法,一种是execute,另一种是submit;这两个方法的区别如下:
execute

  1. execute只可以接收一个Runnable的参数

  2. execute如果出现异常会抛出

  3. execute没有返回值

submit

  1. submit 可以接收Runnable和Callable两种类型的参数

  2. 对于submit方法传入一个callable参数,可以得到一个Future的返回值

  3. submit 方法不会抛出异常,除非调用Future.get()方法

Callable/Future案例

public class CallableDemo implements Callable<String> {  @Override  public String call() throws Exception {    return "Hello";  }  public static void main(String[] args) throws InterruptedException, ExecutionException {    CallableDemo callableDemo = new CallableDemo();    FutureTask futureTask = new FutureTask(callableDemo);    new Thread(futureTask).start();    System.out.println(futureTask.get());  }}

为什么要使用回调呢?因为结果值是由另外一个线程计算的,当前线程不知道结果什么时候能够计算完成,所以传递一个函数给计算线程,当计算结束时候调用这个回调接口,回传结果值。这种方式在很多地方都有用到,比如Dubbo的异步调用、消息中间件的异步通信等等。

接下来看看Callable/Future是如何实现的,在刚刚的例子中我们用到了两个api,分别是Callable和FutureTask;Callable是一个函数式接口(一个有且仅有一个抽象方法,但是可以有多个非抽象方法的接口;可以被隐式转换为 lambda 表达式),里面只有一个call方法:

@FunctionalInterfacepublic interface Callable<V> {    /**     * Computes a result, or throws an exception if unable to do so.     *     * @return computed result     * @throws Exception if unable to compute a result     */    V call() throws Exception;}

FutureTask实现了RunnableFuture接口;RunnableFuture接口继承了Runnable和Future这两个接口:

public interface RunnableFuture<V> extends Runnable, Future<V> {    /**     * Sets this Future to the result of its computation     * unless it has been cancelled.     */    void run();}

Future接口表示一个任务的生命周期,并提供了相应方法来判断任务是否已经完成、取消以及获取任务结果等:

public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}

到这里我们可以知道,FutureTask就是Runnable和Future的结合,如果把Runnable比作生产者,Future就是消费者,那FutureTask就是被这两者共享的;生产者通过run方法计算结果,消费者通过get方法获取结果。

作为生产者消费者模式,有一个很重要的机制就是如果生产者的数据还没有产生,消费者会被阻塞;当生产者数据准备好了以后会唤醒消费者继续执行。那么在FutureTask里面是基于什么方法实现的呢?

通过源码我们可以看到,在FutureTask中通过变量state来维护状态,共用一下7中状态:

public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state; private static final int NEW = 0; // 新建状态,表示这个FutureTask还没有开始运行 private static final int COMPLETING = 1; // 表示FutureTask任务已经完成,但是还有一些后续操作还没有完成,比如唤醒等待线程 private static final int NORMAL = 2; // 任务结束,正常完成,没有异常 private static final int EXCEPTIONAL = 3; // 因为发生了异常,任务结束 private static final int CANCELLED = 4; // 因为取消了任务,任务结束 private static final int INTERRUPTING = 5; // 任务结束,也是由于取消了任务,但是发起了中断运行任务线程的中断请求 private static final int INTERRUPTED = 6; // 任务结束,也是由于取消了任务,完成了中断运行任务线程的中断请求 .....// 省略部分代码}

run方法
run方法其实很简单,就是调用Callable的call方法返回结果值result,根据是否发生异常调用set(result)或者setException(ex);不过因为FutureTask任务都是在多线程环境下运行,所以需要注意并发冲突问题。在run方法中,并没有使用synchronized代码块或者lock来解决并发问题,而是通过CAS乐观锁来实现并发,确保只有一个线程能运行FutureTask任务。

public void run() {  // 如果状态不是New,或者设置runner的值失败,说明有其它线程已经调用了run方法并成功设置了runner的值,直接return  // 确保了只有一个线程可以运行try代码块中的代码  if (state != NEW ||    !UNSAFE.compareAndSwapObject(this, runnerOffset,                   null, Thread.currentThread()))    return;  try {    Callable<V> c = callable;    if (c != null && state == NEW) { // 只有c不为null且状态为New的时候,才可以执行      V result;      boolean ran;      try {        result = c.call(); // 调用Callable的call方法,并得到返回值        ran = true; // 设置运行成功标识      } catch (Throwable ex) {        result = null;        ran = false; // 如果发生异常,设置运行成功标识位false        setException(ex); // 设置异常结果      }      if (ran)        set(result);  // 如果成功,把结果赋值给outCome变量    }  } finally {    runner = null;    int s = state;    if (s >= INTERRUPTING)      handlePossibleCancellationInterrupt(s);  }}
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); }}

get方法
get方法就是通过阻塞获取线程结果,主要做了两件事:

  1. 判断状态如果小于等于COMPLETING,表示FutureTask任务还没有执行完,则调用awaitDone方法等待

  2. report返回结果值,或者抛出异常

public V get() throws InterruptedException, ExecutionException {  int s = state;  if (s <= COMPLETING)    s = awaitDone(false, 0L);  return report(s);}

awaitDone方法
如果当前的任务还没有被执行完,则将当前线程加入到等待队列中,等到run方法执行完之后会唤醒继续执行:

private int awaitDone(boolean timed, long nanos) throws InterruptedException {  final long deadline = timed ? System.nanoTime() + nanos : 0L;  WaitNode q = null;  boolean queued = false; // 标识节点是否已经添加到等待队列  for (;;) {    if (Thread.interrupted()) {      // 如果当前线程中断标志位为true,则从列表中移除节点,并且抛出exception      removeWaiter(q);      throw new InterruptedException();    }
int s = state; if (s > COMPLETING) { // 状态大于COMPLETING,标识FutureTask已经结束 if (q != null) q.thread = null; // 将节点线程设置成null, 因为线程没有阻塞等待 return s; } else if (s == COMPLETING) // 任务已经完成,但是还有一些后续操作没有完成,当前线程让出执行权 Thread.yield(); else if (q == null) // 标识state为New,需要将当前线程阻塞等待, 创建waitNode节点 q = new WaitNode(); else if (!queued) // 通过CAS将q添加到等到队列,如果失败则queued为false,下次循环会继续添加,知道成功 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { // timed为true表示要设置超时 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); // 让当前线程等待nanos时间 } else LockSupport.park(this); }}

report方法
report方法就是根据传入的状态值判断是返回结果还是抛出异常:

private V report(int s) throws ExecutionException {  Object x = outcome;  if (s == NORMAL)    return (V)x;  if (s >= CANCELLED) // 大于等于CANCELLED,表示手动去掉了FutureTask    throw new CancellationException();  throw new ExecutionException((Throwable)x);}

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多