分享

java线程池主线程等待子线程执行完成

 dongsibei 2014-04-21
public  class  Threads {

      static  ExecutorService  executorService  =  Executors . newFixedThreadPool ( 1 ) ;
     
      @SuppressWarnings ( “rawtypes” )
      public  static  void  main (String[]  args )  throws  InterruptedException ,  ExecutionException {
          SubThread thread  =  new  SubThread () ;
//        thread.start();
           Future  future  =  executorService . submit (thread) ;
           mainThreadOtherWork () ;
          System . out . println ( “now waiting sub thread done.” ) ;
          future . get () ;
//        try {
//            thread.join();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
          System . out . println ( “now all done.” ) ;
           executorService . shutdown () ;
     }

      private  static  void  mainThreadOtherWork () {
          System . out . println ( “main thread work start” ) ;
           try  {
              Thread . sleep ( 3000L ) ;
          }  catch  (InterruptedException e) {
              e . printStackTrace () ;
          }
          System . out . println ( “main thread work done.” ) ;
     }

      public  static  class  SubThread  extends  Thread{
           @Override
           public  void  run () {
               working () ;
          }

           private  void  working () {
              System . out . println ( “sub thread start working.” ) ;
               busy () ;
              System . out . println ( “sub thread stop working.” ) ;
          }

           private  void  busy () {
               try  {
                    sleep ( 5000L ) ;
              }  catch  (InterruptedException e) {
                   e . printStackTrace () ;
              }
          }
          
     }
     
}

这 里, ThreadPoolExecutor 是实现了 ExecutorService的方法, sumbit的过程就是把一个Runnable接口对象包装成一个 Callable接口对象, 然后放到 workQueue里等待调度执行. 当然, 执行的启动也是调用了thread的start来做到的, 只不过这里被包装掉了. 另外, 这里的thread是会被重复利用的, 所以这里要退出主线程, 需要执行以下shutdown方法以示退出使用线程池. 扯远了. 

这 种方法是得益于Callable接口和Future模式, 调用future接口的get方法, 会同步等待该future执行结束, 然后获取到结果. Callbale接口的接口方法是 V call(); 是可以有返回结果的, 而Runnable的 void run(), 是没有返回结果的. 所以, 这里即使被包装成Callbale接口, future.get返回的结果也是null的.如果需要得到返回结果, 建议使用Callable接口.

通过队列来控制线程的进度, 是很好的一个理念. 我们完全可以自己搞个队列, 自己控制. 这样也可以实现. 不信看代码:

public  class  Threads {

//   static ExecutorService executorService = Executors.newFixedThreadPool(1);
      static  final  BlockingQueue < Integer >  queue  =  new  ArrayBlockingQueue < Integer > ( 1 ) ;
      public  static  void  main (String[]  args )  throws  InterruptedException ,  ExecutionException {
          SubThread thread  =  new  SubThread ( queue ;
          thread . start () ;
//        Future future = executorService.submit(thread);
           mainThreadOtherWork () ;
          System . out . println ( “now waiting sub thread done.” ) ;
//        future.get();
           queue . take () ;
//        try {
//            thread.join();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
          System . out . println ( “now all done.” ) ;
//        executorService.shutdown();
     }

      private  static  void  mainThreadOtherWork () {
          System . out . println ( “main thread work start” ) ;
           try  {
              Thread . sleep 3000L ) ;
          }  catch  (InterruptedException e) {
              e . printStackTrace () ;
          }
          System . out . println ( “main thread work done.” ) ;
     }

      public  static  class  SubThread  extends  Thread{
          
           private  BlockingQueue < Integer >  queue ;
          
           /**
           *  @param  queue
           */
           public  SubThread ( BlockingQueue < Integer >  queue ) {
               this . queue  =  queue ;
          }

           @Override
           public  void  run () {
               try {
               working () ;
              } finally {
                    try  {
                         queue . put ( 1 ) ;
                   }  catch  (InterruptedException e) {
                        e . printStackTrace () ;
                   }
              }
              
          }

           private  void  working () {
              System . out . println ( “sub thread start working.” ) ;
               busy () ;
              System . out . println ( “sub thread stop working.” ) ;
          }

           private  void  busy () {
               try  {
                    sleep 5000L ) ;
              }  catch  (InterruptedException e) {
                   e . printStackTrace () ;
              }
          }
          
     }
     
}

