Inside AbstractQueuedSynchronizer (1)
Inside AbstractQueuedSynchronizer (2)
Inside AbstractQueuedSynchronizer (3)
Inside AbstractQueuedSynchronizer (4)
3.4 Template Method
AbstractQueuedSynchronizer提供了以下几个protected方法用于子类改写
- protected boolean tryAcquire(int arg)
- protected boolean tryRelease(int arg)
- protected int tryAcquireShared(int arg)
- protected boolean tryReleaseShared(int arg)
- protected boolean isHeldExclusively()
这几个方法的默认实现是抛出UnsupportedOperationException,子类可以根据需要进行改写。
AbstractQueuedSynchronizer中最基本的acquire流程的相关代码如下:
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
-
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null;
- failed = false;
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
如果tryAcquire失败,那么当前线程可能会被enqueue到WaitQueue,然后被阻塞。 shouldParkAfterFailedAcquire方法会确保每个线程在被阻塞之前,其对应WaitQueue中的节点的waitStatus被设置为Node.SIGNAL(-1),以便在release时避免不必要的unpark操作。此外shouldParkAfterFailedAcquire还会清理WaitQueue中已经超时或者取消的Node。需要注意的是,在某个线程最终被阻塞之前,tryAcquire可能会被多次调用。
AbstractQueuedSynchronizer中最基本的release流程的相关代码如下:
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
-
- private void unparkSuccessor(Node node) {
-
-
-
-
-
- int ws = node.waitStatus;
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
-
-
-
-
-
-
-
- Node s = node.next;
- if (s == null || s.waitStatus > 0) {
- s = null;
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- LockSupport.unpark(s.thread);
- }
release方法中,总是总head节点开始向后查找sucessor。只有当该sucessor的waitStatus被设置的情况下才会调用unparkSuccessor。unparkSuccessor方法中首先清除之前设置的Node.waitStatus,然后向后查找并且唤醒第一个需要被唤醒的sucessor。需要注意的是,if (s == null || s.waitStatus > 0)这个分支中,查找是从tail节点开始,根据prev引用向前进行。在Inside AbstractQueuedSynchronizer (2)
中提到过,Node.next为null并不一定意味着没有sucessor,虽然WaitQueue是个双向链表,但是根据next引用向后查找sucessor不靠谱,而根据prev引用向前查找predecessor总是靠谱。
3.5 Fairness
到目前为止我们已经知道,WaitQueue是个FIFO的队列,唤醒也总是从head开始。但是AbstractQueuedSynchronizer却并不一定是公平的(实际上,大多数情况下都是在非公平模式下工作)。如果在看一遍acquire方法会发现,tryAcquire的调用顺序先于acquireQueued,也就是说后来的线程可能在等待中的线程之前acquire成功。这种场景被称为barging FIFO strategy,它能提供更高的吞吐量。
大多数AbstractQueuedSynchronizer的子类都同时提供了公平和非公平的实现,例如ReentrantLock提供了NonfairSync和FairSync。例如其FairSync的tryAcquire方法如下:
- protected final boolean tryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- if (!hasQueuedPredecessors() &&
- compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0)
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
tryAcquire方法返回true的条件之一是!hasQueuedPredecessors() 。hasQueuedPredecessors的代码如下:
- public final boolean hasQueuedPredecessors() {
-
-
-
- Node t = tail;
- Node h = head;
- Node s;
- return h != t &&
- ((s = h.next) == null || s.thread != Thread.currentThread());
- }
综上, FairSync优先确保等待中线程先acquire成功。但是公平性也不是绝对的:在一个多线程并发的环境下,就算锁的获取是公平的,也不保证后续的其它处理过程的先后顺序。
既然默认情况下使用的都是NonfairSync,那么FairSync适合什么样的场景呢?如果被锁所保护的代码段的执行时间比较长,而应用又不能接受线程饥饿(NonfairSync可能会导致虽然某个线程长时间排队,但是仍然无法获得锁的情况)的场景下可以考虑使用FairSync。对于ReentrantLock,在其构造函数中传入true,即可构造一把公平锁。
|