分享

Future之allAsList与successfulAsList

 gideshi 2020-02-04

Future之allAsList与successfulAsList

原创 约定291天后 最后发布于2018-03-30 10:45:36 阅读数 1610 收藏
发布于2018-03-30 10:45:36
分类专栏: Java基础
版权声明:本文为博主原创文章,遵循 CC 4.0 BY 版权协议,转载请附上原文出处链接和本声明。
Guava中Futures
  1. transform:对于ListenableFuture的返回值进行转换。
  2. allAsList:对多个ListenableFuture的合并,返回一个当所有Future成功时返回多个Future返回值组成的List对象。注:当其中一个Future失败或者取消的时候,将会进入失败或者取消。
  3. successfulAsList:和allAsList相似,唯一差别是对于失败或取消的Future返回值用null代替。不会进入失败或者取消流程。
  4. immediateFuture/immediateCancelledFuture: 立即返回一个待返回值的ListenableFuture。
  5. makeChecked: 将ListenableFuture 转换成CheckedFuture。CheckedFuture 是一个ListenableFuture ,其中包含了多个版本的get 方法,方法声明抛出检查异常.这样使得创建一个在执行逻辑中可以抛出异常的Future更加容易
  6. JdkFutureAdapters.listenInPoolThread(future): guava同时提供了将JDK Future转换为ListenableFuture的接口函数。
  7. addCallBack为Future增加回调
  1. package com.zte.sunquan.demo.future;
  2. import akka.actor.AbstractActor;
  3. import akka.actor.Props;
  4. import java.text.SimpleDateFormat;
  5. import java.util.Date;
  6. import java.util.concurrent.TimeUnit;
  7. import javax.annotation.concurrent.GuardedBy;
  8. /**
  9. * Created by sunquan on 2017/9/30.
  10. */
  11. public class ReturnActor extends AbstractActor {
  12. @GuardedBy("DATE_FORMAT")
  13. private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
  14. public static Props props() {
  15. return Props.create(ReturnActor.class);
  16. }
  17. @Override
  18. public Receive createReceive() {
  19. return receiveBuilder().match(String.class, s -> {
  20. System.out.println("Got String:" + s + ", at:" + DATE_FORMAT.format(new Date()));
  21. getContext().stop(getSelf());
  22. getContext().actorOf(ReturnActor.props(),"print");
  23. // TimeUnit.SECONDS.sleep(5);
  24. // getSender().tell(28,getSelf());
  25. }).build();
  26. }
  27. }
  1. package com.zte.sunquan.demo.future;
  2. import akka.actor.ActorRef;
  3. import akka.actor.ActorSystem;
  4. import akka.dispatch.OnComplete;
  5. import akka.pattern.Patterns;
  6. import com.google.common.util.concurrent.AsyncFunction;
  7. import com.google.common.util.concurrent.FutureCallback;
  8. import com.google.common.util.concurrent.Futures;
  9. import com.google.common.util.concurrent.ListenableFuture;
  10. import com.google.common.util.concurrent.ListeningExecutorService;
  11. import com.google.common.util.concurrent.MoreExecutors;
  12. import com.google.common.util.concurrent.SettableFuture;
  13. import java.util.List;
  14. import java.util.concurrent.Callable;
  15. import java.util.concurrent.Executors;
  16. import javax.annotation.Nullable;
  17. import org.junit.Before;
  18. import org.junit.Test;
  19. import scala.concurrent.Future;
  20. /**
  21. * Created by sunquan on 2018/3/28.
  22. */
  23. public class FutureAsListTest {
  24. ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
  25. ListenableFuture future1 = null;
  26. ListenableFuture future2 = null;
  27. static ActorSystem system = ActorSystem.create("mysystem");
  28. @Before
  29. public void init() {
  30. future1 = service.submit(new Callable<Integer>() {
  31. public Integer call() throws InterruptedException {
  32. Thread.sleep(1000);
  33. System.out.println("call future 1.");
  34. return 1;
  35. }
  36. });
  37. future2 = service.submit(new Callable<Integer>() {
  38. public Integer call() throws InterruptedException {
  39. Thread.sleep(1000);
  40. System.out.println("call future 2.");
  41. // throw new RuntimeException("----call future 2.");
  42. return 2;
  43. }
  44. });
  45. }
  46. @Test
  47. public void testAllAsList() throws Exception {
  48. //allAsList
  49. final ListenableFuture<List<Integer>> allFutures = Futures.allAsList(future1, future2);
  50. Futures.addCallback(allFutures, new FutureCallback<List<Integer>>() {
  51. @Override
  52. public void onSuccess(@Nullable List<Integer> result) {
  53. result.forEach(p -> {
  54. System.out.println("output: " + p);
  55. });
  56. }
  57. @Override
  58. public void onFailure(Throwable t) {
  59. System.out.println("Output error " + t);
  60. }
  61. });
  62. System.out.println(allFutures.get());
  63. Thread.sleep(2000);
  64. // call future 2.
  65. // call future 1.
  66. // [1, 2]
  67. // Output: 1
  68. // Output: 2
  69. }
  70. @Test
  71. public void testAllAsListWithException() throws Exception {
  72. future2.cancel(true);
  73. ListenableFuture future3 = service.submit(new Callable<Integer>() {
  74. public Integer call() throws InterruptedException {
  75. Thread.sleep(1000);
  76. System.out.println("call future 3.");
  77. throw new RuntimeException("----call future 3 error.");
  78. }
  79. });
  80. //allAsList
  81. final ListenableFuture<List<Integer>> allFutures = Futures.allAsList(future1, future3);
  82. Futures.addCallback(allFutures, new FutureCallback<List<Integer>>() {
  83. @Override
  84. public void onSuccess(@Nullable List<Integer> result) {
  85. for (Integer r : result)
  86. System.out.println("Output: " + r);
  87. }
  88. @Override
  89. public void onFailure(Throwable t) {
  90. System.out.println("Output error " + t);
  91. }
  92. });
  93. System.out.println(allFutures.get());
  94. // call future 1.
  95. // call future 3.
  96. // Output error java.lang.RuntimeException: ----call future 3 error.
  97. // future3抛出异常,针对allAsList,直接走Failure流程,不管其它feture有没有正确完成
  98. // java.util.concurrent.ExecutionException: java.lang.RuntimeException: ----call future 3 errror.
  99. }
  100. @Test
  101. public void testAllAsListWithException2() throws Exception {
  102. future2.cancel(true);
  103. final SettableFuture<Object> settableFuture = SettableFuture.create();
  104. ActorRef print = system.actorOf(ReturnActor.props(), "print");
  105. //注意此处2秒超时,如果这个ask超时,下面的聚合后,还是会走onFailure
  106. //什么时候,会中断?
  107. Future<Object> ask = Patterns.ask(print, "how old are you", 2000);
  108. ask.onComplete(new OnComplete<Object>() {
  109. @Override
  110. public void onComplete(Throwable failure, Object o) throws Throwable {
  111. if (failure != null) {
  112. settableFuture.setException(failure);
  113. } else
  114. settableFuture.set(o);
  115. }
  116. }, system.dispatcher());
  117. //allAsList
  118. final ListenableFuture<List<Integer>> allFutures = Futures.allAsList(future1, settableFuture);
  119. Futures.addCallback(allFutures, new FutureCallback<List<Integer>>() {
  120. @Override
  121. public void onSuccess(@Nullable List<Integer> result) {
  122. for (Integer r : result)
  123. System.out.println("Output: " + r);
  124. }
  125. @Override
  126. public void onFailure(Throwable t) {
  127. System.out.println("Output error " + t);
  128. }
  129. });
  130. System.out.println(allFutures.get());
  131. }
  132. @Test
  133. public void testSuccessfulAsListWithException() throws Exception {
  134. future2.cancel(true);
  135. ListenableFuture future4 = service.submit(new Callable<Integer>() {
  136. public Integer call() throws InterruptedException {
  137. Thread.sleep(1000);
  138. System.out.println("call future 4.");
  139. throw new RuntimeException("----call future 4.");
  140. }
  141. });
  142. //successfulAsList
  143. final ListenableFuture<List<Integer>> allFutures = Futures.successfulAsList(future1, future4);
  144. Futures.addCallback(allFutures, new FutureCallback<List<Integer>>() {
  145. @Override
  146. public void onSuccess(@Nullable List<Integer> result) {
  147. for (Integer r : result)
  148. System.out.println("Output: " + r);
  149. }
  150. @Override
  151. public void onFailure(Throwable t) {
  152. System.out.println("Output error " + t);
  153. }
  154. });
  155. System.out.println(allFutures.get());
  156. // call future 1.
  157. // call future 4.
  158. // 对于future4抛出异常,其返回值为null,聚合Future不会走failure流程
  159. // [1, null]
  160. // Output: 1
  161. // Output: null
  162. }
  163. @Test
  164. public void should_test_furture() throws Exception {
  165. //allAsList
  166. final ListenableFuture<List<Integer>> allFutures = Futures.allAsList(future1, future2);
  167. Futures.addCallback(allFutures, new FutureCallback<List<Integer>>() {
  168. @Override
  169. public void onSuccess(@Nullable List<Integer> result) {
  170. for (Integer r : result)
  171. System.out.println("Output: " + r);
  172. }
  173. @Override
  174. public void onFailure(Throwable t) {
  175. System.out.println("Output error " + t);
  176. }
  177. });
  178. //Futures transform可以进一步重定义结果
  179. final ListenableFuture transform = Futures.transform(allFutures, new AsyncFunction<List<Integer>, Boolean>() {
  180. @Override
  181. public ListenableFuture apply(List<Integer> results) throws Exception {
  182. if (results.size() == 2)
  183. return Futures.immediateFuture(true);
  184. else
  185. return Futures.immediateFuture(false);
  186. }
  187. });
  188. Futures.addCallback(transform, new FutureCallback<Object>() {
  189. public void onSuccess(Object result) {
  190. System.out.println("success with: " + result);
  191. }
  192. public void onFailure(Throwable thrown) {
  193. System.out.printf("onFailure%s\n", thrown.getMessage());
  194. }
  195. });
  196. System.out.println(transform.get());
  197. // call future 1.
  198. // call future 2.
  199. // Output: 1
  200. // Output: 2
  201. // true
  202. // success with: true
  203. }
  204. }

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多