分享

Synchronizer 同步工具

 碧海山城 2010-10-27

Synchronizer

是一种满足一种规则的类的统称。它有以下特性:

· 它是一个对象

· 封装状态,而这些状态决定着线程执行到某一点是通过还是被迫等待

· 提供操作状态的方法

其实BlockingQueue就是一种SynchronizerJava还提供了其他几种Synchronizer

1.1 CountDownLatch(闭锁)

CountDownLatch是一种闭锁,它通过内部一个计数器count来标示状态。一个线程调用await()方法之后,count>0时,所有调用其await方法的线程都需等待,当通过其countDown方法将count降为0时所有等待的线程将会被唤起。使用实例如下所示:

public class TestHarness {     

public long timeTasks(int nThreads, final Runnable task)             

throws InterruptedException {         

//开始的"",计数是1

final CountDownLatch startGate = new CountDownLatch(1);         

//结束的"",计数是工作的线程数

final CountDownLatch endGate = new CountDownLatch(nThreads);

       

 for (int i = 0; i < nThreads; i++) {             

Thread t = new Thread() {                 

public void run() {                     

try {                         

//使得每个线程都处于等待状态,确保他们已经处于等待状态

startGate.await();                         

try {  

   task.run();                         

}finally {

//每个线程都为结束线程减1

   endGate.countDown();      

}              

} catch (InterruptedException ignored) { }                 

}             

};             

t.start();         

}         

long start = System.nanoTime();         

//开始运行,每个线程都已经准备好了

startGate.countDown();

//直到每个线程都与你选哪个完毕,获得执行时间         

endGate.await();         

long end = System.nanoTime();         

return end-start;     

}

}  

闭锁主要是主线程等待子线程,等待的是某个事件,子线程在countDown之后就可以结束,将数量减1,所有子线程结束之后,到0,那么主线程的等待也结束了,继续往下(和关卡的对比)

当然也可以实现类似关卡的功能,初始化一个1的内部计数器,调用await方法让它停下来,

另一个可以作为闭锁的类是FutureTask,它描述了一个抽象的可携带结果的计算。它是通过Callable实现的,它等价于一个可携带结果的Runnable,并且有3个状态:等待、运行和完成(包括正常结束、取消和异常)。一旦FutureTask进入完成状态,它就永远停留在这个状态上。

1.2 Semaphore(信号量)

Semaphore类实际上就是操作系统中谈到的信号量的一种实现,其原理就不再累述,可见探索并发编程------操作系统篇

它就像一个容量池,用于限制可以访问某些资源的线程数目。一般操作系统的进程调度中使用了PV原语,需要设置一个信号量信号量表示可用资源的数量,P原语就相当于acquireV原语就相当于release

具体使用就是通过其acquire(获得一个许可)release(释放许可)方法来完成,如以下示例:

/**

 * 信号量测试

 * 这里用于控制对内容池的访问,内容池的大小作为Semaphore的构造函数传递给它

 * 每个线程获取数据之前必须获得许可。这样就限制了访问线程池的数目

 * 

 * @author Administrator

 *

 */

