分享

JAVA5 并发库的使用

 醉三郎 2012-06-11

一、线程池

   1: public static void main(String[] args) {
   2:     // 产生线程池,有3个线程,使用固定线程池创建
   3:     //ExecutorService threadPool = Executors.newFixedThreadPool(3);
   4:     //产生线程池,动态创建线程池的大小
   5:     ExecutorService threadPool = Executors.newCachedThreadPool();
   6:     //向线程池添加10个任务
   7:     for (int i=1; i<10; i++) {
   8:         final int task = i;
   9:         threadPool.execute(new Runnable() {
  10:             @Override
  11:             public void run() {
  12:                 for (int j=0; j<10; j++) {
  13:                     try {
  14:                         Thread.sleep(20);
  15:                     } catch (InterruptedException e) {
  16:                         // TODO Auto-generated catch block
  17:                         e.printStackTrace();
  18:                     }
  19:                     System.out.println(
  20:                             Thread.currentThread().getName() +
  21:                             " is  looping of " + j +
  22:                             " for task " + task);
  23:                 }
  24:                 
  25:             }
  26:         });
  27:     }
  28:     
  29:     //线程调度的使用
  30:     //该功能为定时运行6秒以后运行,然后每隔2秒运行一次
  31:     Executors.newScheduledThreadPool(3).scheduleAtFixedRate(
  32:             new Runnable() {
  33:                 @Override
  34:                 public void run() {
  35:                     // TODO Auto-generated method stub
  36:                     System.out.println("bomb....");
  37:                 }
  38:             }, 
  39:             6,
  40:             2,
  41:             TimeUnit.SECONDS);
  42: }

二、并发锁(lock)

Lock比传统线程模型中的synchronized方式更加面向对象。

   1: Lock lock = new ReentrantLock();
   2: lock.lock();
   3: try {
   4: ........
   5: } finally {
   6:   lock.unlock();
   7: }
读写锁:读锁不互斥,读锁与写锁互斥,写锁与写锁互斥
   1: //从JAVA文档中找到的,如果数据没有读到,就解开读锁,加上写锁
   2: class CachedData {
   3:    Object data;
   4:    volatile boolean cacheValid;
   5:    ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
   6:  
   7:    void processCachedData() {
   8:      rwl.readLock().lock();
   9:      if (!cacheValid) {
  10:         // Must release read lock before acquiring write lock
  11:         rwl.readLock().unlock();
  12:         rwl.writeLock().lock();
  13:         // Recheck state because another thread might have acquired
  14:         //   write lock and changed state before we did.
  15:         if (!cacheValid) {
  16:           data = ...
  17:           cacheValid = true;
  18:         }
  19:         // Downgrade by acquiring read lock before releasing write lock
  20:         //我的理解,写上读锁,下面的写锁降级成更新锁
  21:         rwl.readLock().lock();
  22:         rwl.writeLock().unlock(); // Unlock write, still hold read
  23:      }
  24:  
  25:      use(data);
  26:      rwl.readLock().unlock();
  27:    }
  28:  }

 

三、锁的条件(Condition)

Condition的功能就是在传统线程技术中的wait和notify的功能。

   1: /**
   2:  * 每个方法各执行分10次和5次运行
   3:  *
   4:  */
   5: public class ConditionTest {
   6:  
   7:     public static void main(String[] args) {
   8:         ExecutorService service = Executors.newSingleThreadExecutor();
   9:         final Business2 business = new Business2();
  10:         service.execute(new Runnable(){
  11:  
  12:             public void run() {
  13:                 for(int i=0;i<50;i++){
  14:                     business.sub();
  15:                 }
  16:             }
  17:             
  18:         });
  19:         
  20:         for(int i=0;i<50;i++){
  21:             business.main();
  22:         }
  23:     }
  24:  
  25: }
  26:  
  27: class Business2{
  28:     Lock lock = new ReentrantLock();
  29:     //为了达到线程间通信
  30:     Condition condition = lock.newCondition();
  31:     boolean bShouldSub = true;
  32:     public void sub(){
  33:         lock.lock();
  34:         if(!bShouldSub)
  35:             try {
  36:                 condition.await();//线程等待
  37:             } catch (InterruptedException e) {
  38:                 // TODO Auto-generated catch block
  39:                 e.printStackTrace();
  40:             }
  41:         try
  42:         {
  43:             for(int i=0;i<10;i++){
  44:                 System.out.println(Thread.currentThread().getName() + " : " + i);
  45:             }
  46:             bShouldSub = false;
  47:             //通知其他的线程
  48:             condition.signal();
  49:         }finally{
  50:             lock.unlock();
  51:         }
  52:     }
  53:     
  54:     public void main(){
  55:         lock.lock();
  56:         if(bShouldSub)
  57:             try {
  58:                 condition.await();
  59:             } catch (InterruptedException e) {
  60:                 e.printStackTrace();
  61:             }        
  62:         try
  63:         {
  64:             for(int i=0;i<5;i++){
  65:                 System.out.println(Thread.currentThread().getName() + " : " + i);
  66:             }
  67:             bShouldSub = true;
  68:             condition.signal();            
  69:         }finally{
  70:             lock.unlock();
  71:         }        
  72:     }
  73: }

