Guava中Futures
- transform:对于ListenableFuture的返回值进行转换。
- allAsList:对多个ListenableFuture的合并,返回一个当所有Future成功时返回多个Future返回值组成的List对象。注:当其中一个Future失败或者取消的时候,将会进入失败或者取消。
- successfulAsList:和allAsList相似,唯一差别是对于失败或取消的Future返回值用null代替。不会进入失败或者取消流程。
- immediateFuture/immediateCancelledFuture: 立即返回一个待返回值的ListenableFuture。
- makeChecked: 将ListenableFuture 转换成CheckedFuture。CheckedFuture 是一个ListenableFuture ,其中包含了多个版本的get 方法,方法声明抛出检查异常.这样使得创建一个在执行逻辑中可以抛出异常的Future更加容易
- JdkFutureAdapters.listenInPoolThread(future): guava同时提供了将JDK Future转换为ListenableFuture的接口函数。
- addCallBack为Future增加回调
package com.zte.sunquan.demo.future;
import akka.actor.AbstractActor;
import java.text.SimpleDateFormat;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
* Created by sunquan on 2017/9/30.
public class ReturnActor extends AbstractActor {
@GuardedBy("DATE_FORMAT")
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
public static Props props() {
return Props.create(ReturnActor.class);
public Receive createReceive() {
return receiveBuilder().match(String.class, s -> {
System.out.println("Got String:" + s + ", at:" + DATE_FORMAT.format(new Date()));
getContext().stop(getSelf());
getContext().actorOf(ReturnActor.props(),"print");
// TimeUnit.SECONDS.sleep(5);
// getSender().tell(28,getSelf());
package com.zte.sunquan.demo.future;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import scala.concurrent.Future;
* Created by sunquan on 2018/3/28.
public class FutureAsListTest {
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture future1 = null;
ListenableFuture future2 = null;
static ActorSystem system = ActorSystem.create("mysystem");
future1 = service.submit(new Callable<Integer>() {
public Integer call() throws InterruptedException {
System.out.println("call future 1.");
future2 = service.submit(new Callable<Integer>() {
public Integer call() throws InterruptedException {
System.out.println("call future 2.");
// throw new RuntimeException("----call future 2.");
public void testAllAsList() throws Exception {
final ListenableFuture<List<Integer>> allFutures = Futures.allAsList(future1, future2);
Futures.addCallback(allFutures, new FutureCallback<List<Integer>>() {
public void onSuccess(@Nullable List<Integer> result) {
System.out.println("output: " + p);
public void onFailure(Throwable t) {
System.out.println("Output error " + t);
System.out.println(allFutures.get());
public void testAllAsListWithException() throws Exception {
ListenableFuture future3 = service.submit(new Callable<Integer>() {
public Integer call() throws InterruptedException {
System.out.println("call future 3.");
throw new RuntimeException("----call future 3 error.");
final ListenableFuture<List<Integer>> allFutures = Futures.allAsList(future1, future3);
Futures.addCallback(allFutures, new FutureCallback<List<Integer>>() {
public void onSuccess(@Nullable List<Integer> result) {
System.out.println("Output: " + r);
public void onFailure(Throwable t) {
System.out.println("Output error " + t);
System.out.println(allFutures.get());
// Output error java.lang.RuntimeException: ----call future 3 error.
// future3抛出异常,针对allAsList,直接走Failure流程,不管其它feture有没有正确完成
// java.util.concurrent.ExecutionException: java.lang.RuntimeException: ----call future 3 errror.
public void testAllAsListWithException2() throws Exception {
final SettableFuture<Object> settableFuture = SettableFuture.create();
ActorRef print = system.actorOf(ReturnActor.props(), "print");
//注意此处2秒超时,如果这个ask超时,下面的聚合后,还是会走onFailure
Future<Object> ask = Patterns.ask(print, "how old are you", 2000);
ask.onComplete(new OnComplete<Object>() {
public void onComplete(Throwable failure, Object o) throws Throwable {
settableFuture.setException(failure);
final ListenableFuture<List<Integer>> allFutures = Futures.allAsList(future1, settableFuture);
Futures.addCallback(allFutures, new FutureCallback<List<Integer>>() {
public void onSuccess(@Nullable List<Integer> result) {
System.out.println("Output: " + r);
public void onFailure(Throwable t) {
System.out.println("Output error " + t);
System.out.println(allFutures.get());
public void testSuccessfulAsListWithException() throws Exception {
ListenableFuture future4 = service.submit(new Callable<Integer>() {
public Integer call() throws InterruptedException {
System.out.println("call future 4.");
throw new RuntimeException("----call future 4.");
final ListenableFuture<List<Integer>> allFutures = Futures.successfulAsList(future1, future4);
Futures.addCallback(allFutures, new FutureCallback<List<Integer>>() {
public void onSuccess(@Nullable List<Integer> result) {
System.out.println("Output: " + r);
public void onFailure(Throwable t) {
System.out.println("Output error " + t);
System.out.println(allFutures.get());
// 对于future4抛出异常,其返回值为null,聚合Future不会走failure流程
public void should_test_furture() throws Exception {
final ListenableFuture<List<Integer>> allFutures = Futures.allAsList(future1, future2);
Futures.addCallback(allFutures, new FutureCallback<List<Integer>>() {
public void onSuccess(@Nullable List<Integer> result) {
System.out.println("Output: " + r);
public void onFailure(Throwable t) {
System.out.println("Output error " + t);
//Futures transform可以进一步重定义结果
final ListenableFuture transform = Futures.transform(allFutures, new AsyncFunction<List<Integer>, Boolean>() {
public ListenableFuture apply(List<Integer> results) throws Exception {
return Futures.immediateFuture(true);
return Futures.immediateFuture(false);
Futures.addCallback(transform, new FutureCallback<Object>() {
public void onSuccess(Object result) {
System.out.println("success with: " + result);
public void onFailure(Throwable thrown) {
System.out.printf("onFailure%s\n", thrown.getMessage());
System.out.println(transform.get());