分享

AQS --- 融会贯通

 贪挽懒月 2022-06-20 发布于广东

一、ReentrantLock 加锁过程简介

加锁可以分为三个阶段:

  • 尝试加锁;
  • 加锁失败,线程入AQS队列;
  • 线程入队列后进入阻塞状态。

二、ReentrantLock 加锁源码分析

现有如下场景:

三个人去银行的一个窗口办理业务,一个窗口同一时刻只能接待一位顾客。抽象成代码就是:

public static void main(String[] args){
 ReentrantLock lock = new ReentrantLock();
 new Thread(() -> {
  lock.lock();
  try {
   System.out.println("顾客A办理业务");
   TimeUnit.MINUTES.sleep(5);
  } catch (Exception e){
   e.printStackTrace();
  } finally {
   lock.unlock();
  }
 }, "A").start();

 new Thread(() -> {
  lock.lock();
  try {
   System.out.println("顾客B办理业务");
  } catch (Exception e){
   e.printStackTrace();
  } finally {
   lock.unlock();
  }
 }, "B").start();

 new Thread(() -> {
  lock.lock();
  try {
   System.out.println("顾客C办理业务");
  } catch (Exception e){
   e.printStackTrace();
  } finally {
   lock.unlock();
  }
 }, "C").start();
}

1. lock 方法:

调用lock.lock()的时候,实际上调用的是 NonfairSync 的 lock 方法,如下:

final void lock() {
 if (compareAndSetState(0, 1))
  setExclusiveOwnerThread(Thread.currentThread());
 else
  acquire(1);
}

compareAndSetState(0, 1) 这里比较并交换,期望 AQS 中的 state 的值是0,如果是,就将其修改成1,返回 true ,否则返回 false 。

  • 返回 true 的时候,执行 setExclusiveOwnerThread(Thread.currentThread()) 把占用资源的线程设置成当前线程。当线程A进来的时候,因为 AQS 中的 state 还没别的线程去修改,所以是0,就会成功修改成 1,就直接加锁成功了。

  • 返回 false 的时候,就会进入 esle 块中,执行 acquire(1) 方法。假如线程A加锁成功还没释放锁的时候,线程B进来了,那么就会返回 false 。 acquire(1) 方法又主要包括三个方法,源码如下:

public final void acquire(int arg) {
 if (!tryAcquire(arg) &&
  acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  selfInterrupt();
}

接下来依次来看这三个方法。

2. tryAcquire(arg) 方法:

protected final boolean tryAcquire(int acquires) {
 return nonfairTryAcquire(acquires);
}

再点进去看 nonfairTryAcquire(acquires) :

final boolean nonfairTryAcquire(int acquires) {
 final Thread current = Thread.currentThread();
 int c = getState();
 if (c == 0) {
  if (compareAndSetState(0, acquires)) {
   setExclusiveOwnerThread(current);
   return true;
  }
 }
 else if (current == getExclusiveOwnerThread()) {
  int nextc = c + acquires;
  if (nextc < 0) // overflow
   throw new Error("Maximum lock count exceeded");
  setState(nextc);
  return true;
 }
 return false;
}

线程B进到这段代码就有三种情况:

  • currentThread 就是线程B,c 就是 AQS 中的 state,即1,c 不等于0,所以跑到 else if 中,判断当前线程和持有锁的线程是否相同。current 是 B,而当前持有锁的是 A,也不相等,所以直接 return false。

  • 如果线程B进来的时候 A 刚好走了,即 c 等于0,那么进到 if 里面。if 里面做的事就是再将 state 改成1,同时设置当前占有锁的线程为 B,然后返回 true;

  • 如果当前线程等于当前占有锁的线程,即进来的还是线程A,那么就修改 state 的值(当前值加1),然后返回 true。

3. addWaiter(Node.EXCLUSIVE) 方法:

上面说了 tryAcquire(arg) 方法,当该方法返回 false 的时候,就会执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法,那么先看它里面的 addWaiter(Node.EXCLUSIVE) 方法,源码如下:

private Node addWaiter(Node mode) {
 Node node = new Node(Thread.currentThread(), mode);
 // Try the fast path of enq; backup to full enq on failure
 Node pred = tail;
 if (pred != null) {
  node.prev = pred;
  if (compareAndSetTail(pred, node)) {
   pred.next = node;
   return node;
  }
 }
 enq(node);
 return node;
}

无特殊说明的时候,以下情况基于:当前持有锁的是线程A,并且还没释放,进来的是线程B。

这个方法首先将线程B封装成 Node,传进来的 mode 是 AQS 中的一个属性,还没哪里赋过值,所以是 null,当前的 tail 其实也是 null,因为 AQS 队列中现在还没别的等待线程。是 null 的话,就执行入队方法 enq(node),该方法如下:

private Node enq(final Node node) {
 for (;;) {
  Node t = tail;
  if (t == null) { // Must initialize
   if (compareAndSetHead(new Node()))
    tail = head;
  } else {
   node.prev = t;
   if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;
   }
  }
 }
}