另外,再来一个JDK中的例子,更加经典

   1: //从JDK中找到的例子,还是JDK得例子经典
   2: //本例子是一个可阻塞的队列
   3: class BoundedBuffer {
   4:    final Lock lock = new ReentrantLock();
   5:    //此处用到2个Condition,是为了区别取和放操作
   6:    final Condition notFull  = lock.newCondition(); 
   7:    final Condition notEmpty = lock.newCondition(); 
   8:    //队列为100
   9:    final Object[] items = new Object[100];
  10:    int putptr, takeptr, count;
  11:  
  12:    public void put(Object x) throws InterruptedException {
  13:      lock.lock();
  14:      try {
  15:        while (count == items.length) 
  16:          notFull.await();//队列满,等待
  17:        items[putptr] = x; 
  18:        if (++putptr == items.length) 
  19:       putptr = 0;//指针越界
  20:        ++count;
  21:        notEmpty.signal();
  22:      } finally {
  23:        lock.unlock();
  24:      }
  25:    }
  26:  
  27:    public Object take() throws InterruptedException {
  28:      lock.lock();
  29:      try {
  30:        while (count == 0) 
  31:          notEmpty.await();//队列空
  32:        Object x = items[takeptr]; 
  33:        if (++takeptr == items.length) takeptr = 0;
  34:        --count;
  35:        notFull.signal();
  36:        return x;
  37:      } finally {
  38:        lock.unlock();
  39:      }
  40:    } 
  41:  }

 

四、信号灯(Semaphore)

维护当前访问自身的线程个数,并提供同步机制。

   1: public class SemaphoreTest {
   2:     public static void main(String[] args) {
   3:         ExecutorService service = Executors.newCachedThreadPool();
   4:         final  Semaphore sp = new Semaphore(3);//只允许3个线程的并发
   5:         for(int i=0;i<10;i++){
   6:             Runnable runnable = new Runnable(){
   7:                     public void run(){
   8:                     try {
   9:                         sp.acquire();//是否可以让当前线程执行
  10:                     } catch (InterruptedException e1) {
  11:                         e1.printStackTrace();
  12:                     }
  13:                     System.out.println("线程" + Thread.currentThread().getName() + 
  14:                             "进入,当前已有" + (3-sp.availablePermits()) + "个并发");
  15:                     try {
  16:                         Thread.sleep((long)(Math.random()*10000));
  17:                     } catch (InterruptedException e) {
  18:                         e.printStackTrace();
  19:                     }
  20:                     System.out.println("线程" + Thread.currentThread().getName() + 
  21:                             "即将离开");                    
  22:                     sp.release();//释放信号量
  23:                     //下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元
  24:                     System.out.println("线程" + Thread.currentThread().getName() + 
  25:                             "已离开,当前已有" + (3-sp.availablePermits()) + "个并发");                    
  26:                 }
  27:             };
  28:             service.execute(runnable);            
  29:         }
  30:     }
  31:  
  32: }

五、其他同步工具
CyclicBarrier:需要有多个线程同时到达才向下执行

   1:  
   2: public class CyclicBarrierTest {
   3:         //模拟旅游集合的情况
   4:     public static void main(String[] args) {
   5:         ExecutorService service = Executors.newCachedThreadPool();
   6:         final  CyclicBarrier cb = new CyclicBarrier(3);
   7:         for(int i=0;i<3;i++){
   8:             Runnable runnable = new Runnable(){
   9:                     public void run(){
  10:                     try {
  11:                         Thread.sleep((long)(Math.random()*10000));    
  12:                         System.out.println("线程" + Thread.currentThread().getName() + 
  13:                                 "即将到达集合地点1,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");                        
  14:                         cb.await();//只有3个线程都到此处,程序才会往下走
  15:                         
  16:                         Thread.sleep((long)(Math.random()*10000));    
  17:                         System.out.println("线程" + Thread.currentThread().getName() + 
  18:                                 "即将到达集合地点2,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");                        
  19:                         cb.await();    
  20:                         Thread.sleep((long)(Math.random()*10000));    
  21:                         System.out.println("线程" + Thread.currentThread().getName() + 
  22:                                 "即将到达集合地点3,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");                        
  23:                         cb.await();                        
  24:                     } catch (Exception e) {
  25:                         e.printStackTrace();
  26:                     }                
  27:                 }
  28:             };
  29:             service.execute(runnable);
  30:             
  31:         }
  32:         service.shutdown();
  33:     }
  34:     
  35: }


