分享

Java 并行框架总结(Concurrent)

 CevenCheng 2011-10-18
 

Java 并行框架总结(Concurrent)

 1154人阅读 评论(0) 收藏 举报

1.        可重入锁ReentrantLock

之所以要引入可重入锁,而不用synchronized,是有原因的,重入就是一个特点,在同线程中,锁是可以重入的,而synchronized不行,这就提高了性能,后面将说明其他原因

可重入锁指在同一个线程中,可以重入的锁。当然,当这个线程获得锁后,其他线程将等待这个锁被释放后,才可以获得这个锁。

构造器:ReentrantLock(boolean fair): 布尔值用来表示,创建的这个锁是公平的锁,还是自由竞争的锁。所谓公平的锁,是指,各个希望获得所得线程获得锁的顺序是按到达的顺序获得,还是自由竞争获得。

2.        条件变量 Condition

synchronized的时候,锁定就是锁定一个对象,得到锁的就可以进到临界区,这是一个很宽泛的语义,而,引入了条件变量,你只有在满足某种条件才可以进入,这个条件是自己定义,可以有多个条件。

 

条件变量是线程同步对象中的一种,主要用来等待某种条件的发生,条件发生后,可以唤醒等待在该条件上的一个线程,或所有线程。条件变量要与锁一起协同工作。条件变量调用Lock.newCondition()获得一个实例:

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

通常的调用方式如下:

// 当条件 con == true 时等待
lock.lock();

try {
    if (con == true) {
        condition.await();//
等待条件发生
    }

    // do something
} finally {
    lock.unlock();

}

// 条件变量的唤醒:
condition.signal(); //
唤醒等待的其中一个线程
condition.signalAll(); //
唤醒等待的所有线程

条件变量类似JDK1.4或以前版本中的 Object.wait(); Object.notify(); Object.notifyAll();

值得注意的是当condition.await()时,隐式的将条件变量关联的Lock解锁,而使其他线程有机会获得Lock,而检查条件,并在条件满足时,等待在条件变量上。

 

 

3.        读写锁

 

总的来说,是对一个资源的读写互斥,在操作系统中使用信号量实现的,ReentrantReadWriteLock 一个这个对象,可以看成是对一个资源的锁,而他的readLock方法和writeLock方法就对这个资源的读写锁。记住一点:

加了读锁,不可以加写锁,但是可以再加读锁

