浅析AQS(sync公平锁)
声明,本篇文章是我在对javadoop(https://www.javadoop.com ) 的理解的基础上写的
AQS 结构
先来看看 AQS 有哪些属性,搞清楚这些基本就知道 AQS 是什么套路了,毕竟可以猜嘛!
// 头结点,你直接把它当做 当前持有锁的线程 可能是最好理解的 private transient volatile Node head; // 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表 private transient volatile Node tail; // 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁 // 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1 private volatile int state; //AQS中没有 // 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入 // reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁 // if (currentThread == getExclusiveOwnerThread()) {state++} private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer
怎么样,看样子应该是很简单的吧,毕竟也就四个属性啊。
AbstractQueuedSynchronizer 的等待队列示意如下所示,注意了,之后分析过程中所说的 queue,也就是阻塞队列不包含 head,不包含 head,不包含 head。
等待队列中每个线程被包装成一个 Node 实例,数据结构是链表,一起看看源码吧:
static final class Node { // 标识节点当前在共享模式下 static final Node SHARED = new Node(); // 标识节点当前在独占模式下 static final Node EXCLUSIVE = null; //这两个只是标志 // ======== 下面的几个int常量是给waitStatus用的 =========== /** waitStatus value to indicate thread has cancelled */ // 代码此线程取消了争抢这个锁 static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ // 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒 static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ // 本文不分析condition,所以略过吧,下一篇文章会介绍这个 static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ // 同样的不分析,略过吧 static final int PROPAGATE = -3; // ===================================================== // 取值为上面的1、-1、-2、-3,或者0(以后会讲到) // 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待, // ps: 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。 volatile int waitStatus; // 前驱节点的引用 volatile Node prev; // 后继节点的引用 volatile Node next; // 这个就是线程本尊 volatile Thread thread; }
Node 的数据结构其实也挺简单的,就是 thread + waitStatus + pre + next 四个属性而已,大家先要有这个概念在心里。
上面的是基础知识,后面会多次用到,心里要时刻记着它们,心里想着这个结构图就可以了。下面,我们开始说 ReentrantLock 的公平锁。再次强调,我说的阻塞队列不包含 head 节点。
首先,我们先看下 ReentrantLock 的使用方式。
// 我用个web开发中的service概念吧 public class OrderService { // 使用static,这样每个线程拿到的是同一把锁,当然,spring mvc中service默认就是单例,别纠结这个 private static ReentrantLock reentrantLock = new ReentrantLock(true); public void createOrder() { // 比如我们同一时间,只允许一个线程创建订单 reentrantLock.lock(); // 通常,lock 之后紧跟着 try 语句 try { // 这块代码同一时间只能有一个线程进来(获取到锁的线程), // 其他的线程在lock()方法上阻塞,等待获取到锁,再进来 // 执行代码... // 执行代码... // 执行代码... } finally { // 释放锁 reentrantLock.unlock(); } } }
ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的。
接下来我主要写一下AQS的流程,就不列代码了,具体代码可以看Java源码或者点击上面的文章
ReentrantLock 主要分为公平锁和非公平锁,本文现在主要描述下公平锁的流程
首先我们应该是先对一段代码加锁,所以要调用 ReentrantLock的lock函数,lock函数实现如下
public void lock() { sync.lock(); }
它会调用我们创建这个锁的时候创建的sync,也就是对应的公平锁和非公平锁
final void lock() { acquire(1); }
公平锁会获取锁(这里要讲一下,1表示的是state,state为0的时候表示没有线程持有当前锁,为1的时候表示有线程持有,当然大于1的情况也有,这种情况就是重入锁)
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
调用acquire之后,就是尝试获取下锁,如果获取成功了当然也就结束了,不成功就把当前线程加入到等待队列中
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
在tryAcquire中,会先获取当前的线程,然后获取state,如果state==0,也就是没有线程持有当前锁,并且没有线程在等待队列中(毕竟是公平锁,所以讲究先来后到),这个时候我们就尝试下CAS,如果成功了就皆大欢喜,把当前线程设置为独占线程,然后返回true
如果state不等于0的时候,我们就需要判断下是不是重入锁了,如果当前线程与独占线程相同的话,就把state+1,同样返回true。
如果两个情况都不是,就只能返回false然后把当前线程加入到等待队列中了。
这个时候,我们就需要执行后半段代码了
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
首先addWaiter,Node.EXCLUSIVE是说明当前是独占的,
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
我们进去之后,就会创建一个Node,然后找到等待队列的队尾,如果不为空就尝试下CAS操作,把这个节点设置到队尾然后返回(这是对应等待队列已经初始化的情况,还有一种情况,因为head和tail默认是null的),所以我们通常进入这个函数的时候,都会先执行enq(node);,在enq函数中,我们会初始化head和tail
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
类似自旋锁的形式吧,首先获取队尾,如果是空的话,那么不用想,head肯定也是空的,这个时候我们就尝试CAS(为什么CAS,同样是为了防止竞争,害怕head初始化两次),初始化head,然后让tail指向head,前面说过了,因为它是一直循环的,所以这个函数不会结束,还会再次执行。
这个时候,再次进入循环,tail也不为空了,那么我们就可以把这个节点(封装了当前线程和模式(独占))加入到等待队列中
enq(node);这个函数一般只会在tail和head没有初始化的时候或者在上面CAS失败的时候执行一次,通常都是CAS到tail中的
注意这个函数的返回值,返回的是node,也就是加入到等待队列中的那个节点,函数返回之后我们再看下
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
这个时候,我们还是在acquire中的,太惊喜了,过了这么长时间,我们还在这个函数中
接下来,我们就会执行acquireQueued方法,这个方法执行略微复杂,所以需要仔细来
一定要注意acquireQueued这个时候如果要返回true,那么就会调用selfInterrupt();,所以通常情况会返回false,这个方法非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
首先还是自旋的调用,首先获取当前node的前置节点,如果前置节点是head那么我就就再次尝试获取下锁
这里我们说一下,为什么可以去试试: // 首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node, // enq(node) 方法里面有提到,head是延时初始化的,而且new Node()的时候没有设置任何线程 // 也就是说,当前的head不属于任何一个线程,所以作为队头,可以去试一试, // tryAcquire已经分析过了, 忘记了请往前看一下,就是简单用CAS试操作一下state
如果成功,皆大欢喜,把当前节点设置为head,然后返回false就行
如果失败,那么就进入下一个if,这个时候,要么就是这个node的前置节点不是head,要么就是尝试获取锁又失败了,为什么说又,因为第一次lock的时候,acquie就调用过一次tryAcquire了,正是在这个前提下,才进入这个函数的
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;
这个时候,会调用这两个函数,他俩的作用是什么呢?
首先shouldParkAfterFailedAcquire的意思大概是说:当前线程没有抢到锁,是否需要挂起当前线程?
// 第一个参数是前驱节点,第二个参数才是代表当前线程的节点 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
首先获取前置节点的waitStatus,(这里说明一下,一般waitStatus都是默认值0的,因为初始化为0),什么时候是-1呢?就是在这个函数中设置的,只有它有后置节点的时候,他才有机会被设置为-1
进入这个方法,判断ws是否为-1,如果是就说明这不是第一次进入这个函数了(为什么第二次,这里我要说一下,因为他的默认值是0,所以只能是进入一次以上,因为他的上层调用函数是自旋的),接着说,-1的时候就直接返回true了,然后接着调用parkAndCheckInterrupt方法,这个方法的作用就是挂起当前线程,等待锁释放(unlock)的时候才会返回
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } public static boolean interrupted() { return currentThread().isInterrupted(true); } 这个方法会返回当前线程是否中断,并且清除终端状态
扯远了,再来看下接下来该执行哪里,再就是判断(ws>0)的情况,什么时候ws会大于0?之前说过,大于0 说明前驱节点取消了排队。
// 这里需要知道这点:进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。 // 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点, // 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队, // 找前驱节点的前驱节点做爹,往前遍历总能找到一个好爹的 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node;
接下来就是重头戏了,我们会设置ws为-1
// 仔细想想,如果进入到这个分支意味着什么 // 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3 // 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0 // 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0 // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1) compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
执行这个后,会返回false,然后上层的函数再次循环一遍,他会再次判断他的前置节点是不是head呢?是就尝试获取锁,失败就只能再走老路了,这个时候因为node的前置节点为-1了,所以会返回true,然后挂起当前线程
// private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) // 这个方法结束根据返回值我们简单分析下: // 如果返回true, 说明前驱节点的waitStatus==-1,是正常情况,那么当前线程需要被挂起,等待以后被唤醒 // 我们也说过,以后是被前驱节点唤醒,就等着前驱节点拿到锁,然后释放锁的时候叫你好了 // 如果返回false, 说明当前不需要被挂起,为什么呢?往后看 // 跳回到前面是这个方法 // if (shouldParkAfterFailedAcquire(p, node) && // parkAndCheckInterrupt()) // interrupted = true; // 1. 如果shouldParkAfterFailedAcquire(p, node)返回true, // 那么需要执行parkAndCheckInterrupt(): // 这个方法很简单,因为前面返回true,所以需要挂起线程,这个方法就是负责挂起线程的 // 这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒======= private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } // 2. 接下来说说如果shouldParkAfterFailedAcquire(p, node)返回false的情况 // 仔细看shouldParkAfterFailedAcquire(p,node),我们可以发现,其实第一次进来的时候,一般都不会返回true的,原因很简单,前驱节点的waitStatus=-1是依赖于后继节点设置的。也就是说,我都还没给前驱设置-1呢,怎么可能是true呢,但是要看到,这个方法是套在循环里的,所以第二次进来的时候状态就是-1了。 // 解释下为什么shouldParkAfterFailedAcquire(p, node)返回false的时候不直接挂起线程: // => 是为了应对在经过这个方法后,node已经是head的直接后继节点了。都已经是head的后置节点了,我们为什么要挂起她??我们要勇于尝试啊,我们要再次尝试获取锁,只有再次失败了才挂起
接下来就是解锁了,其实解锁很简单的
解锁(unlock)
最后,就是还需要介绍下唤醒的动作了。我们知道,正常情况下,如果线程没获取到锁,线程会被 LockSupport.park(this); 挂起停止,等待被唤醒。
还是调用sync的方法,然后sync调用release(1),也就是-1(对应重入锁的情况,因为如果有重入锁,state的值可能比1大),首先呢,他也是tryRelease,如果成功才会去判断head是否需要唤醒后继节点
tryRelease首先会判断当前线程是否为独占线程(如果当前线程不是独占线程我们就抛出异常),还有一个变量free代表是否完全释放锁(重入锁的情况,这个函数的返回值也是free),只有在state==0的情况free才会为true,同时也会解除独占线程(设置为null),然后设置state,最后返回
// 唤醒的代码还是比较简单的,你如果上面加锁的都看懂了,下面都不需要看就知道怎么回事了 public void unlock() { sync.release(1); } public final boolean release(int arg) { // 往后看吧 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // 回到ReentrantLock看tryRelease方法 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 是否完全释放锁 boolean free = false; // 其实就是重入的问题,如果c==0,也就是说没有嵌套锁了,可以释放了,否则还不能释放掉 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } /** * Wakes up node's successor, if one exists. * * @param node the node */ // 唤醒后继节点 // 从上面调用处知道,参数node是head头结点 private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; // 如果head节点当前waitStatus<0, 将其修改为0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1) // 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 从后往前找,仔细看代码,不必担心中间有节点取消(waitStatus==1)的情况 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒线程 LockSupport.unpark(s.thread); }
unparkSuccessor函数中,首先获取node的ws(node为头节点), 如果head节点当前waitStatus小于0, 将其修改为0,然后就开始唤醒后继节点,从尾部开始找,找到最前面waitStatus<=0的节点,找到了就唤醒线程
首先,第一行代码先检测 head的后继节点,只有当此时的后继节点不存在或者这个后继节点取消了才开始从后往前找,所以大部分情况下,其实不会发生从后往前遍历整个队列的情况。
后继节点取消很正常,但是某节点在入队的时候,如果发现前驱是取消状态,前驱节点是会被请出队列的
shouldParkAfterFailedAcquire方法,如果前驱节点ws>0就会被请出队列
如果从前往后找,你说的这个问题的答案在 addWaiter(Node mode)方法中,看下面的代码:
如果从前往后找的话 compareAndSetTail(pred, node) 如果执行完成而*pred.next = node; *还未来的及执行的话 新加的tail是无法被遍历到的。
Node pred = tail; if (pred != null) { node.prev = pred; // 1. 先设置的 tail if (compareAndSetTail(pred, node)) { // 2. 设置前驱节点的后继节点为当前节点 pred.next = node; return node; } } 所以,这里存在并发问题:从前往后寻找不一定能找到刚刚加入队列的后继节点。
唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 刚刚线程被挂起在这里了 return Thread.interrupted(); } // 又回到这个方法了:acquireQueued(final Node node, int arg),这个时候,node的前驱是head了
然后就开始自旋
for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; }
总结
总结一下吧。
在并发环境下,加锁和解锁需要以下三个部件的协调:
- 锁状态。我们要知道锁是不是被别的线程占有了,这个就是 state 的作用,它为 0 的时候代表没有线程占有锁,可以去争抢这个锁,用 CAS 将 state 设为 1,如果 CAS 成功,说明抢到了锁,这样其他线程就抢不到了,如果锁重入的话,state进行 +1 就可以,解锁就是减 1,直到 state 又变为 0,代表释放锁,所以 lock() 和 unlock() 必须要配对啊。然后唤醒等待队列中的第一个线程,让其来占有锁。
- 线程的阻塞和解除阻塞。AQS 中采用了 LockSupport.park(thread) 来挂起线程,用 unpark 来唤醒线程。
- 阻塞队列。因为争抢锁的线程可能很多,但是只能有一个线程拿到锁,其他的线程都必须等待,这个时候就需要一个 queue 来管理这些线程,AQS 用的是一个 FIFO 的队列,就是一个链表,每个 node 都持有后继节点的引用。AQS 采用了 CLH 锁的变体来实现,感兴趣的读者可以参考这篇文章关于CLH的介绍,写得简单明了。
在有锁竞争的时候head和tail才会初始化,如果没有锁竞争,那么AQS也就没有多大用处了