for (; ;) 其实就是相当于 while(true),进行自旋。当前的 tail 是 null ,所以进入 if 中,这里有个 compareAndSetHead(new Node()) 方法,这里是 new 了一个节点,姑且叫它傀儡节点,将它设置为头结点,如果 new 成功了,尾结点也指向它。效果如下图:

傀儡节点

第二次循环的时候,t 就是不再是 null 了,而是指向刚才那个傀儡节点了,所以进入 else 中。else 中做的事就是,将传进来的节点,即封装了线程B的节点 node,将其 prev 设置成刚才new 的那个傀儡节点,再将 tail 指向 封装了线程B的 node;再将傀儡节点的 next 指针指向封装了线程B的 node,如下图:

node入队

当线程C进到 addWaiter(Node mode) 方法,此时 tail 不是 null 了,已经指向了 线程B的节点,所以进入到 if 块里面,执行:

if (pred != null) {
 node.prev = pred;
 if (compareAndSetTail(pred, node)) {
  pred.next = node;
  return node;
 }
}

这里就是将线程C的 prev 设置为当前的 tail,即线程B的 node,然后将线程C设置为 tail,再将线程B的 next 设置为线程C。最后效果图如下:

线程C入队

4. acquireQueued(final Node node, int arg) 方法:

再来看看这个方法:

final boolean acquireQueued(final Node node, int arg) {
 boolean failed = true;
 try {
  boolean interrupted = false;
  for (;;) {
   final Node p = node.predecessor();
   if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
   }
   if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
  }
 } finally {
  if (failed)
   cancelAcquire(node);
 }
}

这个固定的,假如传进来的 node 是线程B,首先会进入自旋,看看 predecessor 这方法:

final Node predecessor() throws NullPointerException {
 Node p = prev;
 if (p == null)
  throw new NullPointerException();
 else
  return p;
}

首先让 p 等于 prev,此时线程B 节点的 prev是谁呢,直接看我上面画的图可以知道,线程B的 prev 就是傀儡节点,所以这里 return 的就是傀儡节点。

回到外层,傀儡节点等于 head,所以又会执行 tryAcquire(arg),即又重复上述步骤再次尝试获取锁。因为此时线程A持有锁,所以线程B尝试获取锁会失败,即 tryAcquire(arg) 会返回 false。

返回 false, 那就执行下一个 if。首先看 shouldParkAfterFailedAcquire(p, node) 方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 int ws = pred.waitStatus;
 if (ws == Node.SIGNAL)
  /*
   * This node has already set status asking a release
   * to signal it, so it can safely park.
   */
  return true;
 if (ws > 0) {
  /*
   * Predecessor was cancelled. Skip over predecessors and
   * indicate retry.
   */
  do {
   node.prev = pred = pred.prev;
  } while (pred.waitStatus > 0);
  pred.next = node;
 } else {
  /*
   * waitStatus must be 0 or PROPAGATE.  Indicate that we
   * need a signal, but don't park yet.  Caller will need to
   * retry to make sure it cannot acquire before parking.
   */
  compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
 }
 return false;
}

prev 是傀儡节点,它的 waitStatus 是0,因为傀儡节点 new 出来以后还没改过它的 waitStatus 的值,默认是0。Node.SIGNAL 的值是 -1,不相等,0 也不大于 0,所以进入 else,执行 compareAndSetWaitStatus(pred, ws, Node.SIGNAL),这是比较并交换,将傀儡节点的 waitStatus 的值由 0 改为 -1,最后返回了 false。