public class SemaphoreTes {

private static final int MAX_AVAILABLE = 5;

protected Object[] items = { "AAA""BBB""CCC""DDD""EEE" };

protected boolean[] used = new boolean[MAX_AVAILABLE];

private final Semaphore available = new Semaphore(MAX_AVAILABLEtrue);

public static void main(String args[]) {

final SemaphoreTes pool = new SemaphoreTes();

Runnable runner = new Runnable() {

@Override

public void run() {

try {

Object o;

o = pool.getItem();

System.out.println(Thread.currentThread().getName()

" acquire " + o);

Thread.sleep(1000);

pool.putItem(o);

System.out.println(Thread.currentThread().getName()

" release " + o);

catch (InterruptedException e) {

e.printStackTrace();

}

}

};

for (int i = 0; i < 10; i++)// 构造 10 个线程

{

Thread t = new Thread(runner, "t" + i);

t.start();

}

}

// 获取数据,需要得到许可

public Object getItem() throws InterruptedException {

available.acquire();

return getNextAvailableItem();

}

// 放回数据,释放许可

public void putItem(Object x) {

if (markAsUnused(x))

available.release();

}

protected synchronized Object getNextAvailableItem() {

for (int i = 0; i < MAX_AVAILABLE; ++i) {

if (!used[i]) {

used[i] = true;

return items[i];

}

}

return null;

}

protected synchronized boolean markAsUnused(Object item) {

for (int i = 0; i < MAX_AVAILABLE; ++i) {

if (item == items[i]) {

if (used[i]) {

used[i] = false;

return true;

else

return false;

}

}

return false;

}

}

输出:

t0 acquire AAA

t1 acquire BBB

t2 acquire CCC

t3 acquire DDD

t4 acquire EEE

t0 release AAA

t1 release BBB

t2 release CCC

t6 acquire AAA

t7 acquire BBB

t5 acquire CCC

t8 acquire DDD

t3 release DDD

t9 acquire EEE

t4 release EEE

t6 release AAA

t7 release BBB

t8 release DDD

t5 release CCC

t9 release EEE

信号量被初始化为容器锁期望容量的最大值,add操作在向底层容器添加条目之前,需要先获得一个许可,并且如果没有加入任何东西,则立刻释放许可。同样,一个成功过的remove操作释放ige许可,使得更多的元素能够加入其中。

将信号量初始化为1,使得它在使用时最多只有一个可用的许可,从而可用作一个相互排斥的锁。这通常也成为二进制信号量,因为它只有两种状态:一个可用的许可,0个可用的许可。按此方式使用时,二进制信号量具有某种属性(与很多Lock实现不同),即可以由线程释放“锁”,控制”锁“,而不是由所有者(因为信号量没有权的概念)。在某些专门的上下文(如死锁恢复)中会很有用

1.3 barrier(关卡)问题

在实际应用中,有时候需要多个线程同时工作以完成同一件事情,而且在完成过程中,往往会等所欲线程都到达某一个阶段以后再统一执行。

比如有几个旅行团需要途径深圳、广州、最后到达重庆。旅行团中自驾游的,有徒步的,有乘坐旅游大巴的;这些旅行团同时出发,并且每到一个目的地,都要的古代其他旅行团到达此地以后再同时出发,直到都到达终点站重庆。这种情况下,CyclicBarrier就可以用了。

它是一个同步辅助类,它允许一组线程相互等待,直到到达某个公共屏障点(common barrier point)。因为该barrier在释放等待线程以后可以重用,所以称为循环的barrierCyclicBarrier最重要的属性就是参与者个数,另一个重要方法就是await()。当所有线程都调用了await()后,就表示这些线程都可以继续执行,否则继续等待。

关卡和闭锁类似,也是阻塞一组线程,直到某件事情发生,而不同在于关卡是等到符合某种条件的所有线程都达到关卡点。具体使用上可以用CyclicBarrier来应用关卡

private int threadCount;

private CyclicBarrier barrier;

private int loopCount = 10;

public CyclicBarrierTest(int threadCount) {

this.threadCount = threadCount;

barrier = new CyclicBarrier(threadCount, new Runnable() {

public void run() {

System.out.println("---------------");

}

});

for (int i = 0; i < threadCount; ++i) {

Thread thread = new Thread("test-thread " + i) {

public void run() {

for (int j = 0; j < loopCount; ++j) {

doTest(j);

try {

/**

 * 在到达10次以后,大家都会通过

 * 在这里通过之前,会执行barrier的那个回调方法

 * 而且他还可以被循环使用

 */

barrier.await();

catch (InterruptedException e) {

return;

catch (BrokenBarrierException e) {

return;

}

System.out.println("goon"+j);

}

}

};

thread.start();

}

}

private void doTest(int i) { /* do xxx */

System.out.println("test"+i);

}

public static void main(String args[]){

new CyclicBarrierTest(9);

}

在每个线程结束之后执行,可以统计用。

和闭锁不同的是,他要求子线程之间相互等待,直到大家都达到后,才能继续



以上是Java提供的一些并发工具,既然是工具就有它所适用的场景,因此需要知道它的特性,这样才能在具体场景下选择最合适的工具。

淘宝面试题:如何充分利用多核CPU,计算很大的List中所有整数的和

java多线程学习-java.util.concurrent详解() Latch/Barrier

1.4 Exchanger(两步关卡)

提供了一个同步点,在这个同步点,一对线程可以交换数据。每个线程通过exchange()方法的入口提供数据给他的伙伴线程,并接收伙伴线程的数据,并返回。

当运行不对称的活动时很有用,比如当一个线程填充buffer,另一个线程buffer中消费数据;这些线程可以用exchanger来交换数据。当两个线程通过exchanger交互了对象,这个交换对于连个线程来说都是安全的。所以在特定的使用场景比较有用(两个伙伴线程之间的数据交互

注意:

1.初始化Exchanger对象时,可以通过泛型指定杯子能交换的信息类型。如“new Exchanger<String>;” 表示只能交换String类型的信息。
2. Exchangerexchanger方法表示当前线程准备交换信息,等待其他线程与它交换信息。当有其他线程调用该Exchanger对象的exchange方法时,立即交换信息。

// 描述一个装水的杯子

    public static class Cup{

        // 标识杯子是否有水

        private boolean full = false;

        public Cup(boolean full){

            this.full = full;

        }

        // 添水,假设需要5s

        public void addWater(){

            if (!this.full){

                try {

                    Thread.sleep(5000);

                } catch (InterruptedException e) {

                }

                this.full = true;

            }

        }

        // 喝水,假设需要10s

        public void drinkWater(){

            if (this.full){

                try {

                    Thread.sleep(10000);

                } catch (InterruptedException e) {

                }

                this.full = false;

            }

        }

    }

    

    public static void testExchanger() {

        //    初始化一个Exchanger,并规定可交换的信息类型是杯子

        final Exchanger<Cup> exchanger = new Exchanger<Cup>();

        // 初始化一个空的杯子和装满水的杯子

        final Cup initialEmptyCup = new Cup(false); 

        final Cup initialFullCup = new Cup(true);

        //服务生线程

        class Waiter implements Runnable {

            public void run() {

                Cup currentCup = initialEmptyCup;

                try {

                    int i=0;

                    while (i < 2){

                        System.out.println("服务生开始往杯子中添水:"

                                + System.currentTimeMillis());

                        // 往空的杯子里加水

                        currentCup.addWater();

                        System.out.println("服务生添水完毕:"

                                + System.currentTimeMillis());

                        // 杯子满后和顾客的空杯子交换

                        System.out.println("服务生等待与顾客交换杯子:"

                                + System.currentTimeMillis());

                        currentCup = exchanger.exchange(currentCup);

                        System.out.println("服务生与顾客交换杯子完毕:"

                                + System.currentTimeMillis());

                        i++;

                    }

                } catch (InterruptedException ex) {

                }

            }

        }

        //顾客线程

        class Customer implements Runnable {

            public void run() {

                Cup currentCup = initialFullCup;

                try {

                    int i=0;

                    while (i < 2){

                        System.out.println("顾客开始喝水:"

                                + System.currentTimeMillis());

                        //把杯子里的水喝掉

                        currentCup.drinkWater();

                        System.out.println("顾客喝水完毕:"

                                + System.currentTimeMillis());

                        //将空杯子和服务生的满杯子交换

                        System.out.println("顾客等待与服务生交换杯子:"

                                + System.currentTimeMillis());

                        currentCup = exchanger.exchange(currentCup);

                        System.out.println("顾客与服务生交换杯子完毕:"

                                + System.currentTimeMillis());

                        i++;

                    }

                } catch (InterruptedException ex) {

                }

            }

        }

        

        new Thread(new Waiter()).start();

        new Thread(new Customer()).start();

    }

    

    public static void main(String[] args) {

        ExchangerTest.testExchanger();

}

服务生开始往杯子中添水:1288093204390

顾客开始喝水:1288093204406

服务生添水完毕:1288093209390

服务生等待与顾客交换杯子:1288093209390

顾客喝水完毕:1288093214406

顾客等待与服务生交换杯子:1288093214406

服务生与顾客交换杯子完毕:1288093214406

服务生开始往杯子中添水:1288093214406

顾客与服务生交换杯子完毕:1288093214406

1.5 FutureFutureTask

① Future接口标识异步计算的结果,它提供了检查计算是否完成的方法isDone(),以等待计算的完成,并取得计算的结果。

② 计算完成以后只能使用get获得计算结果

③ 有必要,可以在完成前阻塞此方法

④ 取消,则以cancel方法来执行

 
 

FutureTask类是Future的一个实现,并且实现了Runnable,所以可以通过Executor(线程池)来执行。

如过在主线程中需要执行比较耗时的操作,但又不想阻塞主线程时,可以把这些业务交给Future对象在后台完成,当主线程将来需要时,可以通Future对象获得后台作业的计算结果或者执行状态

个人的感觉来看,FutureRunnable

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多