加了写锁,不可以加读锁,也不可以再加写锁

 

  1 package com.vinko.test.concurrent;
  2
 
  3 import java.util.Calendar;
  4
 import java.util.Map;
  5
 import java.util.TreeMap;
  6
 //import java.util.concurrent.locks.Condition;
  7
 import java.util.concurrent.locks.Lock;
  8
 import java.util.concurrent.locks.ReentrantReadWriteLock;
  9
 
 10 public class TestReadWriteLock {
 11
     
 12     private ReentrantReadWriteLock lock = null;
 13
     
 14     private Lock readLock = null;
 15
     private Lock writeLock = null;
 16
     
 17 //    private Condition condition = null;
 18
     
 19
     public int key = 100;
 20
     public int index = 100;
 21
     
 22     public Map<Integer, String> dataMap = null;
 23
     
 24     public TestReadWriteLock() {
 25
         lock = new ReentrantReadWriteLock(true);
 26
         
 27         readLock = lock.readLock();//获得读的锁
 28         writeLock = lock.writeLock();//获得对应的写的锁
 29         
 30 //        condition = writeLock.newCondition();
 31
         
 32
         dataMap = new TreeMap<Integer, String>();
 33
     }
 34     
 35     /**
 36
      * @param args
 37
      */
 38     public static void main(String[] args) {
 39
     
 40         TestReadWriteLock tester = new TestReadWriteLock();
 41
 
 42         // test lock downgrading
 43
         
 44 //        tester.readLock.lock();
 45 //        System.out.println(Thread.currentThread() + " get readLock");
 46 
 47         tester.writeLock.lock();
 48         System.out.println(Thread.currentThread() + " get writeLock.");
 49 
 50         tester.writeLock.lock();
 51         System.out.println(Thread.currentThread() + " get writeLock.");
 52 
 53         tester.readLock.lock();
 54         System.out.println(Thread.currentThread() + " get readLock");
 55 
 56         tester.readLock.lock();
 57         System.out.println(Thread.currentThread() + " get readLock");
 58         
 59 //        tester.writeLock.lock();
 60
 //        System.out.println(Thread.currentThread() + " get writeLock.");
 61 
 62         tester.readLock.unlock();
 63         tester.readLock.unlock();
 64         tester.writeLock.unlock();
 65         tester.writeLock.unlock();
 66         
 67         tester.test();
 68     }
 69     
 70     public void test() {
 71
 
 72         for (int i = 0; i < 10; i++) {
 73
             new Thread(new reader(this)).start();
 74
         }
 75         
 76         for (int i = 0; i <3; i++) {
 77
             new Thread(new writer(this)).start();
 78
         }
 79 
 80     }
 81 
 82     public void read() {
 83
 /*        
 84
         writeLock.lock();
 85         
 86         try {
 87             condition.await();
 88         } catch (InterruptedException e1) {
 89             e1.printStackTrace();
 90         }
 91         
 92         writeLock.unlock();
 93 */
 94         readLock.lock();
 95         
 96         try {
 97
             if (dataMap.isEmpty()) {
 98
                 Calendar now = Calendar.getInstance();
 99                System.out.println(now.getTime() + " R " + Thread.currentThread() + " get key, but map is empty.");
100             }
101             
102             String value = dataMap.get(index);
103 
104             Calendar now = Calendar.getInstance();
105            System.out.println(now.getTime() + " R " + Thread.currentThread() + " get key = " + index + " value = " + value + " map size = " + dataMap.size());
106             
107             // get next value
108
             if (value != null{
109
                 index ++;
110             }
111         } finally {
112
             readLock.unlock();
113         }
114 
115         try {
116
             Thread.sleep(3000);
117         } catch (InterruptedException e) {
118
             e.printStackTrace();
119         }
120     }
121     
122     public void write() {
123
         
124         writeLock.lock();
125         
126         
127         try {
128
             String value = "value" + key;
129             
130             dataMap.put(new Integer(key), value);
131
             
132             Calendar now = Calendar.getInstance();
133            System.out.println(now.getTime() + " W " + Thread.currentThread() + " put key = " + key + " value = " + value + " map size = " + dataMap.size());
134             
135             key ++;
136         
137 //            condition.signal();
138
             
139
             try {
140
                 Thread.sleep(500);
141             } catch (InterruptedException e) {
142
                 e.printStackTrace();
143             }
144 
145         } finally {
146
             writeLock.unlock();
147         }
148 
149     }
150 }
151 
152 class reader implements Runnable {
153
     
154     private TestReadWriteLock tester = null;
155
     
156     public reader(TestReadWriteLock tester) {
157
         this.tester = tester;
158
     }
159     
160     public void run() {
161
         Calendar now = Calendar.getInstance();
162         
163        System.out.println(now.getTime() + " R " + Thread.currentThread() + " started");
164         
165         while (true{
166
             tester.read();
167         }
168     }
169 }
170 
171 class writer implements Runnable {
172
     
173     private TestReadWriteLock tester = null;
174
     
175     public writer(TestReadWriteLock tester) {
176
         this.tester = tester;
177
     }

 

 

 

4.        信号量 Semaphore

信号量和C++中的信号量有些类似,在我的博客的另外一篇中,有实例代码,可以将信号量理解为资源的数目,线程可以通过Semaphore对象的acquire方法请求资源,如果有就立刻分给他,并且资源数减一,如果没有就阻塞等待;调用信号量的release就是释放资源,资源数加一,原先等待的线程就可以获得资源继续执行。

package com.vinko.test.concurrent;
  2
 
  3 import java.util.Calendar;
  4
 import java.util.concurrent.Semaphore;
  5
 import java.util.concurrent.locks.Condition;
  6
 import java.util.concurrent.locks.Lock;
  7
 import java.util.concurrent.locks.ReentrantLock;
  8
 
  9 public class TestSemaphore {
 10
 
 11     private ReentrantLock lock = null;
 12
     private Condition condition = null;
 13
     private Semaphore semaphore = null;
 14
     
 15     public TestSemaphore() {
 16
         lock = new ReentrantLock();
 17
         condition = lock.newCondition();
 18         semaphore = new Semaphore(2, true);
 19
     }
 20     
 21     /**
 22
      * @param args
 23
      */
 24     public static void main(String[] args) {
 25
 
 26         TestSemaphore tester = new TestSemaphore();
 27
         
 28         tester.test();
 29     }
 30     
 31     public Lock getLock() {
 32
         return lock;
 33
     }
 34     
 35     public Condition getCondition() {
 36
         return condition;
 37
     }
 38     
 39     public Semaphore getSemaphore() {
 40
         return semaphore;
 41
     }
 42     
 43     public void test() {
 44
         try {
 45
             /*
 46
             semaphore.acquire();
 47             System.out.println("get semaphore");
 48             
 49             semaphore.acquire();
 50             System.out.println("get semaphore");
 51 
 52             semaphore.release();
 53 
 54             semaphore.acquire();
 55             System.out.println("get semaphore");
 56             
 57             semaphore.acquire();
 58             System.out.println("get semaphore");
 59             */
 60             
 61             new Thread(new TestThread(this)).start();
 62
             new Thread(new TestThread(this)).start();
 63
             new Thread(new TestThread(this)).start();
 64
             new Thread(new TestThread(this)).start();
 65
             
 66             Thread.sleep(3000);
 67             
 68             lock.lock();
 69             condition.signal();
 70 //            condition.signal();
 71
 //            condition.signalAll();
 72             lock.unlock();
 73
             
 74         } catch (InterruptedException e) {
 75
             e.printStackTrace();
 76         }
 77     }
 78 
 79 }
 80 
 81 class TestThread implements Runnable {
 82
     
 83     private TestSemaphore tester = null;
 84
     
 85     public TestThread(TestSemaphore tester) {
 86
         this.tester = tester;
 87
     }
 88     
 89     public void run() {
 90
         
 91         Calendar now = Calendar.getInstance();
 92         
 93        System.out.println(now.getTime() + " " + Thread.currentThread() + " started.");
 94 
 95         while (true{
 96
             try {
 97
                 tester.getLock().lock();
 98                 tester.getCondition().await();
 99                 tester.getLock().unlock();
100                 
101                 Calendar now1 = Calendar.getInstance();
102                System.out.println(now1.getTime() + " " + Thread.currentThread() + " got signal.");
103                 
104                 tester.getSemaphore().acquire();
105                 
106                 Calendar now2 = Calendar.getInstance();
107                System.out.println(now2.getTime() + " " + Thread.currentThread() + " got semaphore.");
108                 
109 //                tester.getSemaphore().release();
110
                 
111
             } catch (InterruptedException e) {
112
                 e.printStackTrace();
113             }
114         }
115     }
116 }

5.        经典解说

 

 

一般的服务器都需要线程池,比如WebFTP等服务器,不过它们一般都自己实现了线程池,比如以前介绍过的TomcatResinJetty等,现在有了JDK5,我们就没有必要重复造车轮了,直接使用就可以,何况使用也很方便,性能也非常高。

package concurrent;
import 
java.util.concurrent.ExecutorService;
import 
java.util.concurrent.Executors;
public class 
TestThreadPool {
public static void 
main(String args[]) throws InterruptedException {
// only two threads
ExecutorService exec = Executors.newFixedThreadPool(2);
for
(int index = 0; index < 100; index++) {
Runnable run = new Runnable() {
public void 
run() {
long 
time = (long) (Math.random() * 1000);
System.out.println(“Sleeping ” + time + “ms”);
try 
{
Thread.sleep(time);
catch (InterruptedException e) {
}
}
};
exec.execute(run);
}
// must shutdown
exec.shutdown();
}
}

上面是一个简单的例子,使用了2个大小的线程池来处理100个线程。但有一个问题:在for循环的过程中,会等待线程池有空闲的线程,所以主线程会阻塞的。为了解决这个问题,一般启动一个线程来做for循环,就是为了避免由于线程池满了造成主线程阻塞。不过在这里我没有这样处理。[重要修正:经过测试,即使线程池大小小于实际线程数大小,线程池也不会阻塞的,这与Tomcat的线程池不同,它将Runnable实例放到一个无限BlockingQueue中,所以就不用一个线程启动for循环,Doug Lea果然厉害]

另外它使用了Executors的静态函数生成一个固定的线程池,顾名思义,线程池的线程是不会释放的,即使它是Idle。这就会产生性能问题,比如如果线程池的大小为200,当全部使用完毕后,所有的线程会继续留在池中,相应的内存和线程切换(while(true)+sleep循环)都会增加。如果要避免这个问题,就必须直接使用ThreadPoolExecutor()来构造。可以像Tomcat的线程池一样设置最大线程数最小线程数空闲线程keepAlive的时间。通过这些可以基本上替换Tomcat的线程池实现方案。

需要注意的是线程池必须使用shutdown来显式关闭,否则主线程就无法退出。shutdown也不会阻塞主线程。

多长时间运行的应用有时候需要定时运行任务完成一些诸如统计、优化等工作,比如在电信行业中处理用户话单时,需要每隔1分钟处理话单;网站每天凌晨统计用户访问量、用户数;大型超时凌晨3点统计当天销售额、以及最热卖的商品;每周日进行数据库备份;公司每个月的10号计算工资并进行转帐等,这些都是定时任务。通过 java的并发库concurrent可以轻松的完成这些任务,而且非常的简单。

package concurrent;
import static 
java.util.concurrent.TimeUnit.SECONDS;
import 
java.util.Date;
import 
java.util.concurrent.Executors;
import 
java.util.concurrent.ScheduledExecutorService;
import 
java.util.concurrent.ScheduledFuture;
public class 
TestScheduledThread {
public static void 
main(String[] args) {
final 
ScheduledExecutorService scheduler = Executors
.newScheduledThreadPool(2);
final 
Runnable beeper = new Runnable() {
int 
count = 0;
public void 
run() {
System.out.println(new Date() + ” beep ” + (++count));
}
};
// 1秒钟后运行,并每隔2秒运行一次
final 
ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(
beeper, 12, SECONDS);
// 2秒钟后运行,并每次在上次任务运行完后等待5秒后重新运行
final 
ScheduledFuture beeperHandle2 = scheduler
.scheduleWithFixedDelay(beeper, 25, SECONDS);
// 30秒后结束关闭任务,并且关闭Scheduler
scheduler.schedule(new Runnable() {
public void 
run() {
beeperHandle.cancel(true);
beeperHandle2.cancel(true);
scheduler.shutdown();
}
}, 30, SECONDS);
}
}

为了退出进程,上面的代码中加入了关闭Scheduler的操作。而对于24小时运行的应用而言,是没有必要关闭Scheduler的。

实际应用中,有时候需要多个线程同时工作以完成同一件事情,而且在完成过程中,往往会等待其他线程都完成某一阶段后再执行,等所有线程都到达某一个阶段后再统一执行。

比如有几个旅行团需要途经深圳、广州、韶关、长沙最后到达武汉。旅行团中有自驾游的,有徒步的,有乘坐旅游大巴的;这些旅行团同时出发,并且每到一个目的地,都要等待其他旅行团到达此地后再同时出发,直到都到达终点站武汉。

这时候CyclicBarrier就可以派上用场。CyclicBarrier最重要的属性就是参与者个数,另外最要方法是await()。当所有线程都调用了await()后,就表示这些线程都可以继续执行,否则就会等待。

package concurrent;
import 
java.text.SimpleDateFormat;
import 
java.util.Date;
import 
java.util.concurrent.BrokenBarrierException;
import 
java.util.concurrent.CyclicBarrier;
import 
java.util.concurrent.ExecutorService;
import 
java.util.concurrent.Executors;
public class 
TestCyclicBarrier {
// 徒步需要的时间: Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan
private static int
[] timeWalk = { 58151510 };
// 自驾游
private static int
[] timeSelf = { 1344};
// 旅游大巴
private static int
[] timeBus = { 2466};

static 
String now() {
SimpleDateFormat sdf = new SimpleDateFormat(“HH:mm:ss”);
return 
sdf.format(new Date()) + “: “;
}

static class Tour implements Runnable {
private int[] times;
private CyclicBarrier barrier;
private 
String tourName;
public Tour(CyclicBarrier barrier, String tourName, int
[] times) {
this
.times = times;
this
.tourName = tourName;
this
.barrier = barrier;
}
public void 
run() {
try 
{
Thread.sleep(times[0] * 1000);
System.out.println(now() + tourName + ” Reached Shenzhen”);
barrier.await();
Thread.sleep(times[1] * 1000);
System.out.println(now() + tourName + ” Reached Guangzhou”);
barrier.await();
Thread.sleep(times[2] * 1000);
System.out.println(now() + tourName + ” Reached Shaoguan”);
barrier.await();
Thread.sleep(times[3] * 1000);
System.out.println(now() + tourName + ” Reached Changsha”);
barrier.await();
Thread.sleep(times[4] * 1000);
System.out.println(now() + tourName + ” Reached Wuhan”);
barrier.await();
catch 
(InterruptedException e) {
catch 
(BrokenBarrierException e) {
}
}
}

public static void main(String[] args) {
// 
三个旅行团
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService exec = Executors.newFixedThreadPool(3);
exec.submit(new 
Tour(barrier, “WalkTour”, timeWalk));
exec.submit(new 
Tour(barrier, “SelfTour”, timeSelf));
exec.submit(new 
Tour(barrier, “BusTour”, timeBus));
exec.shutdown();
}
}

运行结果:
00:02:25: SelfTour Reached Shenzhen
00:02:25: BusTour Reached Shenzhen
00:02:27: WalkTour Reached Shenzhen
00:02:30: SelfTour Reached Guangzhou
00:02:31: BusTour Reached Guangzhou
00:02:35: WalkTour Reached Guangzhou
00:02:39: SelfTour Reached Shaoguan
00:02:41: BusTour Reached Shaoguan

发库中的BlockingQueue是一个比较好玩的类,顾名思义,就是阻塞队列。该类主要提供了两个方法put()take(),前者将一个对象放到队列中,如果队列已经满了,就等待直到有空闲节点;后者从head取一个对象,如果没有对象,就等待直到有可取的对象。

下面的例子比较简单,一个读线程,用于将要处理的文件对象添加到阻塞队列中,另外四个写线程用于取出文件对象,为了模拟写操作耗时长的特点,特让线程睡眠一段随机长度的时间。另外,该Demo也使用到了线程池和原子整型(AtomicInteger),AtomicInteger可以在并发情况下达到原子化更新,避免使用了synchronized,而且性能非常高。由于阻塞队列的puttake操作会阻塞,为了使线程退出,特在队列中添加了一个标识,算法中也叫哨兵,当发现这个哨兵后,写线程就退出。

当然线程池也要显式退出了。

package concurrent;
import java.io.File;
import java.io.FileFilter;
import 
java.util.concurrent.BlockingQueue;
import 
java.util.concurrent.ExecutorService;
import 
java.util.concurrent.Executors;
import 
java.util.concurrent.LinkedBlockingQueue;
import 
java.util.concurrent.atomic.AtomicInteger;

public class TestBlockingQueue {
static long randomTime() {
return (long
) (Math.random() * 1000);
}

public static void main(String[] args) {
// 
能容纳100个文件
final BlockingQueue queue = new LinkedBlockingQueue(100);
// 
线程池
final ExecutorService exec = Executors.newFixedThreadPool(5);
final File root = new 
File(“F://JavaLib”);
// 
完成标志
final File exitFile = new File(“”);
// 
读个数
final AtomicInteger rc = new AtomicInteger();
// 
写个数
final AtomicInteger wc = new AtomicInteger();
// 
读线程
Runnable read = new Runnable() {
public void 
run() {
scanFile(root);
scanFile(exitFile);
}

public void scanFile(File file) {
if (file.isDirectory()) {
File[] files = file.listFiles(new FileFilter() {
public boolean 
accept(File pathname) {
return 
pathname.isDirectory()
|| pathname.getPath().endsWith(“.java”);
}
});
for 
(File one : files)
scanFile(one);
else 
{
try 
{
int 
index = rc.incrementAndGet();
System.out.println(“Read0: ” + index + ” “
+ file.getPath());
queue.put(file);
catch 
(InterruptedException e) {
}
}
}
};
exec.submit(read);
// 
四个写线程
for (int index = 0; index < 4; index++) {
// write thread
final int NO = index;
Runnable write = new 
Runnable() {
String threadName = “Write” + NO;
public void 
run() {
while (true
) {
try 
{
Thread.sleep(randomTime());
int 
index = wc.incrementAndGet();
File file = queue.take();
// 
队列已经无对象
if (file == exitFile) {
// 
再次添加标志,以让其他线程正常退出
queue.put(exitFile);
break
;
}
System.out.println(threadName + “: ” + index + ” “
+ file.getPath());
catch 
(InterruptedException e) {
}
}
}
};
exec.submit(write);
}
exec.shutdown();
}
}

名字可以看出,CountDownLatch是一个倒数计数的锁,当倒数到0时触发事件,也就是开锁,其他人就可以进入了。在一些应用场合中,需要等待某个条件达到要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作。

CountDownLatch最重要的方法是countDown()await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。

一个CountDouwnLatch实例是不能重复使用的,也就是说它是一次性的,锁一经被打开就不能再关闭使用了,如果想重复使用,请考虑使用CyclicBarrier

下面的例子简单的说明了CountDownLatch的使用方法,模拟了100米赛跑,10名选手已经准备就绪,只等裁判一声令下。当所有人都到达终点时,比赛结束。

同样,线程池需要显式shutdown

package concurrent;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestCountDownLatch {
public static void main(String[] args) throws InterruptedException {
// 
开始的倒数锁
final CountDownLatch begin = new CountDownLatch(1);
// 
结束的倒数锁
final CountDownLatch end = new CountDownLatch(10);
// 
十名选手
final ExecutorService exec = Executors.newFixedThreadPool(10);
for(int 
index = 0; index < 10; index++) {
final int 
NO = index + 1;
Runnable run = new 
Runnable(){
public void 
run() {
try 
{
begin.await();
Thread.sleep((long
) (Math.random() * 10000));
System.out.println(“No.” + NO + ” arrived”);
catch 
(InterruptedException e) {
finally 
{
end.countDown();
}
}
};
exec.submit(run);
}
System.out.println(“Game Start”);
begin.countDown();
end.await();
System.out.println(“Game Over”);
exec.shutdown();
}
}

运行结果:
Game Start
No.4 arrived
No.1 arrived
No.7 arrived
No.9 arrived
No.3 arrived
No.2 arrived
No.8 arrived
No.10 arrived
No.6 arrived
No.5 arrived
Game Over

时候在实际应用中,某些操作很耗时,但又不是不可或缺的步骤。比如用网页浏览器浏览新闻时,最重要的是要显示文字内容,至于与新闻相匹配的图片就没有那么重要的,所以此时首先保证文字信息先显示,而图片信息会后显示,但又不能不显示,由于下载图片是一个耗时的操作,所以必须一开始就得下载。

Java并发库Future类就可以满足这个要求。Future的重要方法包括get()cancel()get()获取数据对象,如果数据没有加载,就会阻塞直到取到数据,而 cancel()是取消数据加载。另外一个get(timeout)操作,表示如果在timeout时间内没有取到就失败返回,而不再阻塞。

下面的Demo简单的说明了Future的使用方法:一个非常耗时的操作必须一开始启动,但又不能一直等待;其他重要的事情又必须做,等完成后,就可以做不重要的事情。

package concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import 
java.util.concurrent.Executors;
import 
java.util.concurrent.Future;

public class TestFutureTask {
public static void main(String[] args)throws InterruptedException,
ExecutionException {
final ExecutorService exec = Executors.newFixedThreadPool(5);
Callable call = new 
Callable() {
public String call() throws 
Exception {
Thread.sleep(1000 5);
return 
“Other less important but longtime things.”;
}
};
Future task = exec.submit(call);
// 
重要的事情
Thread.sleep(1000 3);
System.out.println(“Let’s do important things.”);
// 
其他不重要的事情
String obj = task.get();
System.out.println(obj);
// 
关闭线程池
exec.shutdown();
}
}

运行结果:
Let’s do important things.
Other less important but longtime things.

虑以下场景:浏览网页时,浏览器了5个线程下载网页中的图片文件,由于图片大小、网站访问速度等诸多因素的影响,完成图片下载的时间就会有很大的不同。如果先下载完成的图片就会被先显示到界面上,反之,后下载的图片就后显示。

Java并发库CompletionService可以满足这种场景要求。该接口有两个重要方法:submit()take()submit用于提交一个runnable或者callable,一般会提交给一个线程池处理;而take就是取出已经执行完毕runnable或者callable实例的Future对象,如果没有满足要求的,就等待了。 CompletionService还有一个对应的方法poll,该方法与take类似,只是不会等待,如果没有满足要求,就返回null对象。

package concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import 
java.util.concurrent.ExecutorCompletionService;
import 
java.util.concurrent.ExecutorService;
import 
java.util.concurrent.Executors;
import 
java.util.concurrent.Future;

public class TestCompletionService {
public static void main(String[] args) throws InterruptedException,
ExecutionException {
ExecutorService exec = Executors.newFixedThreadPool(10);
CompletionService serv = 
new ExecutorCompletionService(exec);

for (int index = 0; index < 5; index++) {
final int NO = index;
Callable downImg = new Callable() {
public String call() throws 
Exception {
Thread.sleep((long
) (Math.random() * 10000));
return 
“Downloaded Image ” + NO;
}
};
serv.submit(downImg);
}

Thread.sleep(1000 2);
System.out.println(“Show web content”);
for (int 
index = 0; index < 5; index++) {
Future task = serv.take();
String img = task.get();
System.out.println(img);
}
System.out.println(“End”);
// 
关闭线程池
exec.shutdown();
}
}

运行结果:
Show web content
Downloaded Image 1
Downloaded Image 2
Downloaded Image 4
Downloaded Image 0
Downloaded Image 3
End

操作系统的信号量是个很重要的概念,在进程控制方面都有应用。Java并发库Semaphore可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,acquire()获取一个许可,如果没有就等待,而release()释放一个许可。比如在Windows下可以设置共享文件的最大客户端访问个数。

Semaphore维护了当前访问的个数,提供同步机制,控制同时访问的个数。在数据结构中链表可以保存无限的节点,用Semaphore可以实现有限大小的链表。另外重入锁ReentrantLock也可以实现该功能,但实现上要负责些,代码也要复杂些。

下面的Demo中申明了一个只有5个许可的Semaphore,而有20个线程要访问这个资源,通过acquire()release()获取和释放访问许可。

package concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class TestSemaphore {
public static void main(String[] args) {
// 
线程池
ExecutorService exec = Executors.newCachedThreadPool();
// 
只能5个线程同时访问
final Semaphore semp = new Semaphore(5);
// 
模拟20个客户端访问
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = new Runnable() {
public void 
run() {
try 
{
// 
获取许可
semp.acquire();
System.out.println(“Accessing: ” + NO);
Thread.sleep((long
) (Math.random() * 10000));
// 
访问完后,释放
semp.release();
catch 
(InterruptedException e) {
}
}
};
exec.execute(run);
}
// 
退出线程池
exec.shutdown();
}
}

运行结果:
Accessing: 0
Accessing: 1
Accessing: 2
Accessing: 3
Accessing: 4
Accessing: 5
Accessing: 6
Accessing: 7
Accessing: 8
Accessing: 9
Accessing: 10
Accessing: 11
Accessing: 12
Accessing: 13
Accessing: 14
Accessing: 15
Accessing: 16
Accessing: 17
Accessing: 18
Accessing: 19

 

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多