shouldParkAfterFailedAcquire(p, node) 方法返回了 false,因为自旋,所以又回到 final Node p = node.predecessor(); 这一行。此时 p 节点还是傀儡节点,再去尝试获取锁,如果线程A还是释放,又获取失败了,就会再次执行 shouldParkAfterFailedAcquire(p, node) 方法。

再次执行 shouldParkAfterFailedAcquire(p, node) 的时候,傀儡节点的 waitStatus 就已经是 -1 了,所以直接 return true。

这里 return 了 true,就会执行 parkAndCheckInterrupt() 方法,看看这个方法源码:

private final boolean parkAndCheckInterrupt() {
 LockSupport.park(this);
 return Thread.interrupted();
}

这里的 this 就是线程B,这里调用了 park 方法,就让线程B 在等着了。线程C进来也一样,执行到这一步,就会调用 park 方法,一直在等着。当线程A释放锁了,就会调用 unpark 方法,线程B和线程C就有一个可以抢到锁了。

5. unlock 方法:

当线程A调用了 unlock 方法,实际上调用的是:

 public void unlock() {
 sync.release(1);
}

点进去之后是这样的:

public final boolean release(int arg) {
 if (tryRelease(arg)) {
  Node h = head;
  if (h != null && h.waitStatus != 0)
   unparkSuccessor(h);
  return true;
 }
 return false;
}

再点进去看 tryRelease(arg 方法:

 protected final boolean tryRelease(int releases) {
 int c = getState() - releases;
 if (Thread.currentThread() != getExclusiveOwnerThread())
  throw new IllegalMonitorStateException();
 boolean free = false;
 if (c == 0) {
  free = true;
  setExclusiveOwnerThread(null);
 }
 setState(c);
 return free;
}

线程A getState 是 1,传进来的 releases 也是 1,所以相减结果就是 0。因为结果是 0,所以会将 free 改成 true,然后调用 setExclusiveOwnerThread(null); 将当前持有锁的线程设置为 null。然后设置 AQS 的 state 等于 c,即等于 0,最后该方法 return true。

回到上一层,此时的 head 是傀儡节点,不为空,并且傀儡节点的 waitStatus 刚才改成了 -1,不等于 0,所以会调用 unparkSuccessor(h); 方法:

private void unparkSuccessor(Node node) {
 /*
  * If status is negative (i.e., possibly needing signal) try
  * to clear in anticipation of signalling.  It is OK if this
  * fails or if status is changed by waiting thread.
  */
 int ws = node.waitStatus;
 if (ws < 0)
  compareAndSetWaitStatus(node, ws, 0);

 /*
  * Thread to unpark is held in successor, which is normally
  * just the next node.  But if cancelled or apparently null,
  * traverse backwards from tail to find the actual
  * non-cancelled successor.
  */
 Node s = node.next;
 if (s == null || s.waitStatus > 0) {
  s = null;
  for (Node t = tail; t != null && t != node; t = t.prev)
   if (t.waitStatus <= 0)
    s = t;
 }
 if (s != null)
  LockSupport.unpark(s.thread);
}

这里传进来的节点是傀儡节点,它的 waitStatus 是 -1,小于 0,所以就会执行 compareAndSetWaitStatus(node, ws, 0) 进行比较并交换,又将傀儡节点的 waitStatus 改成了 0。

继续往下执行,得到的 s 就是线程B所在节点,不为空,并且线程B节点的 waitStatus 还没改过,是 0,所以直接执行 LockSupport.unpark(s.thread)

因为调用了 unpark,所以刚才阻塞的线程B在 acquireQueued(final Node node, int arg) 方法中的自旋就继续进行,就会调用 tryAcquire(arg) 方法,这次因为A已经释放锁了,所以该方法会返回 true,就会执行 if 里面的代码:

if (p == head && tryAcquire(arg)) {
 setHead(node);
 p.next = null; // help GC
 failed = false;
 return interrupted;
}

看看 setHead(node) 方法:

private void setHead(Node node) {
 head = node;
 node.thread = null;
 node.prev = null;
}

这里的 node 就是线程B节点,将头结点指向线程B节点,将线程B节点的线程设置为空,前驱设置为空。外层再把傀儡节点的 next 指针设置为空,所以最终效果就是:

傀儡节点出队

最终是傀儡节点出队,以前线程B所在节点成为新的傀儡节点。因为之前的傀儡节点已经没有任何引用指向它了,就会被 GC 回收。


扫描二维码

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多