Java之AQS框架底层原理分析
写在前面
1、内容都是原创(少量图片和官方文字有引用),是本人根据源码一行一行的研读,最后手写汇总给大家的
2、网上各种转发来转发去的AQS教程,真的看吐了,一堆错的知识点还在转发来转发去的,本人觉得这类知识还是不要看这类文章为好,自己跟着源码去理解才是正道
3、认真看完我写的每一句话,一定对你有帮助,可能会有少许错别字和疏忽点,请大家务必评论指正哈!
一、AQS框架简述
Java中的大部分同步类(Lock、Semaphore、ReentrantLock等)都是基于AbstractQueuedSynchronizer(简称为AQS)实现的。AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。本文会从应用层逐渐深入到原理层,并通过ReentrantLock的基本特性和ReentrantLock与AQS的关联,来深入解读AQS中独占锁的逻辑、Sync Queue(下面都称"同步队列")和Condition Queue(下面都称"等待队列"),不讲述包含共享锁和的部分。
二、临界资源的保护
1、概念
临界资源最重要的就是要维护好线程安全,防止出现冲突或者脏数据等等
2、学习
public static void main(String[] args) throws InterruptedException {
//1、一个非线程安全的Map
Map<Integer, String> map = new HashMap<>();
//2、多线程插入
new Thread(() -> {
for (int i = 0; i < 10000; i++) {
map.put(i, i+"");
}
}).start();
new Thread(() -> {
for (int i = 10000; i < 20000; i++) {
map.put(i, i+"");
}
}).start();
//3、阻塞-保证两个线程跑完
Thread.sleep(2000);
//4、输出结果
System.out.println(map.size());
} 这时候,我们有三种解决办法
1、将map变成ConcurrentHashMap,变成线程安全的Map,这样解决了插入冲突(可以了解下jdk7和8对ConcurrentHashMap、HashMap的优化)
2、利用同步代码关键字synchronized保证临界资源map的线程安全
public static void main(String[] args) throws InterruptedException {
//1、一个非线程安全的Map
Map<Integer, String> map = new HashMap<>();
Lock lock = new ReentrantLock();//创建可重入锁对象
//2、多线程插入
new Thread(() -> {
for (int i = 0; i < 10000; i++) {
//加锁
lock.lock();
try {
map.put(i, i+"");
} finally {
//释放锁
lock.unlock();
}
}
}).start();
new Thread(() -> {
for (int i = 10000; i < 20000; i++) {
//加锁
lock.lock();
try {
map.put(i, i+"");
} finally {
//释放锁
lock.unlock();
}
}
}).start();
//3、阻塞-保证两个线程跑完
Thread.sleep(2000);
//4、输出结果
System.out.println(map.size());
} 那么可重入锁ReentrantLock的lock方法下面到底是什么原理呢,可重入又是如何从代码层面去实现的呢,下面为大家一一阐述
三、AQS的几个重要状态量
1、等待状态
//1、下面两个对象分别代编代表共享锁和独占锁(排它锁),可以理解成枚举 static final Node SHARED = new Node();//共享锁 static final Node EXCLUSIVE = null;//排它锁 //2、四种等待状态 /** 表示线程获取锁的请求已经取消了 */ static final int CANCELLED = 1; /** 下一个节点(线程)需要被唤醒 */ static final int SIGNAL = -1; /** 表示节点在等待队列中,节点线程等待唤醒 */ static final int CONDITION = -2; /** 共享锁模式下才会用到 */ static final int PROPAGATE = -3; //3、常见变量 volatile int waitStatus;//等待状态 volatile Node prev;//同步队列-前置节点 volatile Node next;//后置节点 volatile Thread thread;//节点包含的线程-类似线程池的Worker持有线程 Node nextWaiter;//指向下一个处于CONDITION状态的节点/SHARED
2、state变量。
AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
state>0意味着锁已经被获取了(次数取决于state大小),state=0意味着锁已经被释放了(临界资源可以尝试被获取,0也是Node节点初始化值),简要原理图如下:
3、AQS整体架构图
调用顺序自上而下(引用美团的技术文档,看看就好,这里不用深究,因为有不严谨的地方)
四、ReentrantLock
1、概念
ReentrantLock是可重入锁,可重入的概念是线程可以重复获取临界资源、对其加锁,我们结合对ReentrantLock的代码分析,深入理解AQS
2、lock()方法
1、Sync是ReentrantLock的内部类,并且有两个实现类,分别是公平锁和非公平锁
public void lock() {
sync.lock();
} NonfairSync.lock()
1、利用CAS算法更改锁的状态值,非公平锁上来就会来这里抢锁
2、成功,将独占锁的线程换成当前线程
final void lock() {
if (compareAndSetState(0, 1))//1
setExclusiveOwnerThread(Thread.currentThread());//2
else
acquire(1);
} final void lock() {
acquire(1);
} 非公平锁抢占失败/公平锁直接进入此方法,先说总体思路:
1、尝试获取锁
2、成功则结束acquire()方法结束
3、失败则将该节点加入同步队列,并且是排它锁模式==》Node.EXCLUSIVE
4、排队自旋获取锁
public final void acquire(int arg) {
if (!tryAcquire(arg) &&//1
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//3、4
selfInterrupt();//5
}//2 NonfairSync.tryAcquire(int acquires)
1、判断当前锁是否被占用(state是否=0)
2、是,则CAS维护state
成功,则设置独占锁的持有线程为当前线程,结束获取锁流程;
失败,则尝试失败,进入上面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
3、否,则判断是有锁的线程是否是当前线程
是,state值增加,代表获取锁成功
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//1-是
if (compareAndSetState(0, acquires)) {//2
setExclusiveOwnerThread(current);//2-成功
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//1-否
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);//3-是
return true;
}
return false;//2-失败、3-否
} protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && //1也体现了公平锁的特点-前面有人就去排队
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
} 1、包装一个节点:当前线程对象、锁模式(共享or独占)
2、如果同步队列尾节点不为空,则进行快速操作=》指针+CAS设置同步队列的尾节
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//1
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {//2
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);//3
return node;
} 自旋:只有当该节点的前驱节点是头节点时,才去尝试获取锁
1、当前节点在同步队列中的前驱节点是否是头节点
是,尝试获取锁tryAcquire(arg)
否,shouldParkAfterFailedAcquire=》判断是否阻塞该线程(阻塞条件:前驱节点的等待状态为SIGNAL)
是,则阻塞等唤醒/打断
否,返回是否被打断
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//1
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//不是头节点的后驱节点&nbs***bsp;尝试获取锁失败
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);//2
}
} 1、判断前驱节点的等待状态是否是SIGNAL状态
是,则放心阻塞,返回true
否,判断前驱节点是否处于CANCELLED状态
是,则循环移除同步队列中处于取消状态的节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0&nbs***bsp;PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
} 1、阻塞当前线程
2、若被唤醒或者当前线程被打断,执行下一步
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//1
return Thread.interrupted();//3
} private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
//&nbs***bsp;signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
} public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//如果被打断了,则把当前线程打断==》Thread.currentThread().interrupt();
selfInterrupt();
} public void unlock() {
//调用AQS的release方法,这个方法是框架提供的
sync.release(1);
} public final boolean release(int arg) {
if (tryRelease(arg)) {//调用实现类的方法,我们看看开发者如何自定义的
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒头节点的下一个节点==》LockSupport.unpark(h.next.thread);
return true;
}
return false;
} 干了两件事
1、将独占线程置空
2、维护state
同时我们发现,这个方法是写在Sync类里面的,没有区分公平/非公平锁,说明释放锁的逻辑是共用的,没有区别
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
} 1、如果当前节点未取消,则清除状态-CAS失败了也无所谓
2、寻找后置节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);//1
Node s = node.next;//2
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//3
} AQS是一个抽象类,留了部分方法让我们自行去实现,主要是下面五个方法
| 方法名 | 描述 |
|---|---|
| protected boolean isHeldExclusively() | 该线程是否正在独占资源。只有用到Condition才需要去实现它。 |
| protected boolean tryAcquire(int arg) | 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。 |
| protected boolean tryRelease(int arg) | 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。 |
| protected int tryAcquireShared(int arg) | 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 |
| protected boolean tryReleaseShared(int arg) | 共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。 |
从lock到最后锁的释放,完整的调用流程是这样的:蓝色框由开发者自行实现,紫色框由AQS框架提供
五、Condition接口
1、属性
除了获取锁的同步队列,AQS的内部类ConditionObject实现了Condition接口,提供了一种等待队列(线程阻塞等待被唤醒的队列)
/** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;
ReentrantLock lock = new ReentrantLock(); Condition conditionA = lock.newCondition(); Condition conditionB = lock.newCondition();
这两个方法是我们见到最多的等待/唤醒机制,并且是Object类中的方法
wait()
1、只有当前线程持有该对象的监视器的时候,才可以调用,也就是说要配合synchronized(方法或代码块)关键字使用,否则会抛出IllegalMonitorStateException异常
public final void wait() throws InterruptedException {
wait(0);
}
//使用示例
synchronized (obj) {
while (some conditions)
obj.wait();
...
} public final native void notify();
字面意思都是等待/唤醒,他们与Object的wait()/notify()有以下区别(此处欢迎补充)
| 区别点 | await()/signal() | wait()/notify() |
|---|---|---|
| 使用前提 | 结合Lock使用 | 结合synchronized使用 |
| 唤醒机制 | 指定唤醒 | 随机唤醒 |
| 谓词条件 | 可以多个 | 只有一个 |
4、await()方法
1、如果当前线程被打断了,抛出异常,catch并自定义后续操作
2、向该Condition对象的等待队列添加一个等待节点到队列尾部(单向队列),并返回这个节点
3、释放该节点(当前线程)持有的锁
4、循环判断当前节点是否在同步队列中(循环的意义是,防止虚假唤醒)
6、唤醒后,进行下一步:判断interruptMode(没有被打断、唤醒前打断的、需要唤醒后打断)
7、while循环结束,当前节点开始尝试获取锁:成功后并且interruptMode != THROW_IE(抛出异常模式),设置interruptMode为REINTERRUPT:重新打断
8、该节点有后置节点,说明当前Condition对象的等待队列不止一个等待节点,清理掉处于cancelled状态的节点
public final void await() throws InterruptedException {
if (Thread.interrupted())//1
throw new InterruptedException();
Node node = addConditionWaiter();//2
int savedState = fullyRelease(node);//3
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//4
LockSupport.park(this);//5
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//6
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//7
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();//8
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);//9
} 1、判断节点状态,如果是CONDITION或者前驱节点为空(同步队列上的节点的前驱节点不为空,因为至少有头节点作前驱),说明一定在等待队列(单向队列)上
2、判断节点状态,如果该节点的next不为空(next/prev指针是用在同步队列上的,等待队列上的指针是firstWaiter/lastWaiter/nextWaiter),说明一定在同步队列上
3、从后往前遍历同步队列查找是否存在该节点
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)//1
return false;
if (node.next != null) //2
return true;
return findNodeFromTail(node);//3
} checkInterruptWhileWaiting(Node node)
1、判断当前线程是否被打断
2、判断唤醒前打断还是要在唤醒后打断
3、返回三种状态值:
THROW_IE:唤醒之前就被打断了(已经被打断了)
REINTERRUPT:唤醒之后再被打断(还没被打断后面再打断)
0:没有被打断
备注:
1、阻塞状态下,打断线程会抛出异常;运行状态下打断,没有任何影响,只是更新了一下标志位=true,表示被打断,用于后置的自定义操作
2、调用Thread.interrupted()会返回标志位,并且重置标志位=false
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
} 1、CAS更改节点状态
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
} 1、判断当前线程是否持有独占锁(是,才有资格去唤醒其他线程,否则抛出异常)
2、拿到Condition对象等待队列的第一个节点(等了最久的一个节点==最早调用await()的节点)
3、确实有等待节点
4、执行唤醒操作:只唤醒一个节点
public final void signal() {
if (!isHeldExclusively())//1
throw new IllegalMonitorStateException();
Node first = firstWaiter;//2
if (first != null)//3
doSignal(first);//4
} 1、判断,如果当前等待队列只有这一个节点,则将lastWaiter置空
2、将该节点移出等待队列
3、转换节点状态(若状态转换失败并且还有后置等待节点,则尝试唤醒下一个节点,以此类推)
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
} 2、状态转换成功,将该节点加入同步队列并返回该节点的前置节点
/*
* Transfers a node from a condition queue onto sync queue.
*/
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//1
return false;
Node p = enq(node);//2
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//3
LockSupport.unpark(node.thread);
return true;
} AQS的代码看的多了之后,会发现这两个东西经常出现,实际上这两者的关系实属难理解,接下来我来细细讲解他们的关系
1、阻塞
我们知道LockSupport.park()方法调用能够阻塞当前线程,而interrupt()方法能够打破线程的阻塞状态
If this thread is blocked in an invocation of the wait(), wait(long)&nbs***bsp;wait(long, int) methods of the Object}&nbs***bsp;of the join(), join(long), join(long, int), sleep(long),&nbs***bsp;sleep(long, int) methods of Thread class, then its interrupt status will be cleared and it will receive an InterruptedException.
2、异常
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
int count = 100;
while (count > 0) {
LockSupport.park();//阻塞在这里,等待唤醒
System.out.println(count--);
}
});
Thread t2 = new Thread(() -> {
try {
while (true) {
Thread.sleep(100);//保证t1线程先阻塞
t1.interrupt();//打断t1线程
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
t2.start();
} 结果:1、不会抛出异常 2、100到1几乎无间隔输出了
3、分析
class Parker : public os::PlatformParker {
private:
//表示许可
volatile int _counter ;
Parker * FreeNext ;
JavaThread * AssociatedWith ; // Current association
public:
Parker() : PlatformParker() {
//初始化_counter
_counter = 0 ;
FreeNext = NULL ;
AssociatedWith = NULL ;
}
protected:
~Parker() { ShouldNotReachHere(); }
public:
void park(bool isAbsolute, jlong time);
void unpark();
// Lifecycle operators
static Parker * Allocate (JavaThread * t) ;
static void Release (Parker * e) ;
private:
static Parker * volatile FreeList ;
static volatile int ListLock ;
}; 然后调用t1.interrupt()打断线程之后,线程退出阻塞状态,并更新标志位=true(意味着被打断了),这个状态值保留到调用Thread.interrupted()为止(重置标志位=false)
void Parker::unpark() {
int s, status ;
//给互斥量加锁,如果互斥量已经上锁,则阻塞到互斥量被解锁
//park进入wait时,_mutex会被释放
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
//存储旧的_counter
s = _counter;
//许可改为1,每次调用都设置成发放许可
_counter = 1;
if (s < 1) {
//之前没有许可
if (WorkAroundNPTLTimedWaitHang) {
//默认执行 ,释放信号,表明条件已经满足,将唤醒等待的线程
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
//释放锁
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
} else {
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
}
} else {
//一直有许可,释放掉自己加的锁,有许可park本身就返回了
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
} public static void main(String[] args) {
Thread t1 = new Thread(() -> {
int count = 100;
while (count > 0) {
LockSupport.park();
System.out.println(count--);
Thread.interrupted();//清理标志位
}
});
Thread t2 = new Thread(() -> {
try {
while (true) {
Thread.sleep(100);
t1.interrupt();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
t2.start();
} 具体来说,当对一个线程,调用 interrupt() 时, ① 如果线程处于被阻塞状态(例如处于sleep, wait, join 等状态),那么线程将立即退出被阻塞状态,并抛出一个InterruptedException异常。仅此而已。 ② 如果线程处于正常活动状态,那么会将该线程的中断标志设置为 true,仅此而已。被设置中断标志的线程将继续正常运行,不受影响。
Thread thread = new Thread(() -> {
while (!Thread.interrupted()) {
// do more work.
}
});
thread.start();
// 一段时间以后
thread.interrupt(); 1、AQS框架非常重要,无论是面试还是业务中,都会用到,无处不在
2、培养看源码的习惯,一遍看不懂就多看几遍,休息几天再来啃,看懂的那一天比通便还爽~~
3、有不足之处请指正👏,虚心指教
八、下期展望
各种锁的源码分析:读写锁、闭锁、信号量以及分布式锁下的锁特性
#Java开发##笔经##秋招##Java##校招##社招##Java工程师#