分享

Java 并发编程:AQS框架(白话版)

 hh3755 2014-08-13

1. 简介

转载:http:///2014/06/java-concurrent-4/

Java内置的锁,其优势是可以花最小的空间开销创建锁(因为每个JAVA对象都可以作为锁使用)和最少的时间开销获得锁(单线程可以在最短时间内获得锁)。线程同步越来越多地被用在多处理器上,特别是在高并发的情况下,然而,JVM内置锁的表现一般,而且不支持任何公平策略。从Java 5开始在java.util.concurrent包中引入了有别于synchronized的同步框架。
设计一个同步器至少应该具以下有两种操作:一个获取方法,如果当前状态不允许,将一直阻塞这个线程;一个释放方法,修改状态,让其他线程有运行的机会。
并发包中并没有为同步器提供一个统一的API,获取和释放方法在不同的类中的名称不同,比如获取方法有:Lock.lock, Semaphore.acquire, CountDownLatch.await和FutureTask.get.这些方法一般都重载有多种版本:阻塞与非阻塞版本、支持超时、支持中断。
java.util.concurrent包中有很多同步类,比如互斥锁、读写锁、信号量等,这些同步类几乎都可以用不同方式来实现,但是JSR166建立了一个同步中心类AbstractQueuedSynchronizer(简称:AQS)的框架,其中提供了大量的同步操作,而且用户还可以在此类的基础上自定义自己的同步类。其设计目标主要有两点:
1、提高可扩展性,用户可以自定义自己的同步类
2、最大限度地提高吞吐量,提供自定义公平策略

2. 设计和实现

同步器的设计比较直接,前面提到包含获取和释放两个操作:
获取操作过程如下:

while (synchronization state does not allow acquire) {
    enqueue current thread if not already queued;
    possibly block current thread;
}
dequeue current thread if it was queued;

释放操作:

update synchronization state;
if (state may permit a blocked thread to acquire)
    unblock one or more queued threads;

要满足以上两个操作,需要以下3点来支持:
1、原子操作同步状态;
2、阻塞或者唤醒一个线程;
3、内部应该维护一个队列。

3. 同步状态

AQS用的是一个32位的整型来表示同步状态的,可以通过以下几个方法来设置和修改这个状态字段:getState(),setState(),compareAndSetState().这些方法都需要java.util.concurrent.atomic包的支持,采用CAS操作。
将state设置为32位整型是一个务实的决定,虽然JSR166提供了64位版本的原子操作,但它还是使用对象内部锁来实现的,如果采用64位的state会导致同步器表现不良好。32位同步器满足大部分应用,如果确实需要64位的状态,可以使用AbstractQueuedLongSynchronizer类。
AQS是一个抽象类,如果它的实现类想要拥有对获取和释放的控制权,那它必须实现tryAcquire和tryRelease两个方法。

4. 阻塞

JSR166以前还没有好的阻塞和解除阻塞线程的API可以使用,只有Thread.suspend() 和 Thread.resume(),但这两个方法已经被废弃了,原因是有可能导致死锁。如果一个线程拥有监视器然后调用 Thread.suspend() 使自已阻塞,另一个线程试图调用Thread.resume()去唤醒它,那么这个线程去获取监视器时即出现死锁。
前面提到的LockSupport解决了这个问题,LockSupport.park()可以阻塞一个线程,LockSupport.unpack()可以解除阻塞.调用一次park(),然后调用多次unpack()只会唤醒一个线程.阻塞针对线程而不是针对同步器。

5. 队列

同步框架最重要的是要有一个同步队列,在这里被严格限制为FIFO队列,因此这个同步框架不支持基于优先级的同步策略。同步队列采用非阻塞队列毋庸置疑,当时非阻塞队列只有两个可供选择CLH队列锁和MCS队列锁.原始的CLH Lock仅仅使用自旋锁,但是相对于MCS Lock它更容易处理cancel和timeout,所以选择了CLH Lock。
CLH队列锁的优点是:进出队快,无锁,畅通无阻(即使在有竞争的情况下,总有一个线程总是能够很快插入到队尾);检查是否有线程在等待也是很容易的(只需要检查头尾指针是否相同)。最后设计出来的变种CLH Lock和原始的CLH Lock有较大的差别:
1、为了可以处理timeout和cancel操作,每个node维护一个指向前驱的指针。如果一个node的前驱被cancel,这个node可以前向移动使用前驱的状态字段。
2、第二个变动是在每个node里使用一个状态字段去控制阻塞,而不是自旋。一个排队的线程调用acquire(),只有在通过了子类实现的tryAcquire()才能返回,确保只有队头线程才允许调用tryAcquire()。
3、另外还有一些微小的改动:head结点使用的是傀儡结点。
变种的CLH队列如下图所示:
java-concurrent-4
结点中有一个状态位,这个状态位与线程状态密切相关,这个状态位(waitStatus)是一个32位的整型常量,它的取值如下:

