分享

高并发Java(5):JDK并发包1(上)

 lguo001 2016-09-16


来源:Hosee

链接:my.oschina.net/hosee/blog/607677


高并发Java(2):多线程基础中,我们已经初步提到了基本的线程同步操作。这次要提到的是在并发包中的同步控制工具。


1. 各种同步控制工具的使用


1.1 ReentrantLock


ReentrantLock感觉上是synchronized的增强版,synchronized的特点是使用简单,一切交给JVM去处理,但是功能上是比较薄弱的。在JDK1.5之前,ReentrantLock的性能要好于synchronized,由于对JVM进行了优化,现在的JDK版本中,两者性能是不相上下的。如果是简单的实现,不要刻意去使用ReentrantLock。


相比于synchronized,ReentrantLock在功能上更加丰富,它具有可重入、可中断、可限时、公平锁等特点。


首先我们通过一个例子来说明ReentrantLock最初步的用法:


package test;

 

import java.util.concurrent.locks.ReentrantLock;

 

public class Test implements Runnable

{

    public static ReentrantLock lock = new ReentrantLock();

    public static int i = 0;

 

    @Override

    public void run()

    {

        for (int j = 0; j < 10000000;="">

        {

            lock.lock();

            try

            {

                i++;

            }

            finally

            {

                lock.unlock();

            }

        }

    }

 

    public static void main(String[] args) throws InterruptedException

    {

        Test test = new Test();

        Thread t1 = new Thread(test);

        Thread t2 = new Thread(test);

        t1.start();

        t2.start();

        t1.join();

        t2.join();

        System.out.println(i);

    }

 

}


有两个线程都对i进行++操作,为了保证线程安全,使用了 ReentrantLock,从用法上可以看出,与 synchronized相比,ReentrantLock就稍微复杂一点。因为必须在finally中进行解锁操作,如果不在 finally解锁,有可能代码出现异常锁没被释放,而synchronized是由JVM来释放锁。


那么ReentrantLock到底有哪些优秀的特点呢?


1.1.1 可重入


单线程可以重复进入,但要重复退出


lock.lock();

lock.lock();

try

{

    i++;

 

}           

finally

{

    lock.unlock();

    lock.unlock();

}


由于ReentrantLock是重入锁,所以可以反复得到相同的一把锁,它有一个与锁相关的获取计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,然后锁需要被释放两次才能获得真正释放(重入锁)。这模仿了 synchronized 的语义;如果线程进入由线程已经拥有的监控器保护的 synchronized 块,就允许线程继续进行,当线程退出第二个(或者后续) synchronized 块的时候,不释放锁,只有线程退出它进入的监控器保护的第一个synchronized 块时,才释放锁。


public class Child extends Father implements Runnable{

    final static Child child = new Child();//为了保证锁唯一

    public static void main(String[] args) {

        for (int i = 0; i < 50;="" i++)="">

            new Thread(child).start();

        }

    }

 

    public synchronized void doSomething() {

        System.out.println('1child.doSomething()');

        doAnotherThing(); // 调用自己类中其他的synchronized方法

    }

 

    private synchronized void doAnotherThing() {

        super.doSomething(); // 调用父类的synchronized方法

        System.out.println('3child.doAnotherThing()');

    }

 

    @Override

    public void run() {

        child.doSomething();

    }

}

class Father {

    public synchronized void doSomething() {

        System.out.println('2father.doSomething()');

    }

}


我们可以看到一个线程进入不同的 synchronized方法,是不会释放之前得到的锁的。所以输出还是顺序输出。所以synchronized也是重入锁


输出:


1child.doSomething()

2father.doSomething()

3child.doAnotherThing()

1child.doSomething()

2father.doSomething()

3child.doAnotherThing()

1child.doSomething()

2father.doSomething()

3child.doAnotherThing()

...


1.1.2.可中断


与synchronized不同的是,ReentrantLock对中断是有响应的。中断相关知识查看高并发Java(2):多线程基础


普通的lock.lock()是不能响应中断的,lock.lockInterruptibly()能够响应中断。


我们模拟出一个死锁现场,然后用中断来处理死锁


package test;

 

import java.lang.management.ManagementFactory;

import java.lang.management.ThreadInfo;

import java.lang.management.ThreadMXBean;

import java.util.concurrent.locks.ReentrantLock;

 

public class Test implements Runnable

{

    public static ReentrantLock lock1 = new ReentrantLock();

    public static ReentrantLock lock2 = new ReentrantLock();

 

    int lock;

 

    public Test(int lock)

    {

        this.lock = lock;

    }

 

    @Override

    public void run()

    {

        try

        {

            if (lock == 1)

            {

                lock1.lockInterruptibly();

                try

                {

                    Thread.sleep(500);

                }

                catch (Exception e)

                {

                    // TODO: handle exception

                }

                lock2.lockInterruptibly();

            }

            else

            {

                lock2.lockInterruptibly();

                try

                {

                    Thread.sleep(500);

                }

                catch (Exception e)

                {

                    // TODO: handle exception

                }

                lock1.lockInterruptibly();

            }

        }

        catch (Exception e)

        {

            // TODO: handle exception

        }

        finally

        {

            if (lock1.isHeldByCurrentThread())

            {

                lock1.unlock();

            }

            if (lock2.isHeldByCurrentThread())

            {

                lock2.unlock();

            }

            System.out.println(Thread.currentThread().getId() + ':线程退出');

        }

    }

 

    public static void main(String[] args) throws InterruptedException

    {

        Test t1 = new Test(1);

        Test t2 = new Test(2);

        Thread thread1 = new Thread(t1);

        Thread thread2 = new Thread(t2);

        thread1.start();

        thread2.start();

        Thread.sleep(1000);

        //DeadlockChecker.check();

    }

 

