AbstractQueuedSynchronizer简介AbstractQueuedSynchronizer提供了一个FIFO队列,可以看做是一个可以用来实现锁以及其他需要同步功能的框架。这里简称该类为AQS。AQS的使用依靠继承来完成,子类通过继承自AQS并实现所需的方法来管理同步状态。例如ReentrantLock,CountDownLatch等。 本篇文章基于JDK1.8来介绍,该类有许多实现类: ![]() 其中,我们最常用的大概就是ReentrantLock和CountDownLatch了。ReentrantLock提供了对代码块的并发访问控制,也就是锁,说是锁,但其实并没有用到关键字 AQS的两种功能从使用上来说,AQS的功能可以分为两种:独占和共享。对于这两种功能,有一个很常用的类:ReentrantReadWriteLock,其就是通过两个内部类来分别实现了这两种功能,提供了读锁和写锁的功能。但子类实现时,只能实现其中的一种功能,即要么是独占功能,要么是共享功能。 对于独占功能,例如如下代码: ReentrantLock lock = new ReentrantLock();
...
public void function(){
lock.lock();
try {
// do something...
} finally {
lock.unlock();
}
}
这个很好理解,通过ReentrantLock来保证在 对于共享功能,例如如下代码:
代码中的
AQS独占锁的内部实现AQS的主要数据结构由于使用AQS可以实现锁的功能,那么下面就要分析一下究竟是如何实现的。 AQS内部维护着一个FIFO的队列,该队列就是用来实现线程的并发访问控制。队列中的元素是一个Node类型的节点,Node的主要属性如下: static final class Node {
int waitStatus;
Node prev;
Node next;
Node nextWaiter;
Thread thread;
}
其中,队列里还有一个 AQS中有一个
ReentrantLock类的结构下面通过ReentrantLock的实现进一步分析重入锁的实现。 首先看一下lock方法: public void lock() {
sync.lock();
}
该方法调用了
对于ReentrantLock,有两种获取锁的模式:公平锁和非公平锁。所以对应有两个内部类,都继承自Sync。而Sync继承自AQS: ![]() 本文主要通过公平锁来介绍,看一下FairSync的定义:
AQS获取独占锁的实现acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
该方法主要工作如下:
tryAcquire方法
既然该方法需要子类来实现,为什么不使用 该方法是在ReentrantLock中的FairSync和NonfairSync的两个内部类来实现的,这里以FairSysc-公平锁来说明: protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c acquires;
if (nextc < 0)
throw new Error('Maximum lock count exceeded');
setState(nextc);
return true;
}
return false;
}
addWaiter方法看下addWaiter方法的定义:
该方法就是根据当前线程创建一个Node,然后添加到队列尾部。 enq方法private Node enq(final Node node) {
// 重复直到成功
for (;;) {
Node t = tail;
// 如果tail为null,则必须创建一个Node节点并进行初始化
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 尝试CAS来设置队尾
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued方法该方法的功能是循环的尝试获取锁,直到成功为止,最后返回中断标志位。
这里有几个问题很重要:
下面分别来分析一下。 什么条件下需要park? 看下 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
可见,只有在前继节点的状态是SIGNAL时,需要park。第二种情况稍后会详细介绍。 为什么要判断中断状态? 首先要知道,acquireQueued方法中获取锁的方式是死循环,判断是否中断是在parkAndCheckInterrupt方法中实现的,看下该方法的代码:
非常简单,阻塞当前线程,然后返回线程的中断状态并复位中断状态。
如果acquireQueued执行完毕,返回中断状态,回到acquire方法中,根据返回的中断状态判断是否需要执行 为什么要多做这一步呢?先判断中断状态,然后复位,如果之前线程是中断状态,再进行中断? 这里就要介绍一下park方法了。park方法是Unsafe类中的方法,与之对应的是unpark方法。简单来说,当前线程如果执行了park方法,也就是阻塞了当前线程,反之,unpark就是唤醒一个线程。 具体的说明请参考http://blog.csdn.net/hengyunabc/article/details/28126139 park与wait的作用类似,但是对中断状态的处理并不相同。如果当前线程不是中断的状态,park与wait的效果是一样的;如果一个线程是中断的状态,这时执行wait方法会报 所以,知道了这一点,就可以知道为什么要进行中断状态的复位了:
所以,这里判断线程中断的状态实际上是为了不让循环一直执行,要让当前线程进入阻塞的状态。想象一下,如果不这样判断,前一个线程在获取锁之后执行了很耗时的操作,那么岂不是要一直执行该死循环?这样就造成了CPU使用率飙升,这是很严重的后果。 死循环不会引起CPU使用率飙升? 上面已经说明。 cancelAcquire方法在acquireQueued方法的finally语句块中,如果在循环的过程中出现了异常,则执行cancelAcquire方法,用于将该节点标记为取消状态。该方法代码如下: private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
// 设置该节点不再关联任何线程
node.thread = null;
// Skip cancelled predecessors
// 通过前继节点跳过取消状态的node
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// 获取过滤后的前继节点的后继节点
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 设置状态为取消状态
node.waitStatus = Node.CANCELLED;
/*
* If we are the tail, remove ourselves.
* 1.如果当前节点是tail:
* 尝试更新tail节点,设置tail为pred;
* 更新失败则返回,成功则设置tail的后继节点为null
*/
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
/*
* 2.如果当前节点不是head的后继节点:
* 判断当前节点的前继节点的状态是否是SIGNAL,如果不是则尝试设置前继节点的状态为SIGNAL;
* 上面两个条件如果有一个返回true,则再判断前继节点的thread是否不为空;
* 若满足以上条件,则尝试设置当前节点的前继节点的后继节点为当前节点的后继节点,也就是相当于将当前节点从队列中删除
*/
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 3.如果是head的后继节点或者状态判断或设置失败,则唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
该方法中执行的过程有些复杂,首先是要获取当前节点的前继节点,如果前继节点的状态不是取消状态(即 接下来的工作可以分为3种情况:
我们依次来分析一下: 当前节点是tail 这种情况很简单,因为tail是队列的最后一个节点,如果该节点需要取消,则直接把该节点的前继节点的next指向null,也就是把当前节点移除队列。出队的过程如下: ![]() 注意:经验证,这里并没有设置node的prev为null。 当前节点不是head的后继节点,也不是tail ![]() 这里将node的前继节点的next指向了node的后继节点,真正执行的代码就是如下一行:
当前节点是head的后继节点 ![]() 这里直接unpark后继节点的线程,然后将next指向了自己。 这里可能会有疑问,既然要删除节点,为什么都没有对prev进行操作,而仅仅是修改了next? 要明确的一点是,这里修改指针的操作都是CAS操作,在AQS中所有以 那么在执行cancelAcquire方法时,当前节点的前继节点有可能已经执行完并移除队列了(参见 什么时候修改prev呢?其实prev是由其他线程来修改的。回去看下shouldParkAfterFailedAcquire方法,该方法有这样一段代码:
该段代码的作用就是通过prev遍历到第一个不是取消状态的node,并修改prev。 这里为什么可以更新prev?因为shouldParkAfterFailedAcquire方法是在获取锁失败的情况下才能执行,因此进入该方法时,说明已经有线程获得锁了,并且在执行该方法时,当前节点之前的节点不会变化(因为只有当下一个节点获得锁的时候才会设置head),所以这里可以更新prev,而且不必用CAS来更新。 AQS释放独占锁的实现释放通过unlock方法来实现: public void unlock() {
sync.release(1);
}
该方法调用了release方法,release是在AQS中定义的,看下release代码:
这里首先尝试着去释放锁,成功了之后要去唤醒后继节点的线程,这样其他的线程才有机会去执行。 tryRelease代码如下: protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
是不是和tryAcquire方法类似?该方法也需要被重写,在Sync类中的代码如下:
当前线程被释放之后,需要唤醒下一个节点的线程,通过unparkSuccessor方法来实现: private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
主要功能就是要唤醒下一个线程,这里 到这里,通过ReentrantLock的lock和unlock来分析AQS独占锁的实现已经基本完成了,但ReentrantLock还有一个非公平锁NonfairSync。 其实NonfairSync和FairSync主要就是在获取锁的方式上不同,公平锁是按顺序去获取,而非公平锁是抢占式的获取,lock的时候先去尝试修改state变量,如果抢占成功,则获取到锁:
非公平锁的tryAcquire方法调用了nonfairTryAcquire方法: final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c acquires;
if (nextc < 0) // overflow
throw new Error('Maximum lock count exceeded');
setState(nextc);
return true;
}
return false;
}
该方法比公平锁的tryAcquire方法在第二个if判断中少了一个是否存在前继节点判断,FairSync中的tryAcquire代码中的这个if语句块如下:
总结本文从ReentrantLock出发,比较完整的分析了AQS内部独占锁的实现,总体来说实现的思路很清晰,就是使用了标志位 队列的方式来处理锁的状态,包括锁的获取,锁的竞争以及锁的释放。在AQS中,state可以表示锁的数量,也可以表示其他状态,state的含义由子类去定义,自己只是提供了对state的维护。AQS通过state来实现线程对资源的访问控制,而state具体的含义要在子类中定义。 AQS在队列的维护上的实现比较复杂,尤其是节点取消时队列的维护,这里并不是通过一个线程来完成的。同时,AQS中大量的使用CAS来实现更新,这种更新能够保证状态和队列的完整性。 |
|
来自: liang1234_ > 《锁机制》