static final int CANCELLED =  1;  //因为超时或者中断,结点会被设置为取消状态,被取消状态的结点不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态。处于这种状态的结点会被踢出队列,被GC回收
static final int SIGNAL    = -1;  //表示这个结点的继任结点被阻塞了,到时需要通知它
static final int CONDITION = -2;  //表示这个结点在条件队列中,因为等待某个条件而被阻塞
static final int PROPAGATE = -3;  //使用在共享模式头结点有可能处于这种状态,表示锁的下一次获取可以无条件传播
0//新结点会处于这种状态。

同步框架提供了一个ConditionObject,一般和Lock接口配合来支持互斥模型,它提供类似JVM同步器的操作。条件对象可以和其他同步器有效的整合,它修复了JVM内置同步器的不足:一个锁可以有多个条件。条件结点内部也有一个状态字段,条件结点是通过nextWaiter指针串起来的一个独立的队列。条件队列中的线程在获取锁之前,必须先被transfer到同步队列中去。transfer先断开条件队列的第一个结点,然后插入到同步队列中,这个新插入到同步队列中的结点和同步队列中的结点一起排队等待获取锁。

6. 用法

AbstractQueuedSynchronizer是一个采用模板方法模式实现的同步器基类,子类只需要实现获取和释放方法。子类一般不直接用于同步控制。因为获取和释放方法一般是私有的,实现细节不必暴露出来,所以常用委派的方法来使用同步器类:在一个类的内部申请一个私有的AQS的子类,委派它的所有同步方法。
AbstractQueuedSynchronizer类还提供了其他一些同步控制方法,包括超时和中断版的获取方法,还集成了独占模式的同步器,如acquireShared(),tryReleaseShared()等方法。
虽然内部队列被设计为FIFO,但并不意味着这个同步器一定是公平的。前面谈到,在tryAcquire()检查之后再排队。因此,新线程完全可以偷偷排在第一个线程前面。之所以不采用FIFO,有时候是想获得更高的吞吐量,为了减少等待时间,新到的线程与队列头部的线程一起公平竞争,如果新来的线程比队头的线程快,那么这个新来的线程就获取锁。队头线程失去竞争会再次阻塞,它的继任也将会被阻塞,但这样能避免饥饿。
如果需要绝对公平,那很简单,只需要在tryAcquire()方法,不在队头返回false即可。检查是否在队头可以使用getFirstQueuedThread()方法。有一情况是,队列是空的,同时有多个线程一拥而入,谁先抢到锁就谁运行,这其实与公平并不冲突,是对公平的补充。

7. 获取与释放锁

AbstractQueuedSynchronizer中比较重要的两个操作是获取和释放,以下是各种获取操作:

public final void acquire(int arg);
public final void acquireInterruptibly(int arg);
public final void acquireShared(int arg);
public final void acquireSharedInterruptibly(int arg);
protected boolean tryAcquire(int arg); 
protected int tryAcquireShared(int arg);
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException;
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException;

释放操作

public final boolean release(int arg);
protected boolean tryRelease(int arg);
protected boolean tryReleaseShared(int arg);

AbstractQueuedSynchronizer的内部类Node类中有两个常量SHARE和EXCLUSIVE,顾名思义这两个常量用于表示这个结点支持共享模式还是独占模式,共享模式指的是允许多个线程获取同一个锁而且可能获取成功,独占模式指的是一个锁如果被一个线程持有,其他线程必须等待。多个线程读取一个文件可以采用共享模式,而当有一个线程在写文件时不会允许另一个线程写这个文件,这就是独占模式的应用场景。

1) 独占模式