这 里是得益于我们用了一个阻塞队列, 他的put操作和take操作都会阻塞(同步), 在满足条件的情况下.当我们调用take()方法是, 由于子线程还没结束, 队列是空的, 所以这里的take操作会阻塞, 直到子线程结束的时候, 往队列里面put了个元素, 表明自己结束了. 这时候主线程的take()就会返回他拿到的数据. 当然, 他拿到什么我们是不必去关心的.
以上几种情况都是针对子线程只有1个的时候. 当子线程有多个的时候, 情况就不妙了.
第一种方法, 你要调用很多个线程的join, 特别是当你的线程不是for循环创建的, 而是一个一个创建的时候.
第二种方法, 要调用很多的future的get方法, 同第一种方法.
第三种方法, 比较方便一些, 只需要每个线程都在queue里面 put一个元素就好了.但是, 第三种方法, 这个队列里的对象, 对我们是毫无用处, 我们为了使用队列, 而要不明不白浪费一些内存, 那有没有更好的办法呢?
有的, concurrency包里面提供了好多有用的东东, 其中, CountDownLanch就是我们要用的.
CountDownLanch 是一个倒数计数器, 给一个初始值(>=0), 然后没countDown一次就会减1, 这很符合等待多个子线程结束的产景: 一个线程结束的时候, countDown一次, 直到所有都countDown了 , 那么所有子线程就都结束了.
先看看CountDownLanch有哪些方法:
await: 会阻塞等待计数器减少到0位置. 带参数的await是多了等待时间.
countDown: 将当前的技术减1
getCount(): 返回当前的计数
显而易见, 我们只需要在子线程执行之前, 赋予初始化countDownLanch, 并赋予线程数量为初始值.
每个线程执行完毕的时候, 就countDown一下.主线程只需要调用await方法, 可以等待所有子线程执行结束, 看代码:

public  class  Threads {

//   static ExecutorService executorService = Executors.newFixedThreadPool(1);
      static  final  BlockingQueue < Integer >  queue  =  new  ArrayBlockingQueue < Integer > ( 1 ) ;
      public  static  void  main (String[]  args )  throws  InterruptedException ,  ExecutionException {
           int  threads  =  5 ;
          CountDownLatch countDownLatch  =  new  CountDownLatch (threads) ;
           for int  i = 0 ; i < threads ; i ++ ){
              SubThread thread  =  new  SubThread ( 2000 * (i + 1 ) ,  countDownLatch) ;
              thread . start () ;
          }
//        Future future = executorService.submit(thread);
           mainThreadOtherWork () ;
          System . out . println ( “now waiting sub thread done.” ) ;
//        future.get();
//        queue.take();
          countDownLatch . await () ;
//        try {
//            thread.join();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
          System . out . println ( “now all done.” ) ;
//        executorService.shutdown();
     }

      private  static  void  mainThreadOtherWork () {
          System . out . println ( “main thread work start” ) ;
           try  {
              Thread . sleep 3000L ) ;
          }  catch  (InterruptedException e) {
              e . printStackTrace () ;
          }
          System . out . println ( “main thread work done.” ) ;
     }

      public  static  class  SubThread  extends  Thread{
          
//        private BlockingQueue<Integer> queue;
           private  CountDownLatch  countDownLatch ;
           private  long  work ;
          
           /**
           *  @param  queue
           */
//        public SubThread(BlockingQueue<Integer> queue) {
//            this.queue = queue;
//            this.work = 5000L;
//        }
          
           public  SubThread ( long  work ,  CountDownLatch  countDownLatch ) {
//            this.queue = queue;
               this . countDownLatch  =  countDownLatch ;
               this . work  =  work ;
          }

           @Override
           public  void  run () {
               try {
               working () ;
              } finally {
//                 try {
//                      queue.put(1);
//                 } catch (InterruptedException e) {
//                      e.printStackTrace();
//                 }
                    countDownLatch . countDown () ;
              }
              
          }

           private  void  working () {
              System . out . println ( getName () + ” sub thread start working.” ) ;
               busy () ;
              System . out . println ( getName () + ” sub thread stop working.” ) ;
          }

           private  void  busy () {
               try  {
                    sleep work ) ;
              }  catch  (InterruptedException e) {
                   e . printStackTrace () ;
              }
          }
          
     }
}

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多