CountDownLatch:犹如倒计数器,等计数器减少到0,程序向下执行

   1: public class CountdownLatchTest {
   2:    //模拟赛跑的情况
   3:     public static void main(String[] args) {
   4:         ExecutorService service = Executors.newCachedThreadPool();
   5:          //计数器
   6:         final CountDownLatch cdOrder = new CountDownLatch(1);
   7:         final CountDownLatch cdAnswer = new CountDownLatch(3);        
   8:         for(int i=0;i<3;i++){
   9:             Runnable runnable = new Runnable(){
  10:                     public void run(){
  11:                     try {
  12:                         System.out.println("线程" + Thread.currentThread().getName() + 
  13:                                 "正准备接受命令");                        
  14:                         cdOrder.await();
  15:                         System.out.println("线程" + Thread.currentThread().getName() + 
  16:                         "已接受命令");                                
  17:                         Thread.sleep((long)(Math.random()*10000));    
  18:                         System.out.println("线程" + Thread.currentThread().getName() + 
  19:                                 "回应命令处理结果");                        
  20:                         cdAnswer.countDown();                        
  21:                     } catch (Exception e) {
  22:                         e.printStackTrace();
  23:                     }                
  24:                 }
  25:             };
  26:             service.execute(runnable);
  27:         }    
  28:         //主线程    
  29:         try {
  30:             Thread.sleep((long)(Math.random()*10000));
  31:         
  32:             System.out.println("线程" + Thread.currentThread().getName() + 
  33:                     "即将发布命令");
  34:             //计数器减1                        
  35:             cdOrder.countDown();
  36:             System.out.println("线程" + Thread.currentThread().getName() + 
  37:             "已发送命令,正在等待结果");    
  38:             cdAnswer.await();//等待计数器为0,然后主线程往下走
  39:             System.out.println("线程" + Thread.currentThread().getName() + 
  40:             "已收到所有响应结果");    
  41:         } catch (Exception e) {
  42:             e.printStackTrace();
  43:         }                
  44:         service.shutdown();
  45:  
  46:     }
  47: }

 

Exchanger:实现线程间的数据交换

   1: public class ExchangerTest {
   2:  
   3:     public static void main(String[] args) {
   4:         ExecutorService service = Executors.newCachedThreadPool();
   5:         final Exchanger exchanger = new Exchanger();
   6:         service.execute(new Runnable(){
   7:             public void run() {
   8:                 try {                
   9:                     Thread.sleep((long)(Math.random()*10000));
  10:                     String data1 = "zxx";
  11:                     System.out.println("线程" + Thread.currentThread().getName() + 
  12:                     "正在把数据" + data1 +"换出去");
  13:                     String data2 = (String)exchanger.exchange(data1);//交换数据
  14:                     System.out.println("线程" + Thread.currentThread().getName() + 
  15:                     "换回的数据为" + data2);
  16:                 }catch(Exception e){
  17:                     
  18:                 }
  19:             }    
  20:         });
  21:         service.execute(new Runnable(){
  22:             public void run() {
  23:                 try {                
  24:                     Thread.sleep((long)(Math.random()*10000));
  25:                     String data1 = "lhm";
  26:                     System.out.println("线程" + Thread.currentThread().getName() + 
  27:                     "正在把数据" + data1 +"换出去");
  28:                     String data2 = (String)exchanger.exchange(data1);
  29:                     System.out.println("线程" + Thread.currentThread().getName() + 
  30:                     "换回的数据为" + data2);
  31:                 }catch(Exception e){
  32:                     
  33:                 }                
  34:             }    
  35:         });        
  36:     }
  37:  
  38: }

 

六、可阻塞的队列

 

   1: public class BlockingQueueCondition {
   2:     //现在2个线程的交替操作
   3:     public static void main(String[] args) {
   4:         ExecutorService service = Executors.newSingleThreadExecutor();
   5:         final Business3 business = new Business3();
   6:         service.execute(new Runnable(){
   7:  
   8:             public void run() {
   9:                 for(int i=0;i<50;i++){
  10:                     business.sub();
  11:                 }
  12:             }
  13:             
  14:         });
  15:         
  16:         for(int i=0;i<50;i++){
  17:             business.main();
  18:         }
  19:     }
  20:  
  21: }
  22:  
  23: class Business3{
  24:     BlockingQueue subQueue = new ArrayBlockingQueue(1);
  25:     BlockingQueue mainQueue = new ArrayBlockingQueue(1);
  26:     //匿名构造方法,相当于一个构造方法
  27:         {
  28:         try {
  29:             mainQueue.put(1);//让主队列满,不能put操作
  30:         } catch (InterruptedException e) {
  31:             e.printStackTrace();
  32:         }
  33:     }
  34:     public void sub(){
  35:         try
  36:         {
  37:             mainQueue.take();
  38:             for(int i=0;i<10;i++){
  39:                 System.out.println(Thread.currentThread().getName() + " : " + i);
  40:             }
  41:             subQueue.put(1);
  42:         }catch(Exception e){
  43:  
  44:         }
  45:     }
  46:     
  47:     public void main(){
  48:         
  49:         try
  50:         {
  51:             subQueue.take();
  52:             for(int i=0;i<5;i++){
  53:                 System.out.println(Thread.currentThread().getName() + " : " + i);
  54:             }
  55:             mainQueue.put(1);
  56:         }catch(Exception e){
  57:         }        
  58:     }
  59: }

 

七、同步集合类

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多