分享

Inside AbstractQueuedSynchronizer (2) (此版比较完整)

 hh3755 2014-08-13

原文如下链接:

Inside AbstractQueuedSynchronizer (1)

Inside AbstractQueuedSynchronizer (2)

Inside AbstractQueuedSynchronizer (3)

Inside AbstractQueuedSynchronizer (4)

 

3 AbstractQueuedSynchronizer

 

3.1 Inheritance

    AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer。AbstractOwnableSynchronizer继承自Object并且实现了Serializable接口,它只有一个成员变量private transient Thread exclusiveOwnerThread以及对应的getter/setter方法。该成员变量用于保存当前拥有排他访问权的线程。需要注意的是,该成员变量没有用volatile关键字修饰。


3.2 State

    AbstractQueuedSynchronizer用一个int(private volatile int state)来保存同步状态,以及对应的getter/setter/compareAndSetState方法。Java6新增了一个AbstractQueuedLongSynchronizer,它用一个long来保存同步状态,貌似目前没有被java.util.concurrent中的其它synchronizer所使用。对于ReentrantLock,state为0则意味着锁没有被任何线程持有;否则state保存了持有锁的线程的重入次数。

 

3.3 WaitQueue

    WaitQueue是AbstractQueuedSynchronizer的核心,它用于保存被阻塞的线程。它的实现是"CLH" (Craig, Landin, and Hagersten) lock queue的一个变种。

 

    标准的CLH lock queue通常被用来实现spin lock,它通过TheadLocal变量pred引用队列中的前一个节点(Node本身没有指向前后节点的引用),以下是标准的CLH lock queue的一个参考实现:

Java代码  收藏代码
  1. public class ClhSpinLock {  
  2.     private final ThreadLocal<Node> pred;  
  3.     private final ThreadLocal<Node> node;  
  4.     private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());  
  5.   
  6.     public ClhSpinLock() {  
  7.         this.node = new ThreadLocal<Node>() {  
  8.             protected Node initialValue() {  
  9.                 return new Node();  
  10.             }  
  11.         };  
  12.   
  13.         this.pred = new ThreadLocal<Node>() {  
  14.             protected Node initialValue() {  
  15.                 return null;  
  16.             }  
  17.         };  
  18.     }  
  19.   
  20.     public void lock() {  
  21.         final Node node = this.node.get();  
  22.         node.locked = true;  
  23.         Node pred = this.tail.getAndSet(node);  
  24.         this.pred.set(pred);  
  25.         while (pred.locked) {}  
  26.     }  
  27.   
  28.     public void unlock() {  
  29.         final Node node = this.node.get();  
  30.         node.locked = false;  
  31.         this.node.set(this.pred.get());  
  32.     }  
  33.   
  34.     private static class Node {  
  35.         private volatile boolean locked;  
  36.     }  
  37. }  

    其逻辑并不复杂:对于lock操作,只需要通过一个CAS操作即可将当前线程对应的节点加入到队列中,并且同时获得了predecessor节点的引用,然后就是等待predecessor释放锁;对于unlock操作,只需要将当前线程对应节点的locked成员变量设置为false。unlock方法中的this.node.set(this.pred.get())主要目的是重用predecessor上的Node对象,这是对GC友好的一个优化。如果不考虑这个优化,那么this.node.set(new Node())也是可以的。跟那些TAS(test and set) spin lock和TTAS(test test and set) spin lock相比,CLH spin lock主要是解决了cache-coherence traffic的问题:每个线程在busy loop的时候,并没有竞争同一个状态,而是只判断其对应predecessor的锁定状态。如果你担心false sharing问题,那么可以考虑将锁定状态padding到cache line的长度。此外,CLH spin lock通过FIFO的队列保证了锁竞争的公平性。

 

    AbstractQueuedSynchronizer的静态内部类Node维护了一个FIFO的等待队列。跟CLH不同的是,Node中包含了指向predecessor和sucessor的引用。predecessor引用的作用是为了支持锁等待超时(timeout)和锁等待回退(cancellation)的功能。sucessor的作用是为了支持线程阻塞:在Inside AbstractQueuedSynchronizer (1) 中提到过,AbstractQueuedSynchronizer通过LockSupport实现了线程的block/unblock,因此需要通过successor引用找到后续的线程并将其唤醒(CLH spin lock因为是spin,所以不需要显式地唤醒)。此外,Node中还包括一个volatile int waitStatus成员变量用于控制线程的阻塞/唤醒,以及避免不必要的调用LockSupport的park/unpark方法。需要注意的是,虽然AbstractQueuedSynchronizer在绝大多数情况下是通过LockSupport进行线程的阻塞/唤醒,但是在特定情况下也会使用spin lock,static final long spinForTimeoutThreshold = 1000L这个静态变量设定了使用spin lock的一个阈值。

 

    对WaitQueue进行enqueue操作相关的代码如下:

Java代码  收藏代码
  1. private Node addWaiter(Node mode) {  
  2.     Node node = new Node(Thread.currentThread(), mode);  
  3.     // Try the fast path of enq; backup to full enq on failure  
  4.     Node pred = tail;  
  5.     if (pred != null) {  
  6.         node.prev = pred;  
  7.         if (compareAndSetTail(pred, node)) {  
  8.             pred.next = node;  
  9.             return node;  
  10.         }  
  11.     }  
  12.     enq(node);  
  13.     return node;  
  14. }  
  15.   
  16. private Node enq(final Node node) {  
  17.     for (;;) {  
  18.         Node t = tail;  
  19.         if (t == null) { // Must initialize  
  20.             if (compareAndSetHead(new Node()))  
  21.                 tail = head;  
  22.         } else {  
  23.             node.prev = t;  
  24.             if (compareAndSetTail(t, node)) {  
  25.                 t.next = node;  
  26.                 return t;  
  27.             }  
  28.         }  
  29.     }  
  30. }  

    addWaiter方法中的那个if分支,其实是一种优化,如果失败那么会调用enq方法进行enqueue。需要注意的是,以上代码中对next引用的设定是在enqueue成功之后进行的。这样做虽然没有并发问题,但是在判断一个node是否有sucessor时,不能仅仅通过next == null来判断,因为enqueue和设置next引用这两个步骤不是一个原子操作。

 

    对WaitQueue进行dequeue操作相关的代码如下:

Java代码  收藏代码
  1. private void setHead(Node node) {  
  2.     head = node;  
  3.     node.thread = null;  
  4.     node.prev = null;  
  5. }  

    setHead方法没有用到锁,也没有使用CAS,这样没有并发问题?没有,因为这个方法只会被持有锁的线程所调用,此时只需要将head指向持有锁的线程对应的node即可。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多