    static class DeadlockChecker

    {

        private final static ThreadMXBean mbean = ManagementFactory

                .getThreadMXBean();

        final static Runnable deadlockChecker = new Runnable()

        {

            @Override

            public void run()

            {

                // TODO Auto-generated method stub

                while (true)

                {

                    long[] deadlockedThreadIds = mbean.findDeadlockedThreads();

                    if (deadlockedThreadIds != null)

                    {

                        ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadIds);

                        for (Thread t : Thread.getAllStackTraces().keySet())

                        {

                            for (int i = 0; i < threadinfos.length;="">

                            {

                                if(t.getId() == threadInfos[i].getThreadId())

                                {

                                    t.interrupt();

                                }

                            }

                        }

                    }

                    try

                    {

                        Thread.sleep(5000);

                    }

                    catch (Exception e)

                    {

                        // TODO: handle exception

                    }

                }

 

            }

        };

 

        public static void check()

        {

            Thread t = new Thread(deadlockChecker);

            t.setDaemon(true);

            t.start();

        }

    }

 

}


上述代码有可能会发生死锁,线程1得到lock1,线程2得到lock2,然后彼此又想获得对方的锁。


我们用jstack查看运行上述代码后的情况



的确发现了一个死锁。


DeadlockChecker.check();方法用来检测死锁,然后把死锁的线程中断。中断后,线程正常退出。


1.1.3.可限时


超时不能获得锁,就返回false,不会永久等待构成死锁


使用lock.tryLock(long timeout, TimeUnit unit)来实现可限时锁,参数为时间和单位。


举个例子来说明下可限时:


package test;

 

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.ReentrantLock;

 

public class Test implements Runnable

{

    public static ReentrantLock lock = new ReentrantLock();

 

    @Override

    public void run()

    {

        try

        {

            if (lock.tryLock(5, TimeUnit.SECONDS))

            {

                Thread.sleep(6000);

            }

            else

            {

                System.out.println('get lock failed');

            }

        }

        catch (Exception e)

        {

        }

        finally

        {

            if (lock.isHeldByCurrentThread())

            {

                lock.unlock();

            }

        }

    }

 

    public static void main(String[] args)

    {

        Test t = new Test();

        Thread t1 = new Thread(t);

        Thread t2 = new Thread(t);

        t1.start();

        t2.start();

    }

}


使用两个线程来争夺一把锁,当某个线程获得锁后,sleep6秒,每个线程都只尝试5秒去获得锁。


所以必定有一个线程无法获得锁。无法获得后就直接退出了。


输出:


get lock failed


1.1.4.公平锁


使用方式:


public ReentrantLock(boolean fair) 

 

public static ReentrantLock fairLock = new ReentrantLock(true);


一般意义上的锁是不公平的,不一定先来的线程能先得到锁,后来的线程就后得到锁。不公平的锁可能会产生饥饿现象。


公平锁的意思就是,这个锁能保证线程是先来的先得到锁。虽然公平锁不会产生饥饿现象,但是公平锁的性能会比非公平锁差很多。


1.2 Condition


Condition与ReentrantLock的关系就类似于synchronized与Object.wait()/signal()


await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()时或者signalAll()方法时,线 程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似。


awaitUninterruptibly()方法与await()方法基本相同,但是它并不会再等待过程中响应中断。 singal()方法用于唤醒一个在等待中的线程。相对的singalAll()方法会唤醒所有在等待中的线程。这和Obejct.notify()方法很类似。


这里就不再详细介绍了。举个例子来说明:


package test;

 

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.ReentrantLock;

 

public class Test implements Runnable

{

    public static ReentrantLock lock = new ReentrantLock();

    public static Condition condition = lock.newCondition();

 

    @Override

    public void run()

    {

        try

        {

            lock.lock();

            condition.await();

            System.out.println('Thread is going on');

        }

        catch (Exception e)

        {

            e.printStackTrace();

        }

        finally

        {

            lock.unlock();

        }

    }

 

    public static void main(String[] args) throws InterruptedException

    {

        Test t = new Test();

        Thread thread = new Thread(t);

        thread.start();

        Thread.sleep(2000);

 

        lock.lock();

        condition.signal();

        lock.unlock();

    }

 

}


上述例子很简单,让一个线程await住,让主线程去唤醒它。condition.await()/signal只能在得到锁以后使用。


1.3.Semaphore


对于锁来说,它是互斥的排他的。意思就是,只要我获得了锁,没人能再获得了。


而对于Semaphore来说,它允许多个线程同时进入临界区。可以认为它是一个共享锁,但是共享的额度是有限制的,额度用完了,其他没有拿到额度的线程还是要阻塞在临界区外。当额度为1时,就相等于lock


下面举个例子:


package test;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Semaphore;

 

public class Test implements Runnable

{

    final Semaphore semaphore = new Semaphore(5);

    @Override

    public void run()

    {

        try

        {

            semaphore.acquire();

            Thread.sleep(2000);

            System.out.println(Thread.currentThread().getId() + ' done');

        }

        catch (Exception e)

        {

            e.printStackTrace();

        }finally {

            semaphore.release();

        }

    }

 

    public static void main(String[] args) throws InterruptedException

    {

        ExecutorService executorService = Executors.newFixedThreadPool(20);

        final Test t = new Test();

        for (int i = 0; i < 20;="">

        {

            executorService.submit(t);

        }

    }

}


有一个20个线程的线程池,每个线程都去 Semaphore的许可,Semaphore的许可只有5个,运行后可以看到,5个一批,一批一批地输出。


当然一个线程也可以一次申请多个许可


public void acquire(int permits) throws InterruptedException


    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多