AbstractQueuedSynchronizer类方法中方法名不含shared的默认是独占模式,在独占模式下,子类需要重写tryAcquire()方法。
线程首先通过tryAcquire()方法在独占模式下获取锁,如果获取成功就直接返回,否则通过acquireQueued()获取锁,如果仍然失败则selfInterrupt当前线程

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //如果获取锁失败,那么就创建一个代表当前线程的结点加入到等待队列的尾部
            selfInterrupt();
    }

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) { //判断队列中是否有元素
            node.prev = pred; //如果有
            if (compareAndSetTail(pred, node)) { //就设置当前结点为队尾结点
                pred.next = node;
                return node;
            }
        }
        enq(node); //如果没有元素,表示队列为空,做入队操作
        return node;
    }

    private Node enq(final Node node) { //enq方法采用的是变种CLH算法
        for (;;) {
            Node t = tail;
            if (t == null) { //先看头结点是否为空,这一步只会在队列初始化时会执行
                if (compareAndSetHead(new Node())) //如果为空就创建一个傀儡结点
                    tail = head; //头尾指针都指向这个傀儡结点
            } else { //如果头结点非空
                node.prev = t;
                if (compareAndSetTail(t, node)) { //采用CAS操作将当前结点插入到头结点后面
                    t.next = node; //如果在插入的时候尾结点有变化,就将尾结点向后移动直到移动到最后一个结点为止,然后再把当前结点插入到尾结点后面,尾指针指向当前结点
                    return t; //入队成功
                }
            }
        }
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); //判断新结点的前趋结点是否为头结点
                if (p == head && tryAcquire(arg)) { //如果它的前趋是头结点,让前趋获取锁
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && //如果不是头结点,就将前趋结点的状态标志位设置为SIGNAL
                    parkAndCheckInterrupt()) //当前线程可以安全地挂起,整个过程结束
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

将新加入的结点放入队列之后,这个结点有两种状态,要么获取锁,要么就挂起,如果这个结点不是头结点,就看看这个结点是否应该挂起,如果应该挂起,就挂起当前结点,是否应该挂起是通过shouldParkAfterFailedAcquire()方法来判断的

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus; //首先检查前趋结点的waitStatus位
        if (ws == Node.SIGNAL) //如果为SIGNAL,表示前趋结点会通知它,那么它可以放心大胆地挂起了
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) { //如果前趋结点是一个被取消的结点
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0); //就向前遍历跳过被取消的结点,直到找到一个没有被取消的结点为止
            pred.next = node; //将找到的这个结点作为它的前趋结点
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //将找到的这个结点的waitStatus位设置为SIGNAL
        }
        return false; //返回false表示线程不应该被挂起
    }

独占模式下释放锁是通过方法release ()来实现的,首先调用子类的tryRelease()尝试释放锁,如果失败,直接返回;如果成功调用unparkSuccessor ()方法做后续处理。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

释放锁成功后需要唤醒继任结点,是通过方法unparkSuccessor实现的

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus; //node参数传进来的是头结点,首先检查头结点的waitStatus位
        if (ws < 0) //如果为负,表示头结点还需要通知后继结点,后面会通知后续节点,因此将该该标志位清0.
            compareAndSetWaitStatus(node, ws, 0);
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next; //然后查看头结点的下一个结点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        } //如果下一个结点不为空且它的waitStatus<=0,表示后继结点没有被取消,是一个可以唤醒的结点,于是唤醒后继结点返回;如果后继结点为空或者被取消了,则寻找下一个可唤醒的结点,然后唤醒它返回。
        if (s != null)
            LockSupport.unpark(s.thread);
}

2) 共享模式

如果子类想支持共享模式,同样必须重写tryAcquireShared()方法,线程首先通过tryAcquireShared()方法在共享模式下获取锁,如果获取成功就直接返回,否则通过doAcquireShared()获取锁

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0) //如果获取成功就直接返回
            doAcquireShared(arg);
    }

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED); //创建一个新结点(共享模式),加入到队尾
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); //判断新结点的前趋结点是否为头结点
                if (p == head) { //如果它的前趋是头结点 
                    int r = tryAcquireShared(arg); //让前趋在共享模式下获取锁
                    if (r >= 0) { //如果获取成功,把当前结点设置为头结点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) && //如果不是头结点,就将前趋结点的状态标志位设置为SIGNAL
                    parkAndCheckInterrupt()) //当前线程可以安全地挂起,整个过程结束
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

共享模式下释放锁是通过方法releaseShared()来实现的,首先调用子类的tryReleaseShared()尝试释放锁,如果失败,直接返回;如果成功调用doReleaseShared方法做后续处理。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
doReleaseShared()方法
    private void doReleaseShared() { //这个方法就一个目的,就是把当前结点设置为SIGNAL或者PROPAGATE
        for (;;) {
            Node h = head;
            if (h != null && h != tail) { //如果当前结点不是头结点也不是尾结点
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //先判断当前结点的状态位是否为SIGNAL,如果是就设置为0
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

因为共享模式下更多使用PROPAGATE来传播,SIGNAL会被经过两步改为PROPAGATE:

compareAndSetWaitStatus(h, Node.SIGNAL, 0)
compareAndSetWaitStatus(h, 0, Node.PROPAGATE)

为什么要经过两步呢?原因在unparkSuccessor()方法:如果直接从SIGNAL到PROPAGATE,那么到unparkSuccessor()方法里面又被设置为0:SIGNAL->PROPAGATE->0->PROPAGATE,对头结点相当于多做了一次compareAndSet操作。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多