前言介绍 分析Promise之前我们先来看两个单词;Promise、Future
Promise v. 许诺;承诺;答应;保证;使很可能;预示 Future n. 将来;未来;未来的事;将来发生的事;前景;前途;前程
他们的含义都是对未来即将要发生的事情做相应的处理,这也是在异步编程中非常常见的类名。
Netty是一个异步网络处理框架,在实现中大量使用了Future机制,并在Java自带Future的基础上,增加了Promise机制。这两个实现类的目的都是为了使异步编程更加方便使用。
源码分析 1、了解Java并发包中的Future java的并发包中提供java.util.concurrent.Future类,用于处理异步操作。在Java中Future是一个未来完成的异步操作,可以获得未来返回的值。如下案例,调用一个获取用户信息的方法,该方法会立刻返回Future对象,调用Future.get()可以同步等待耗时方法的返回,也可以通过调用future的cancel()取消Future任务。
1 class TestFuture { 2 3 public static void main (String[] args) throws ExecutionException, InterruptedException { 4 TestFuture testFuture = new TestFuture(); 5 Future<String> future = testFuture.queryUserInfo("10001" ); //返回future 6 String userInfo = future.get(); 7 System.out.println("查询用户信息:" + userInfo); 8 } 9 10 private Future<String> queryUserInfo (String userId) {11 FutureTask<String> future = new FutureTask<>(() -> {12 try {13 Thread.sleep(1000 );14 return "微信公众号:bugstack虫洞栈 | 用户ID:" + userId;15 } catch (InterruptedException ignored) {}16 return "error" ;17 });18 new Thread(future).start();19 return future;20 }21 22 }
2、Netty实现了自己的Future Netty通过继承java并发包的Future来定义自己的Future接口,为Future加入的功能主要有添加、删除监听事件接口,最后由Promise实现。
io.netty.util.concurrent.Future.java中定义了一些列的异步编程方法 | 经常会使用的>b.bind(port).sync();
1 // 只有IO操作完成时才返回true 2 boolean isSuccess () ; 3 // 只有当cancel(boolean)成功取消时才返回true 4 boolean isCancellable () ; 5 // IO操作发生异常时,返回导致IO操作以此的原因,如果没有异常,返回null 6 Throwable cause () ; 7 // 向Future添加事件,future完成时,会执行这些事件,如果add时future已经完成,会立即执行监听事件 8 Future<V> addListener (GenericFutureListener<? extends Future<? super V>> listener) ; 9 Future<V> addListeners (GenericFutureListener<? extends Future<? super V>>... listeners) ;10 // 移除监听事件,future完成时,不会触发 11 Future<V> removeListener (GenericFutureListener<? extends Future<? super V>> listener) ;12 Future<V> removeListeners (GenericFutureListener<? extends Future<? super V>>... listeners) ;13 // 等待future done 14 Future<V> sync () throws InterruptedException ;15 // 等待future done,不可打断 16 Future<V> syncUninterruptibly () ;17 // 等待future完成 18 Future<V> await () throws InterruptedException ;19 // 等待future 完成,不可打断 20 Future<V> awaitUninterruptibly () ;21 boolean await (long timeout, TimeUnit unit) throws InterruptedException ;22 boolean await (long timeoutMillis) throws InterruptedException ;23 boolean awaitUninterruptibly (long timeout, TimeUnit unit) ;24 boolean awaitUninterruptibly (long timeoutMillis) ;25 // 立刻获得结果,如果没有完成,返回null 26 V getNow () ;27 // 如果成功取消,future会失败,导致CancellationException 28 @Override 29 boolean cancel (boolean mayInterruptIfRunning) ;
3、Promise机制 Netty的Future与Java的Future虽然类名相同,但功能上略有不同,Netty中引入了Promise机制。在Java的Future中,业务逻辑为一个Callable或Runnable实现类,该类的call()或run()执行完毕意味着业务逻辑的完结;而在Promise机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败,这样更加方便的监控自己的业务逻辑。
io.netty.util.concurrent.Promise.java |
1 public interface Promise <V > extends Future <V > { 2 3 // 设置future执行结果为成功 4 Promise<V> setSuccess (V result) ; 5 6 // 尝试设置future执行结果为成功,返回是否设置成功 7 boolean trySuccess (V result) ; 8 9 // 设置失败 10 Promise<V> setFailure (Throwable cause) ;11 12 // 尝试设置future执行结果为失败,返回是否设置成功 13 boolean tryFailure (Throwable cause) ;14 15 // 设置为不能取消 16 boolean setUncancellable () ;17 18 // 源码中,以下为覆盖了Future的方法,例如; 19 20 Future<V> addListener (GenericFutureListener<? extends Future<? super V>> listener) ;21 22 @Override 23 Promise<V> addListener (GenericFutureListener<? extends Future<? super V>> listener) ;24 25 }
TestPromise.java | 一个查询用户信息的Promise列子,加入监听再operationComplete完成后,获取查询信息
1 class TestPromise { 2 3 public static void main (String[] args) throws ExecutionException, InterruptedException { 4 TestPromise testPromise = new TestPromise(); 5 Promise<String> promise = testPromise.queryUserInfo("10001" ); 6 promise.addListener(new GenericFutureListener<Future<? super String>>() { 7 @Override 8 public void operationComplete (Future<? super String> future) throws Exception { 9 System.out.println("addListener.operationComplete > 查询用户信息完成: " + future.get());10 }11 });12 }13 14 private Promise<String> queryUserInfo (String userId) {15 NioEventLoopGroup loop = new NioEventLoopGroup();16 // 创建一个DefaultPromise并返回,将业务逻辑放入线程池中执行 17 DefaultPromise<String> promise = new DefaultPromise<String>(loop.next());18 loop.schedule(() -> {19 try {20 Thread.sleep(1000 );21 promise.setSuccess("微信公众号:bugstack虫洞栈 | 用户ID:" + userId);22 return promise;23 } catch (InterruptedException ignored) {24 }25 return promise;26 }, 0 , TimeUnit.SECONDS);27 return promise;28 }29 30 }
通过这个例子可以看到,Promise能够在业务逻辑线程中通知Future成功或失败,由于Promise继承了Netty的Future,因此可以加入监听事件。而Future和Promise的好处在于,获取到Promise对象后可以为其设置异步调用完成后的操作,然后立即继续去做其他任务。
4、Promise类组织结构&常用方法 DefaultChannelPromise类组织结构图 | 承接Java并发包Future并增强实现
微信公众号:bugstack虫洞栈 | DefaultChannelPromise类组织结构图 Netty中DefalutPromise是一个非常常用的类,这是Promise实现的基础。DefaultChannelPromise是DefalutPromise的子类,加入了channel这个属性。
DefaultPromise | 使用 在Netty中使用到Promise的地方会非常多,例如在前面一节《一行简单的writeAndFlush都做了哪些事》分析HeadContext.write中unsafe.write(msg, promise);结合这一章节可以继续深入了解Netty的异步框架原理。另外,服务器/客户端启动时的注册任务,最终会调用unsafe的register,调用过程中会传入一个promise,unsafe进行事件的注册时调用promise可以设置成功/失败。
SingleThreadEventLoop.java | 注册服务事件循环组
1 @Override 2 public ChannelFuture register (Channel channel) { 3 return register(new DefaultChannelPromise(channel, this )); 4 } 5 6 @Override 7 public ChannelFuture register (final ChannelPromise promise) { 8 ObjectUtil.checkNotNull(promise, "promise" ); 9 promise.channel().unsafe().register(this , promise);10 return promise;11 }
DefaultPromise | 实现 DefaultChannelPromise提供的功能可以分为两个部分;
AbstractFuture.java | get()方法
1 public abstract class AbstractFuture <V > implements Future <V > { 2 3 @Override 4 public V get () throws InterruptedException, ExecutionException { 5 await(); 6 7 Throwable cause = cause(); 8 if (cause == null ) { 9 return getNow();10 }11 if (cause instanceof CancellationException) {12 throw (CancellationException) cause;13 }14 throw new ExecutionException(cause);15 }16 17 @Override 18 public V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {19 if (await(timeout, unit)) {20 Throwable cause = cause();21 if (cause == null ) {22 return getNow();23 }24 if (cause instanceof CancellationException) {25 throw (CancellationException) cause;26 }27 throw new ExecutionException(cause);28 }29 throw new TimeoutException();30 }31 }
DefaultPromise父类AbstractFuture提供了两个get方法;1、无参数的get会阻塞等待;2、有参数的get会等待指定事件,若未结束抛出超时异常。
DefaultPromise.java | DefaultPromise.await()方法
1 @Override 2 public Promise<V> await () throws Interrupt 3 // 判断Future任务是否结束,内部根据result是否为null判断,setSuccess或setFailure时会通过CAS修改result 4 if (isDone() ) { 5 return this ; 6 } 7 // 线程是否被中断 8 if (Thread.interrupted()) { 9 throw new InterruptedException(toS10 }11 // 检查当前线程是否与线程池运行的线程是一个 12 checkDeadLock();13 synchronized (this ) {14 while (!isDone()) {15 /* waiters计数加116 * private void incWaiters() {17 * if (waiters == Short.MAX_VALUE) {18 * throw new IllegalStateException("too many waiters: " + this);19 * }20 * ++waiters;21 * }22 */ 23 incWaiters();24 try {25 // Object的方法,让出CPU,加入等待队列 26 wait();27 } finally {28 // waiters计数减1 29 decWaiters();30 }31 }32 }33 return this ;34 }
await(long timeout, TimeUnit unit)与awite类似,只是调用了Object对象的wait(long timeout, int nanos)方法awaitUninterruptibly()方法在内部catch住了等待线程的中断异常,因此不会抛出中断异常。
DefaultPromise.java | DefaultPromise.addListener0() / DefaultPromise.removeListener0()
1 private void addListener0 (GenericFutureListener<? extends Future<? super V>> listener) { 2 if (listeners == null ) { 3 listeners = listener; 4 } else if (listeners instanceof DefaultFutureListeners) { 5 ((DefaultFutureListeners) listeners).add(listener); 6 } else { 7 listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener); 8 } 9 }10 private void removeListener0 (GenericFutureListener<? extends Future<? super V>> listener) {11 if (listeners instanceof DefaultFutureListeners) {12 ((DefaultFutureListeners) listeners).remove(listener);13 } else if (listeners == listener) {14 listeners = null ;15 }16 }
DefaultPromise.java | DefaultPromise.notifyListener0() 通知侦听器
1 @SuppressWarnings ({ "unchecked" , "rawtypes" }) 2 private static void notifyListener0 (Future future, GenericFutureListener l) { 3 try { 4 l.operationComplete(future); 5 } catch (Throwable t) { 6 if (logger.isWarnEnabled()) { 7 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()" , t); 8 } 9 }10 }
在添加监听器时,如果任务刚好执行完毕,则会立即触发监听事件,触发监听通过notifyListeners()实现。
addListener和setSuccess都会调用notifyListeners()和Promise内的线程池当前执行的线程是同一个线程,则放在线程池中执行,否则提交到线程池去执行;例如,main线程中调用addListener时任务完成,notifyListeners()执行回调,会提交到线程池中执行;而如果是执行Future任务的线程池中setSuccess()时调用notifyListeners(),会放在当前线程中执行。
内部维护了notifyingListeners用来记录是否已经触发过监听事件,只有未触发过且监听列表不为空,才会依次便利并调用operationComplete
DefaultPromise.java | DefaultPromise.setSuccess0()、setFailure0() 唤起等待线程通知成功/失败
1 // 设置成功后唤醒等待线程 2 private boolean setSuccess0 (V result) { 3 return setValue0(result == null ? SUCCESS : result); 4 } 5 6 // 设置成功后唤醒等待线程 7 private boolean setFailure0 (Throwable cause) { 8 return setValue0(new CauseHolder(checkNotNull(cause, "cause" ))); 9 }10 11 // 通知成功时将结果保存在变量result,通知失败时,使用CauseHolder包装Throwable赋值给result 12 // RESULT_UPDATER 是一个使用CAS更新内部属性result的类, 13 // 如果result为null或UNCANCELLABLE,更新为成功/失败结果;UNCANCELLABLE是不可取消状态 14 private boolean setValue0 (Object objResult) {15 if (RESULT_UPDATER.compareAndSet(this , null , objResult) ||16 RESULT_UPDATER.compareAndSet(this , UNCANCELLABLE, objResult)) {17 // 检查是否有服务,如果有,通知他们。 18 if (checkNotifyWaiters()) {19 notifyListeners(); // 通知 20 }21 return true ;22 }23 return false ;24 }
Future任务在执行完成后调用setSuccess()或setFailure()通知Future执行结果;主要逻辑是:修改result的值,若有等待线程则唤醒,通知监听事件。
DefaultChannelPromise实现
1 /** 2 * The default {@link ChannelPromise} implementation. It is recommended to use {@link Channel#newPromise()} to create 3 * a new {@link ChannelPromise} rather than calling the constructor explicitly. 4 */ 5 public class DefaultChannelPromise extends DefaultPromise <Void > implements ChannelPromise , FlushCheckpoint { 6 7 private final Channel channel; 8 private long checkpoint; 9 10 ...11 }
1 interface FlushCheckpoint {2 long flushCheckpoint () ;3 void flushCheckpoint (long checkpoint) 4 ChannelPromise promise () ;5 }