前言
一、同步队列的结构与实现1、同步队列的结构(1)结构介绍AQS使用的同步队列是基于一种CLH锁算法来实现(引用网上资料对CLH简单介绍):
而在源码中也有这样的介绍: /** * Wait queue node class. * * <p>The wait queue is a variant of a "" (Craig, Landin, and * Hagersten) lock queue. CLH locks are normally used for * spinlocks. * ........... * <p>To enqueue into a CLH lock, you atomically splice it in as new * tail. To dequeue, you just set the head field. * <pre> * +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+ * </pre> * .............. 在AQS中的同步队列结构以及获取/释放锁都是基于此实现的,这里我们先放一个我画的基本结构来理解AQS同步队列,再进一步介绍一些细节。 根据以上图我们看到:
在源码中我们可以看到: // 内部类Node节点static final class Node{...}// 同步队列的head引用private transient volatile Node head;// 同步队列的tail引用private transient volatile Node tail; (2)节点构成那么Node结构的具体构成是什么呢?我们具体看内部类Node的源码: static final class Node {/** Marker to indicate a node is waiting in shared mode */static final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode */static final Node EXCLUSIVE = null;/** waitStatus value to indicate thread has cancelled */static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;/** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */static final int PROPAGATE = -3;/** 等待状态: * 0 INITAIL: 初始状态 * 1 CANCELLED: 由于等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态不会被改变 * -1 SIGNAL: 当前节点释放同步状态或被取消,则等待状态的后继节点被通知 * -2 CONDITION: 节点在等待队列中,线程在Condition上,需要其它线程调用Condition的signal()方法才能从等待队转移到同步队列 * -3 PROPAGATE: 表示下一个共享式同步状态将会无条件被传播下去 */volatile int waitStatus;/** 前驱结点 */volatile Node prev;/** 后继节点 */volatile Node next; /** 获取同步状态的线程 */volatile Thread thread;/** 等待队列中的后继节点 */Node nextWaiter; /** 判断Node是否是共享模式 */final boolean isShared() {return nextWaiter == SHARED; } /** 返回前驱结点 */final Node predecessor() throws NullPointerException { Node p = prev;if (p == null)throw new NullPointerException();elsereturn p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread; } } 从源码中可以发现:同步队列中的节点Node用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点。 节点是构成同步队列的基础,没有成功获取同步状态的线程将成为节点加入该队列的尾部。当一个线程无法获取同步状态时,会被构造成节点并加入同步队列中,通过CAS保证设置尾节点这一步是线程安全的,此时才能认为当前节点(线程)成功加入同步队列与尾节点建立联系。具体的实现逻辑请看下面介绍! 2、同步状态获取与释放(1)独占式同步状态获取与释放通过调用同步器acquire(int arg)方法可以获取同步状态,该方法中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后序线程对进行中断操作时,线程不会从同步队列中移出 public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 同步状态获取主要的流程步骤: 1)首先调用自定义同步器实现tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态。 2)如果获取失败则构造同步节点(独占式Node.EXCLUSIVE)并通过addWaiter(Node ndoe)方法将该节点加入到同步队列的尾部,同时enq(node)通过for(;;)循环保证安全设置尾节点。 private Node addWaiter(Node mode) {// 根据给定模式构造NodeNode node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail; // 尝试在尾部添加if (pred != null) { node.prev = pred;// cas方式保证正确添加尾节点if (compareAndSetTail(pred, node)) { pred.next = node;return node; } }// enq主要是通过for(;;)死循环来确保节点正确添加// 在for(;;)死循环中,通过cas将节点设置为尾节点时,才返回;否则一直尝试设置 enq(node);return node; } private Node enq(final Node node) {for (;;) { Node t = tail;if (t == null) { // Must initialize 当tail节点为null时,必须初始化构造好 head节点if (compareAndSetHead(new Node())) tail = head; } else { // 否则就通过cas开始添加尾节点node.prev = t;if (compareAndSetTail(t, node)) { t.next = node;return t; } } } } 假设原队列中存在Node-1到Node-4节点,此时某个线程获取同步状态失败则构成成Node-5通过CAS方式加入队列(下图忽略自旋环节)。 3)节点进入同步队列之后“自旋”,即acquireQueued(final Node node, int arg)方法,在这个方法中,当前node死循环尝试获取锁状态,但是只有node的前驱结点是Head才能尝试获取同步状态,获取成功之后立即设置当前节点为Head,并成功返回。否则就会一直自旋。 final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();// 当前node节点的前驱是Head时(p == head),才能有资格去尝试获取同步状态(tryAcquire(arg))// 这是因为当前节点的前驱结点获得同步状态,才能唤醒后继节点,即当前节点if (p == head && tryAcquire(arg)) { // 以上条件满足之后setHead(node); // 设置当前节点为Headp.next = null; // help GC // 释放ndoe的前驱节点failed = false;return interrupted; }// 线程被中断或者前驱结点被释放,则继续进入检查:p == head && tryAcquire(argif (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) interrupted = true; } } finally {if (failed) cancelAcquire(node); } } 此时新加入的Node-5节点也开始自旋,此时的Head(Node-1)已经获取到了同步状态,而Node-2退出了自旋,成为了新的Head。 文字总结: 1)同步器会维护一个双向FIFO队列,获取同步失败的线程将会被构造成Node加入队尾(并且做自旋检查:检查前驱结点是否是Head); 2)当前线程想要获得同步状态,前提是其前驱结点是头结点,并且获得了同步状态; 3)当Head调用release(int arg)释放锁的同时会唤醒后继节点(即当前节点),后继节点结束自旋 流程图总结: 同步器的release方法:释放锁的同时,唤醒后继节点(进而时后继节点重新获取同步状态) public final boolean release(int arg) {if (tryRelease(arg)) { Node h = head;if (h != null && h.waitStatus != 0)// 该方法会唤醒Head节点的后继节点,使其重试尝试获取同步状态 unparkSuccessor(h);return true; }return false; } UnparkSuccessor(Node node)方法使用LookSupport(LockSupport.unpark)唤醒处于等待状态的线程(之后会慢慢看源码介绍)。 (2)共享式同步状态获取与释放共享锁跟独占式锁最大的不同就是:某一时刻有多个线程同时获取到同步状态,获取判断是否获取同步状态成功的关键,获取到的同步状态要大于等于0。而其他步骤基本都是一致的,还是从源码开始分析起:带后缀Share都为共享式同步方法。 1)acquireShared(int arg)获取同步状态:如果获取失败则加入队尾,并且检查是否具备退出自旋的条件(前驱结点是头结点并且能成功获取同步状态) public final void acquireShared(int arg) {// tryAcquireShared 获取同步状态,大于0才是获取状态成功,否则就是失败if (tryAcquireShared(arg) < 0)// 获取状态失败则构造共享Node,加入队列;// 并且检查是否具备退出自旋的条件:即preNode为head,并且能获取到同步状态 doAcquireShared(arg); } 2)doAcquireShared(arg):获取失败的Node加入队列,如果当前节点的前驱结点是头结点的话,尝试获取同步状态,如果大于等于0则在for(;;)中退出(退出自旋)。 private void doAcquireShared(int arg) {// 构造共享模式的Nodefinal Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);// 前驱节点是头结点,并且能获取状态成功,则return返回,退出死循环(自旋)if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GCif (interrupted) selfInterrupt(); failed = false;return; } }if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) interrupted = true; } } finally {if (failed) cancelAcquire(node); } } 3)releaseShared(int arg):释放同步状态,通过loop+CAS方式释放多个线程的同步状态。 public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {// 通过loop+CAS方式释放多个线程的同步状态 doReleaseShared();return true; }return false; } 二、自定义同步组件(实现Lock,内部类Sync继承AQS)1、实现一个不可重入的互斥锁Mutex 2、实现指定共享数量的共享锁MyShareLock |
|