分享

Java并发编程高级内容介绍

 流曲频阳 2016-12-19

计数器:CountDownLatch

CountDownLatch类似于一个计数器,和Atomic类比较相近,操作是原子的,即多个线程同时只能有一个可以去操作。CountDownLatch对象设置一个初始的数字作为计数值,任何调用这个对象上的await()方法都会阻塞,直到这个计数器的计数值被其他的线程调用countDown()减为0为止。典型的应用场景就是:有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。例如在Zookeeper的使用过程中,由于客户端与服务器建立连接是异步调用的,因此主线程需要await()阻塞直至异步回调countDown()完成。


代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class CountDownLatchTest {
    public static void main(String[] args) {
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        Thread work1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread() + " doing work...start");
                try {
                    Thread.sleep(200);
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread() + " doing work...end ");
                countDownLatch.countDown();
            }
        },"work1");
        Thread work2 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " doing work...start");
                try {
                    Thread.sleep(200);
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " doing work...end ");
                countDownLatch.countDown();
            }
        },"work2");
        work1.start();
        work2.start();
        try {
            countDownLatch.await();
            System.out.println("all workers finish ");
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}



齐步走:CyclicBarrier

Barrier的意思是栅栏,就是让一组线程相互等待,直至所有线程都到齐了,那么就可以齐步走。Cyclic是循环的意思,就是说Barrier可以循环使用。CyclicBarrier主要的方法就是await(),较CountDownLatch的await()虽然都是阻塞,但是CyclicBarrier.await()有返回值int,即当前线程是第几个到达这个Barrier的线程。

构造CyclicBarrier时指定计数值,await() 方法每被调用一次,计数便会减少1,并阻塞住当前线程。当计数减至0时,阻塞解除,所有在此 CyclicBarrier 上面阻塞的线程开始运行。在这之后,如果再次调用 await() 方法,计数就又会变成 N-1,新一轮重新开始。在构造方法上还可以传递一个Runnable对象,阻塞解除时这个Runnable会得到运行。

CyclicBarrier有点“不见不散”的味道,想一想,如果某个成员因某种原因来不了Barrier这个地方,那么我们一直等待吗?实际中,如果来不了理应通知其他成员,别等了,回家吧!注意到CyclicBarrier.await()独有的BrokenBarrierException异常

wkiol1hu62jtip18aaagav0nfvo139.png


代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class CyclicBarrierTest {
    public static void main(String[] args) {
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2new Runnable() {
            @Override
            public void run() {
                System.out.println("都准备好啦!");
            }
        });
        Thread runman1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(200);
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + "i am ok");
                catch (InterruptedException e) {
                    e.printStackTrace();
                catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        },"runman1");
        Thread runman2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + "i am ok");
                catch (InterruptedException e) {
                    e.printStackTrace();
                catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        },"runman2");
        runman1.start();
        runman2.start();
    }
}


Callable And Future

在博主以前的博客Java Future模式实现中有介绍Future模式,Future模式非常适合在处理耗时很长的业务逻辑,可以有效的减少系统的响应时间,提高系统的吞吐量。JDK其实已经为我们提供了API实现,我们来看一段代码即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class FutureTest {
    public static void main(String[] args) {
        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return "ok";
            }
        });
        ExecutorService es = Executors.newFixedThreadPool(1);
        es.submit(futureTask);
        System.out.println("开启线程去异步处理,主线程继续往下执行!");
        try {
            System.out.println("取得异步处理结果:" + futureTask.get());
        catch (InterruptedException e) {
            e.printStackTrace();
        catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

注意到线程池执行任务,可以利用2个方法:

wkiom1hvo5ktzt65aaaf84kvgac039.png

wkiom1hvpbaj2sh-aaagyfavkew459.png

wkiom1hvo-jto8zfaaagw9t46oo296.png

submit和execute有什么区别呢?从入参和结果类型就知道了。



信号量:Semaphore

Semaphore实现的功能就类似厕所有5个坑,假如有10个人要上厕所,那么显然同时只能有5个人占用厕所,当5个人中的任何一个人让开后,其中等待的另外5个人中又有一个人可以占用了。另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的fair参数选项。

Semaphore可以控制某个资源可被同时访问的个数(构造方法传入),通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。


代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void main(String[] args) {
    final Semaphore semaphore = new Semaphore(5);
    for(int i = 0 ; i < 6 ; i++){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + " 运行...");
                    Thread.sleep(1000);
                    semaphore.release();
                    System.out.println(Thread.currentThread().getName() + " 结束...");
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },String.valueOf(i)).start();
    }
}


Condition

JDK由原始的synchronized发展到Lock,以类的方式提供锁机制,发展出重入锁、读写锁,以类的形式存在自然功能更加强大灵活,比如可以tryLock进行锁的嗅探。在synchronized代码块中我们可以使用wait/notify/notifyAll来进行线程的协同工作,那么JDK也发展了这一块,即Condition。Condition.await类似于wait,Condition.signal/signalAll类似于notify/nofityAll。下面我们简单实现一个Condition版的生产者/消费者。


处理核心:Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class Handler {
    //容器
    private LinkedList<String> linkedList = new LinkedList<String>();
    //限制
    private int MAX_SIZE = 3;
    //锁
    private Lock lock = new ReentrantLock();
    //condition  实际上,可以new多个condition,这里暂且只是用给一个
    private Condition condition = lock.newCondition();
    public void put(String bread){
        try{
            lock.lock();
            if(linkedList.size() == MAX_SIZE){
                System.out.println("容器已满");
                condition.await();
            }
            linkedList.add(bread);
            System.out.println("放入面包" + bread);
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    public void eat(){
        try{
            lock.lock();
            if(linkedList.size() == 0){
                System.out.println("容器为空");
                condition.await();
            }
            String bread = linkedList.removeFirst();
            System.out.println("吃掉一个面包" + bread);
            condition.signalAll();
        }catch(Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}


生产者:Produce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Produce implements Runnable{
    private Handler handler;
    public Produce(Handler handler) {
        this.handler = handler;
    }
    @Override
    public void run() {
        for(int i = 0 ; i < 10 ; i++){
            try {
                Thread.sleep(new Random().nextInt(1000));
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            handler.put(String.valueOf(i));
        }
    }
}


消费者:Consume

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Consume implements Runnable{
    private Handler handler;
    public Consume(Handler handler) {
        this.handler = handler;
    }
    @Override
    public void run() {
        for (int i = 0 ; i < 10 ; i++){
            try {
                Thread.sleep(new Random().nextInt(1000));
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            handler.eat();
        }
    }
}


Main:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Main {
    public static void main(String[] args) {
        Handler handler = new Handler();
        Produce produce = new Produce(handler);
        Consume consume = new Consume(handler);
        new Thread(consume).start();
        new Thread(produce).start();
        new Thread(produce).start();
    }
}




本文出自 “学海无涯 心境无限” 博客,请务必保留此出处http://zhangfengzhe.blog.51cto.com/8855103/1883655

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多