配色: 字号:
Java多线程之异步Future机制的原理和实现
2016-12-22 | 阅:  转:  |  分享 
  
Java多线程之异步Future机制的原理和实现



这篇文章主要为大家详细介绍了Java多线程之异步Future机制的原理和实现,感兴趣的小伙伴们可以参考一下



项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable看下面的代码:



importjava.util.concurrent.Callable;importjava.util.concurrent.ExecutionException;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.Future;publicclassAddTaskimplementsCallable{privateinta,b;publicAddTask(inta,intb){this.a=a;this.b=b;}@OverridepublicIntegercallthrowsException{Integerresult=a+b;returnresult;}publicstaticvoidmain(String[]args)throwsInterruptedException,ExecutionException{ExecutorServiceexecutor=Executors.newSingleThreadExecutor;//JDK目前为止返回的都是FutureTask的实例Futurefuture=executor.submit(newAddTask(1,2));Integerresult=future.get;//只有当future的状态是已完成时(future.isDone=true),get方法才会返回}}



虽然可以实现获取异步执行结果的需求,但是我们发现这个Future其实很不好用,因为它没有提供通知的机制,也就是说我们不知道future什么时候完成(如果我们需要轮询isDone()来判断的话感觉就没有用这个的必要了)。看下java.util.concurrent.future.Future的接口方法:



publicinterfaceFuture{booleancancel(booleanmayInterruptIfRunning);booleanisCancelled;booleanisDone;VgetthrowsInterruptedException,ExecutionException;Vget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException;}



由此可见JDK的Future机制其实并不好用,如果能给这个future加个监听器,让它在完成时通知监听器的话就比较好用了,就像下面这个IFuture:



packagefuture;importjava.util.concurrent.CancellationException;importjava.util.concurrent.Future;importjava.util.concurrent.TimeUnit;/Theresultofanasynchronousoperation.@authorlixiaohui@param执行结果的类型参数/publicinterfaceIFutureextendsFuture{booleanisSuccess;//是否成功VgetNow;//立即返回结果(不管Future是否处于完成状态)Throwablecause;//若执行失败时的原因booleanisCancellable;//是否可以取消IFutureawaitthrowsInterruptedException;//等待future的完成booleanawait(longtimeoutMillis)throwsInterruptedException;//超时等待future的完成booleanawait(longtimeout,TimeUnittimeunit)throwsInterruptedException;IFutureawaitUninterruptibly;//等待future的完成,不响应中断booleanawaitUninterruptibly(longtimeoutMillis);//超时等待future的完成,不响应中断booleanawaitUninterruptibly(longtimeout,TimeUnittimeunit);IFutureaddListener(IFutureListenerl);//当future完成时,会通知这些加进来的监听器IFutureremoveListener(IFutureListenerl);}



接下来就一起来实现这个IFuture,在这之前要说明下Object.wait,Object.notifyAll方法,因为整个Future实现的原???的核心就是这两个方法.看看JDK里面的解释:



publicclassObject{/Causesthecurrentthreadtowaituntilanotherthreadinvokesthe{@linkjava.lang.Object#notify}methodorthe{@linkjava.lang.Object#notifyAll}methodforthisobject.Inotherwords,thismethodbehavesexactlyasifitsimplyperformsthecall{@codewait(0)}.调用该方法后,当前线程会释放对象监视器锁,并让出CPU使用权。直到别的线程调用notify/notifyAll/publicfinalvoidwaitthrowsInterruptedException{wait(0);}/Wakesupallthreadsthatarewaitingonthisobject''smonitor.Athreadwaitsonanobject''smonitorbycallingoneofthe{@codewait}methods.

Theawakenedthreadswillnotbeabletoproceeduntilthecurrentthreadrelinquishesthelockonthisobject.Theawakenedthreadswillcompeteintheusualmannerwithanyotherthreadsthatmightbeactivelycompetingtosynchronizeonthisobject;forexample,theawakenedthreadsenjoynoreliableprivilegeordisadvantageinbeingthenextthreadtolockthisobject./publicfinalnativevoidnotifyAll;}



知道这个后,我们要自己实现Future也就有了思路,当线程调用了IFuture.await等一系列的方法时,如果Future还未完成,那么就调用future.wait方法使线程进入WAITING状态。而当别的线程设置Future为完成状态(注意这里的完成状态包括正常结束和异常结束)时,就需要调用future.notifyAll方法来唤醒之前因为调用过wait方法而处于WAITING状态的那些线程。完整的实现如下(代码应该没有很难理解的地方,我是参考netty的Future机制的。有兴趣的可以去看看netty的源码):



packagefuture;importjava.util.Collection;importjava.util.concurrent.CancellationException;importjava.util.concurrent.CopyOnWriteArrayList;importjava.util.concurrent.ExecutionException;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.TimeoutException;/

正常结束时,若执行的结果不为null,则result为执行结果;若执行结果为null,则result={@linkAbstractFuture#SUCCESS_SIGNAL}异常结束时,result为{@linkCauseHolder}的实例;若是被取消而导致的异常结束,则result为{@linkCancellationException}的实例,否则为其它异常的实例以下情况会使异步操作由未完成状态转至已完成状态,也就是在以下情况发生时调用notifyAll方法:
  • 异步操作被取消时(cancel方法)
  • 异步操作正常结束时(setSuccess方法)
  • 异步操作异常结束时(setFailure方法)
@authorlixiaohui@param异步执行结果的类型/publicclassAbstractFutureimplementsIFuture{protectedvolatileObjectresult;//需要保证其可见性/监听器集/protectedCollection>listeners=newCopyOnWriteArrayList>;/当任务正常执行结果为null时,即客户端调用{@linkAbstractFuture#setSuccess(null)}时,result引用该对象/privatestaticfinalSuccessSignalSUCCESS_SIGNAL=newSuccessSignal;@Overridepublicbooleancancel(booleanmayInterruptIfRunning){if(isDone){//已完成了不能取消returnfalse;}synchronized(this){if(isDone){//doublecheckreturnfalse;}result=newCauseHolder(newCancellationException);notifyAll;//isDone=true,通知等待在该对象的wait的线程}notifyListeners;//通知监听器该异步操作已完成returntrue;}@OverridepublicbooleanisCancellable{returnresult==null;}@OverridepublicbooleanisCancelled{returnresult!=null&&resultinstanceofCauseHolder&&((CauseHolder)result)www.hunanwang.net.causeinstanceofCancellationException;}@OverridepublicbooleanisDone{returnresult!=null;}@OverridepublicVgetthrowsInterruptedException,ExecutionException{await;//等待执行结果Throwablecause=cause;if(cause==null){//没有发生异常,异步操作正常结束returngetNow;}if(causeinstanceofCancellationException){//异步操作被取消了throw(CancellationException)cause;}thrownewExecutionException(cause);//其他异常}@OverridepublicVget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException{if(await(timeout,unit)){//超时等待执行结果Throwablecause=cause;if(cause==null){//没有发生异常,异步操作正常结束returngetNow;}if(causeinstanceofCancellationException){//异步操作被取消了throw(CancellationException)cause;}thrownewExecutionException(cause);//其他异常}//时间到了异步操作还没有结束,抛出超时异常thrownewTimeoutException;}@OverridepublicbooleanisSuccess{returnresult==null?false:!(resultinstanceofCauseHolder);}@SuppressWarnings("unchecked")@OverridepublicVgetNow{return(V)(result==SUCCESS_SIGNAL?null:result);}@OverridepublicThrowablecause{if(result!=null&&resultinstanceofCauseHolder){return((CauseHolder)result).cause;}returnnull;}@OverridepublicIFutureaddListener(IFutureListenerlistener){if(listener==null){thrownewNullPointerException("listener");}if(isDone){//若已完成直接通知该监听器notifyListener(listener);returnthis;}synchronized(this){if(!isDone){listeners.add(listener);returnthis;}}notifyListener(listener);returnthis;}@OverridepublicIFutureremoveListener(IFutureListenerlistener){if(listener==null){thrownewNullPointerException("listener");}if(!isDone){listeners.remove(listener);}returnthis;}@OverridepublicIFutureawaitthrowsInterruptedException{returnawait0(true);}privateIFutureawait0(booleaninterruptable)throwsInterruptedException{if(!isDone){//若已完成就直接返回了//若允许终端且被中断了则抛出中断异常if(interruptable&&Thread.interrupted){thrownewInterruptedException("thread"+Thread.currentThread.getName+"hasbeeninterrupted.");}booleaninterrupted=false;synchronized(this){while(!isDone){try{wait;//释放锁进入waiting状态,等待其它线程调用本对象的notify/notifyAll方法}catch(InterruptedExceptione){if(interruptable){throwe;}else{interrupted=true;}}}}if(interrupted){//为什么这里要设中断标志位?因为从wait方法返回后,中断标志是被clear了的,//这里重新设置以便让其它代码知道这里被中断了。Thread.currentThread.interrupt;}}returnthis;}@Overridepublicbooleanawait(longtimeoutMillis)throwsInterruptedException{returnawait0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis),true);}@Overridepublicbooleanawait(longtimeout,TimeUnitunit)throwsInterruptedException{returnawait0(unit.toNanos(timeout),true);}privatebooleanawait0(longtimeoutNanos,booleaninterruptable)throwsInterruptedException{if(isDone){returntrue;}if(timeoutNanos<=0){returnisDone;}if(interruptable&&Thread.interrupted){thrownewInterruptedException(toString);}longstartTime=timeoutNanos<=0?0:System.nanoTime;longwaitTime=timeoutNanos;booleaninterrupted=false;try{synchronized(this){if(isDone){returntrue;}if(waitTime<=0){returnisDone;}for(;;){try{wait(waitTime/1000000,(int)(waitTime%1000000));}catch(InterruptedExceptione){if(interruptable){throwe;}else{interrupted=true;}}if(isDone){returntrue;}else{waitTime=timeoutNanos-(System.nanoTime-startTime);if(waitTime<=0){returnisDone;}}}}}finally{if(interrupted){Thread.currentThread.www.visa158.cominterrupt;}}}@OverridepublicIFutureawaitUninterruptibly{try{returnawait0(false);}catch(InterruptedExceptione){//这里若抛异常了就无法处理了thrownewjava.lang.InternalError;}}@OverridepublicbooleanawaitUninterruptibly(longtimeoutMillis){try{returnawait0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis),false);}catch(InterruptedExceptione){thrownewjava.lang.InternalError;}}@OverridepublicbooleanawaitUninterruptibly(longtimeout,TimeUnitunit){try{returnawait0(unit.toNanos(timeout),false);}catch(InterruptedExceptione){thrownewjava.lang.InternalError;}}protectedIFuturesetFailure(Throwablecause){if(setFailure0(cause)){notifyListeners;returnthis;}thrownewIllegalStateException("completealready:"+this);}privatebooleansetFailure0(Throwablecause){if(isDone){returnfalse;}synchronized(this){if(isDone){returnfalse;}result=newCauseHolder(cause);notifyAll;}returntrue;}protectedIFuturesetSuccess(Objectresult){if(setSuccess0(result)){//设置成功后通知监听器notifyListeners;returnthis;}thrownewIllegalStateException("completealready:"+this);}privatebooleansetSuccess0(Objectresult){if(isDone){returnfalse;}synchronized(this){if(isDone){returnfalse;}if(result==null){//异步操作正常执行完毕的结果是nullthis.result=SUCCESS_SIGNAL;}else{this.result=result;}notifyAll;}returntrue;}privatevoidnotifyListeners{for(IFutureListenerl:listeners){notifyListener(l);}}privatevoidnotifyListener(IFutureListenerl){try{l.operationCompleted(this);}catch(Exceptione){e.printStackTrace;}}privatestaticclassSuccessSignal{}privatestaticfinalclassCauseHolder{finalThrowablecause;CauseHolder(Throwablecause){this.cause=cause;}}}



那么要怎么使用这个呢,有了上面的骨架实现,我们就可以定制各种各样的异步结果了。下面模拟一下一个延时的任务:



packagefuture.test;importfuture.IFuture;importfuture.IFutureListener;/延时加法@authorlixiaohui/publicclassDelayAdder{publicstaticvoidmain(String[]args){newDelayAdder.add(31000,1,2).addListener(newIFutureListener{@OverridepublicvoidoperationCompleted(IFuturefuture)throwsException{System.out.println(future.getNow);}});}/延迟加@paramdelay延时时长milliseconds@parama加数@paramb加数@return异步结果/publicDelayAdditionFutureadd(longdelay,inta,intb){DelayAdditionFuturefuture=newDelayAdditionFuture;newThread(newDelayAdditionTask(delay,a,b,future)).start;returnfuture;}privateclassDelayAdditionTaskimplementsRunnable{privatelongdelay;privateinta,b;privateDelayAdditionFuturefuture;publicDelayAdditionTask(longdelay,inta,intb,DelayAdditionFuturefuture){super;this.delay=delay;this.a=a;this.b=b;this.future=future;}@Overridepublicvoidrun{try{Thread.sleep(delay);Integeri=a+b;//TODO这里设置future为完成状态(正常执行完毕)future.setSuccess(i);}catch(InterruptedExceptione){//TODO这里设置future为完成状态(异常执行完毕)future.setFailure(e.getCause);}}}}packagefuture.test;importfuture.AbstractFuture;importfuture.IFuture;//只是把两个方法对外暴露publicclassDelayAdditionFutureextendsAbstractFuture{@OverridepublicIFuturesetSuccess(Objectresult){returnsuper.setSuccess(result);}@OverridepublicIFuturesetFailure(Throwablecause){returnsuper.setFailure(cause);}}



可以看到客户端不用主动去询问future是否完成,而是future完成时自动回调operationcompleted方法,客户端只需在回调里实现逻辑即可。





















献花(0)
+1
(本文系白狐一梦首藏)