Java之AQS框架底层原理分析
写在前面
1、内容都是原创(少量图片和官方文字有引用),是本人根据源码一行一行的研读,最后手写汇总给大家的
2、网上各种转发来转发去的AQS教程,真的看吐了,一堆错的知识点还在转发来转发去的,本人觉得这类知识还是不要看这类文章为好,自己跟着源码去理解才是正道
3、认真看完我写的每一句话,一定对你有帮助,可能会有少许错别字和疏忽点,请大家务必评论指正哈!
一、AQS框架简述
Java中的大部分同步类(Lock、Semaphore、ReentrantLock等)都是基于AbstractQueuedSynchronizer(简称为AQS)实现的。AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。本文会从应用层逐渐深入到原理层,并通过ReentrantLock的基本特性和ReentrantLock与AQS的关联,来深入解读AQS中独占锁的逻辑、Sync Queue(下面都称"同步队列")和Condition Queue(下面都称"等待队列"),不讲述包含共享锁和的部分。
二、临界资源的保护
1、概念
临界资源最重要的就是要维护好线程安全,防止出现冲突或者脏数据等等
2、学习
public static void main(String[] args) throws InterruptedException { //1、一个非线程安全的Map Map<Integer, String> map = new HashMap<>(); //2、多线程插入 new Thread(() -> { for (int i = 0; i < 10000; i++) { map.put(i, i+""); } }).start(); new Thread(() -> { for (int i = 10000; i < 20000; i++) { map.put(i, i+""); } }).start(); //3、阻塞-保证两个线程跑完 Thread.sleep(2000); //4、输出结果 System.out.println(map.size()); }
这时候,我们有三种解决办法
1、将map变成ConcurrentHashMap,变成线程安全的Map,这样解决了插入冲突(可以了解下jdk7和8对ConcurrentHashMap、HashMap的优化)
2、利用同步代码关键字synchronized保证临界资源map的线程安全
public static void main(String[] args) throws InterruptedException { //1、一个非线程安全的Map Map<Integer, String> map = new HashMap<>(); Lock lock = new ReentrantLock();//创建可重入锁对象 //2、多线程插入 new Thread(() -> { for (int i = 0; i < 10000; i++) { //加锁 lock.lock(); try { map.put(i, i+""); } finally { //释放锁 lock.unlock(); } } }).start(); new Thread(() -> { for (int i = 10000; i < 20000; i++) { //加锁 lock.lock(); try { map.put(i, i+""); } finally { //释放锁 lock.unlock(); } } }).start(); //3、阻塞-保证两个线程跑完 Thread.sleep(2000); //4、输出结果 System.out.println(map.size()); }
那么可重入锁ReentrantLock的lock方法下面到底是什么原理呢,可重入又是如何从代码层面去实现的呢,下面为大家一一阐述
三、AQS的几个重要状态量
1、等待状态
//1、下面两个对象分别代编代表共享锁和独占锁(排它锁),可以理解成枚举 static final Node SHARED = new Node();//共享锁 static final Node EXCLUSIVE = null;//排它锁 //2、四种等待状态 /** 表示线程获取锁的请求已经取消了 */ static final int CANCELLED = 1; /** 下一个节点(线程)需要被唤醒 */ static final int SIGNAL = -1; /** 表示节点在等待队列中,节点线程等待唤醒 */ static final int CONDITION = -2; /** 共享锁模式下才会用到 */ static final int PROPAGATE = -3; //3、常见变量 volatile int waitStatus;//等待状态 volatile Node prev;//同步队列-前置节点 volatile Node next;//后置节点 volatile Thread thread;//节点包含的线程-类似线程池的Worker持有线程 Node nextWaiter;//指向下一个处于CONDITION状态的节点/SHARED
2、state变量。
AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
state>0意味着锁已经被获取了(次数取决于state大小),state=0意味着锁已经被释放了(临界资源可以尝试被获取,0也是Node节点初始化值),简要原理图如下:
3、AQS整体架构图
调用顺序自上而下(引用美团的技术文档,看看就好,这里不用深究,因为有不严谨的地方)
四、ReentrantLock
1、概念
ReentrantLock是可重入锁,可重入的概念是线程可以重复获取临界资源、对其加锁,我们结合对ReentrantLock的代码分析,深入理解AQS
2、lock()方法
1、Sync是ReentrantLock的内部类,并且有两个实现类,分别是公平锁和非公平锁
public void lock() { sync.lock(); }
NonfairSync.lock()
1、利用CAS算法更改锁的状态值,非公平锁上来就会来这里抢锁
2、成功,将独占锁的线程换成当前线程
final void lock() { if (compareAndSetState(0, 1))//1 setExclusiveOwnerThread(Thread.currentThread());//2 else acquire(1); }
final void lock() { acquire(1); }
非公平锁抢占失败/公平锁直接进入此方法,先说总体思路:
1、尝试获取锁
2、成功则结束acquire()方法结束
3、失败则将该节点加入同步队列,并且是排它锁模式==》Node.EXCLUSIVE
4、排队自旋获取锁
public final void acquire(int arg) { if (!tryAcquire(arg) &&//1 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//3、4 selfInterrupt();//5 }//2
NonfairSync.tryAcquire(int acquires)
1、判断当前锁是否被占用(state是否=0)
2、是,则CAS维护state
成功,则设置独占锁的持有线程为当前线程,结束获取锁流程;
失败,则尝试失败,进入上面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
3、否,则判断是有锁的线程是否是当前线程
是,state值增加,代表获取锁成功
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {//1-是 if (compareAndSetState(0, acquires)) {//2 setExclusiveOwnerThread(current);//2-成功 return true; } } else if (current == getExclusiveOwnerThread()) {//1-否 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc);//3-是 return true; } return false;//2-失败、3-否 }
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && //1也体现了公平锁的特点-前面有人就去排队 compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
1、包装一个节点:当前线程对象、锁模式(共享or独占)
2、如果同步队列尾节点不为空,则进行快速操作=》指针+CAS设置同步队列的尾节
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);//1 // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) {//2 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node);//3 return node; }
自旋:只有当该节点的前驱节点是头节点时,才去尝试获取锁
1、当前节点在同步队列中的前驱节点是否是头节点
是,尝试获取锁tryAcquire(arg)
否,shouldParkAfterFailedAcquire=》判断是否阻塞该线程(阻塞条件:前驱节点的等待状态为SIGNAL)
是,则阻塞等唤醒/打断
否,返回是否被打断
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//1 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //不是头节点的后驱节点&nbs***bsp;尝试获取锁失败 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node);//2 } }
1、判断前驱节点的等待状态是否是SIGNAL状态
是,则放心阻塞,返回true
否,判断前驱节点是否处于CANCELLED状态
是,则循环移除同步队列中处于取消状态的节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0&nbs***bsp;PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
1、阻塞当前线程
2、若被唤醒或者当前线程被打断,执行下一步
private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//1 return Thread.interrupted();//3 }
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel //&nbs***bsp;signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //如果被打断了,则把当前线程打断==》Thread.currentThread().interrupt(); selfInterrupt(); }
public void unlock() { //调用AQS的release方法,这个方法是框架提供的 sync.release(1); }
public final boolean release(int arg) { if (tryRelease(arg)) {//调用实现类的方法,我们看看开发者如何自定义的 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//唤醒头节点的下一个节点==》LockSupport.unpark(h.next.thread); return true; } return false; }
干了两件事
1、将独占线程置空
2、维护state
同时我们发现,这个方法是写在Sync类里面的,没有区分公平/非公平锁,说明释放锁的逻辑是共用的,没有区别
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
1、如果当前节点未取消,则清除状态-CAS失败了也无所谓
2、寻找后置节点
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);//1 Node s = node.next;//2 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//3 }
AQS是一个抽象类,留了部分方法让我们自行去实现,主要是下面五个方法
方法名 | 描述 |
---|---|
protected boolean isHeldExclusively() | 该线程是否正在独占资源。只有用到Condition才需要去实现它。 |
protected boolean tryAcquire(int arg) | 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。 |
protected boolean tryRelease(int arg) | 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。 |
protected int tryAcquireShared(int arg) | 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 |
protected boolean tryReleaseShared(int arg) | 共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。 |
从lock到最后锁的释放,完整的调用流程是这样的:蓝色框由开发者自行实现,紫色框由AQS框架提供
五、Condition接口
1、属性
除了获取锁的同步队列,AQS的内部类ConditionObject实现了Condition接口,提供了一种等待队列(线程阻塞等待被唤醒的队列)
/** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;
ReentrantLock lock = new ReentrantLock(); Condition conditionA = lock.newCondition(); Condition conditionB = lock.newCondition();
这两个方法是我们见到最多的等待/唤醒机制,并且是Object类中的方法
wait()
1、只有当前线程持有该对象的监视器的时候,才可以调用,也就是说要配合synchronized(方法或代码块)关键字使用,否则会抛出IllegalMonitorStateException异常
public final void wait() throws InterruptedException { wait(0); } //使用示例 synchronized (obj) { while (some conditions) obj.wait(); ... }
public final native void notify();
字面意思都是等待/唤醒,他们与Object的wait()/notify()有以下区别(此处欢迎补充)
区别点 | await()/signal() | wait()/notify() |
---|---|---|
使用前提 | 结合Lock使用 | 结合synchronized使用 |
唤醒机制 | 指定唤醒 | 随机唤醒 |
谓词条件 | 可以多个 | 只有一个 |
4、await()方法
1、如果当前线程被打断了,抛出异常,catch并自定义后续操作
2、向该Condition对象的等待队列添加一个等待节点到队列尾部(单向队列),并返回这个节点
3、释放该节点(当前线程)持有的锁
4、循环判断当前节点是否在同步队列中(循环的意义是,防止虚假唤醒)
6、唤醒后,进行下一步:判断interruptMode(没有被打断、唤醒前打断的、需要唤醒后打断)
7、while循环结束,当前节点开始尝试获取锁:成功后并且interruptMode != THROW_IE(抛出异常模式),设置interruptMode为REINTERRUPT:重新打断
8、该节点有后置节点,说明当前Condition对象的等待队列不止一个等待节点,清理掉处于cancelled状态的节点
public final void await() throws InterruptedException { if (Thread.interrupted())//1 throw new InterruptedException(); Node node = addConditionWaiter();//2 int savedState = fullyRelease(node);//3 int interruptMode = 0; while (!isOnSyncQueue(node)) {//4 LockSupport.park(this);//5 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//6 break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//7 interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters();//8 if (interruptMode != 0) reportInterruptAfterWait(interruptMode);//9 }
1、判断节点状态,如果是CONDITION或者前驱节点为空(同步队列上的节点的前驱节点不为空,因为至少有头节点作前驱),说明一定在等待队列(单向队列)上
2、判断节点状态,如果该节点的next不为空(next/prev指针是用在同步队列上的,等待队列上的指针是firstWaiter/lastWaiter/nextWaiter),说明一定在同步队列上
3、从后往前遍历同步队列查找是否存在该节点
final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null)//1 return false; if (node.next != null) //2 return true; return findNodeFromTail(node);//3 }
checkInterruptWhileWaiting(Node node)
1、判断当前线程是否被打断
2、判断唤醒前打断还是要在唤醒后打断
3、返回三种状态值:
THROW_IE:唤醒之前就被打断了(已经被打断了)
REINTERRUPT:唤醒之后再被打断(还没被打断后面再打断)
0:没有被打断
备注:
1、阻塞状态下,打断线程会抛出异常;运行状态下打断,没有任何影响,只是更新了一下标志位=true,表示被打断,用于后置的自定义操作
2、调用Thread.interrupted()会返回标志位,并且重置标志位=false
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
1、CAS更改节点状态
final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */ while (!isOnSyncQueue(node)) Thread.yield(); return false; }
1、判断当前线程是否持有独占锁(是,才有资格去唤醒其他线程,否则抛出异常)
2、拿到Condition对象等待队列的第一个节点(等了最久的一个节点==最早调用await()的节点)
3、确实有等待节点
4、执行唤醒操作:只唤醒一个节点
public final void signal() { if (!isHeldExclusively())//1 throw new IllegalMonitorStateException(); Node first = firstWaiter;//2 if (first != null)//3 doSignal(first);//4 }
1、判断,如果当前等待队列只有这一个节点,则将lastWaiter置空
2、将该节点移出等待队列
3、转换节点状态(若状态转换失败并且还有后置等待节点,则尝试唤醒下一个节点,以此类推)
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
2、状态转换成功,将该节点加入同步队列并返回该节点的前置节点
/* * Transfers a node from a condition queue onto sync queue. */ final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//1 return false; Node p = enq(node);//2 int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//3 LockSupport.unpark(node.thread); return true; }
AQS的代码看的多了之后,会发现这两个东西经常出现,实际上这两者的关系实属难理解,接下来我来细细讲解他们的关系
1、阻塞
我们知道LockSupport.park()方法调用能够阻塞当前线程,而interrupt()方法能够打破线程的阻塞状态
If this thread is blocked in an invocation of the wait(), wait(long)&nbs***bsp;wait(long, int) methods of the Object}&nbs***bsp;of the join(), join(long), join(long, int), sleep(long),&nbs***bsp;sleep(long, int) methods of Thread class, then its interrupt status will be cleared and it will receive an InterruptedException.
2、异常
public static void main(String[] args) { Thread t1 = new Thread(() -> { int count = 100; while (count > 0) { LockSupport.park();//阻塞在这里,等待唤醒 System.out.println(count--); } }); Thread t2 = new Thread(() -> { try { while (true) { Thread.sleep(100);//保证t1线程先阻塞 t1.interrupt();//打断t1线程 } } catch (InterruptedException e) { e.printStackTrace(); } }); t1.start(); t2.start(); }
结果:1、不会抛出异常 2、100到1几乎无间隔输出了
3、分析
class Parker : public os::PlatformParker { private: //表示许可 volatile int _counter ; Parker * FreeNext ; JavaThread * AssociatedWith ; // Current association public: Parker() : PlatformParker() { //初始化_counter _counter = 0 ; FreeNext = NULL ; AssociatedWith = NULL ; } protected: ~Parker() { ShouldNotReachHere(); } public: void park(bool isAbsolute, jlong time); void unpark(); // Lifecycle operators static Parker * Allocate (JavaThread * t) ; static void Release (Parker * e) ; private: static Parker * volatile FreeList ; static volatile int ListLock ; };
然后调用t1.interrupt()打断线程之后,线程退出阻塞状态,并更新标志位=true(意味着被打断了),这个状态值保留到调用Thread.interrupted()为止(重置标志位=false)
void Parker::unpark() { int s, status ; //给互斥量加锁,如果互斥量已经上锁,则阻塞到互斥量被解锁 //park进入wait时,_mutex会被释放 status = pthread_mutex_lock(_mutex); assert (status == 0, "invariant") ; //存储旧的_counter s = _counter; //许可改为1,每次调用都设置成发放许可 _counter = 1; if (s < 1) { //之前没有许可 if (WorkAroundNPTLTimedWaitHang) { //默认执行 ,释放信号,表明条件已经满足,将唤醒等待的线程 status = pthread_cond_signal (_cond) ; assert (status == 0, "invariant") ; //释放锁 status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } else { status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; status = pthread_cond_signal (_cond) ; assert (status == 0, "invariant") ; } } else { //一直有许可,释放掉自己加的锁,有许可park本身就返回了 pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } }
public static void main(String[] args) { Thread t1 = new Thread(() -> { int count = 100; while (count > 0) { LockSupport.park(); System.out.println(count--); Thread.interrupted();//清理标志位 } }); Thread t2 = new Thread(() -> { try { while (true) { Thread.sleep(100); t1.interrupt(); } } catch (InterruptedException e) { e.printStackTrace(); } }); t1.start(); t2.start(); }
具体来说,当对一个线程,调用 interrupt() 时, ① 如果线程处于被阻塞状态(例如处于sleep, wait, join 等状态),那么线程将立即退出被阻塞状态,并抛出一个InterruptedException异常。仅此而已。 ② 如果线程处于正常活动状态,那么会将该线程的中断标志设置为 true,仅此而已。被设置中断标志的线程将继续正常运行,不受影响。
Thread thread = new Thread(() -> { while (!Thread.interrupted()) { // do more work. } }); thread.start(); // 一段时间以后 thread.interrupt();
1、AQS框架非常重要,无论是面试还是业务中,都会用到,无处不在
2、培养看源码的习惯,一遍看不懂就多看几遍,休息几天再来啃,看懂的那一天比通便还爽~~
3、有不足之处请指正👏,虚心指教
八、下期展望
各种锁的源码分析:读写锁、闭锁、信号量以及分布式锁下的锁特性
#Java开发##笔经##秋招##Java##校招##社招##Java工程师#