AQS源码解析 + 面试题
AQS源码解析 + 面试题
题目都基于面试题。搞定源码才能应对各种各样的面试题。因为万变不离其宗。
什么是AQS?
AQS是AbstractQueuedSynchronizer的简称。直译就是抽象队列同步器。
它定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如ReentrantLock,ReentrantReadWriteLock,Semaphone,CountDownLatch,CyclicBarrier等。

是Sync继承了AQS,而Sync是上面同步类的静态类。
AQS定义的资源共享方式有几种?分别是什么?
- Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁。
- 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁。
- 非公平锁:一来就抢锁,无视队列顺序,抢不到才乖乖排队。
- Share(共享):多个线程可同时执行,如Semaphore、CountDwonLatch、CyclicBarrier、ReadWriteLock的Read锁。
AQS的实现原理是什么?



简单介绍下,AQS里有一个节点Node类,通过Node类可以实现两个队列。一个是通过prev和next实现CLH队列,二是nextWaiter实现Condition条件上的等待队列。
AQS是基于模板方法模式的,所以会有一些方法需要子类去实现。因此,我们通过ReentrantLock这个类的源码分析就能搞懂AQS的实现原理。
主要看三大块逻辑的代码实现:
- ReentrantLock实例构造
- 加锁 lock() 过程
- 解锁 unlock() 过程
ReentrantLock实例构造

多线程抢锁概览(非公平锁)

加锁 lock() 源码流程

先来看公平锁和非公平锁里lock()的实现:


/**
* The current owner of exclusive mode synchronization.
*/
//当前持有锁的线程
private transient Thread exclusiveOwnerThread; 里面有个acquire方法,非公平锁和公平锁加锁过程都会调用到。
来看下实现:

首先看 tryAcquire(arg)方法
- 公平锁

- 非公平锁

对应的逻辑脑图:

接着看addWaiter()方法:

/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* Creates and enqueues node for current thread and given mode.
* 为代表当前线程并指定模式为mode的节点创建/进入队列
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 指定的模式
* @return the new node
*
*/
private Node addWaiter(Node mode) {
//为当前线程创建一个节点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//如果目前有线程在队列中等待
if (pred != null) {
node.prev = pred; /**[原尾结点pred] <-- [新节点node].prev**/
//CAS更新尾结点
if (compareAndSetTail(pred, node)) {
pred.next = node; /**[原尾结点pred].next --> [新节点node]**/
return node; /**上面相当于做了双向链表的节点关联**/
}
}
// 假设一种情况:线程A在执行,当前线程B就会抢不到锁,接着来到这个方法 tail = null,不会进入if
//返回值为首节点承载null的node
enq(node);
return node;
}
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
//this: 当前AQS实例对象(工作内存),尾部的相对偏移量,期望值(主内存),要更新的值
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
} enq(node):

对应的逻辑脑图:
再来看acquireQueued(final Node node, int arg)方法:
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
* 会先判断当前传入的Node对应的前置节点是否为head,如果是则尝试加锁。加锁成功则将当前节点设置为head节点,然后空置之前的head节点,方便被垃圾回收掉。
*如果加锁失败或前置节点不是head节点,就会通过shouldParkAfterFailedAcquire方法,将head节点的waitStatus变成了SIGNAL = -1 (即后继结点需要被唤醒),最后执行parkAndCheckInterrupt方法,调用LockSupport.park()挂起当前线程。
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//假设线程B是node:会一直循环,直到拿到锁
for (;;) {
final Node p = node.predecessor();
//如果node的前驱节点p是head,表示node是第二个节点,就可以尝试去获取锁了
if (p == head && tryAcquire(arg)) {
//获取锁后,将head指向该节点,为了推进队列前移
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//shouldParkAfterFailedAcquire方法,
// 如果线程B的前置节点waitStatus = Node.SIGNAL 表示需要挂起线程B
//parkAndCheckInterrupt方法,挂起线程B,判断线程是否执行过interrupt,并且清除掉interrupt的值
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 当获取锁失败后,判断是否要挂起该线程
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; /** 例如线程B的前节点是空内容,那ws = 0,等主程序第二次循环时,才会进入下面第一个判断 **/
if (ws == Node.SIGNAL) /** 当前节点处于 “当前节点的后面节点需要被唤醒” 的状态 **/
/*
* 返回 可以挂起该线程 的信号
*/
return true;
if (ws > 0) { /** 前节点处于取消状态 **/
do {
node.prev = pred = pred.prev; /** 将所有处于取消状态去除掉 **/
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); /** 当ws = 0 或 -3时,将它置为 -1 **/
}
return false;
}
//一旦调用该方法,当前线程就会被挂起,也就是停在这个方法里
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); /** 阻塞该线程,至此该线程进入等待状态,等着unpark和interrupt叫醒他 **/
return Thread.interrupted(); /** 叫醒之后返回该线程是否在中断状态, 并会清除中断记号 **/
}
逻辑脑图:

线程A, B, C的加锁流程



解锁 unlock() 源码流程



private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
//获取头节点的等待状态
int ws = node.waitStatus;
if (ws < 0)
//将头节点的等待状态置为0,表示后面无线程等待
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//获得头节点的后续节点
Node s = node.next;
//若s为空 或者 等待状态为取消
if (s == null || s.waitStatus > 0) {
//从尾结点开始向头节点遍历,遍历到整个队列最前面的等待状态 <= 0,
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
//赋值给s,用于后续的唤醒操作
s = t;
}
if (s != null)
//唤醒该节点,促使再次执行抢锁操作
//这时候这个节点会在加锁的第三部分逻辑acquirequeue那里循环里尝试抢锁。
LockSupport.unpark(s.thread);
} 

线程A, B, C解锁过程



依赖AQS的一些通信工具类
Semaphore
作用是限制线程的数量。
最主要的⽅法是acquire⽅法和release⽅法。acquire()⽅***申请⼀个permit,⽽release⽅***释放⼀个permit。当然,你也可以申请多个acquire(int permits)或者 释放多个release(int permits)。
每次acquire,permits就会减少⼀个或者多个。如果减少到了0,再有其他线程来acquire,那就要阻塞这个线程直到有其它线程release permit为⽌。
举个栗子:
我想限制同时只有3个线程在工作:
public class SemaphoreDemo {
public static void main(String[] args) {
//线程数量
Semaphore semaphore = new Semaphore(3);
for(int i = 1; i <= 6; i++){
new Thread(()->{
try {
semaphore.acquire();
System.out.println("当前线程"+Thread.currentThread().getName()+", 还剩"
+semaphore.availablePermits()+"个资源,"+"还有"+semaphore.getQueueLength()
+"个线程在等待");
Random random = new Random();
TimeUnit.SECONDS.sleep(5);
System.out.println("线程"+Thread.currentThread().getName()+"释放了资源");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
} 结果:

可以看出当线程6正在执行时,资源还剩0个,其他线程默认进入等待队列
当然,Semaphore默认的acquire⽅法是会让线程进⼊等待队列,且会抛出中断异 常。但它还有⼀些⽅法可以忽略中断或不进⼊阻塞队列
// 忽略中断 public void acquireUninterruptibly() public void acquireUninterruptibly(int permits) // 不进⼊等待队列,底层使⽤CAS public boolean tryAcquire public boolean tryAcquire(int permits) public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException public boolean tryAcquire(long timeout, TimeUnit unit)
Semaphore内部有⼀个继承了AQS的同步器Sync,重写了 tryAcquireShared ⽅ 法。在这个⽅法⾥,会去尝试获取资源。
如果获取失败(想要的资源数量⼩于⽬前已有的资源数量),就会返回⼀个负数 (代表尝试获取资源失败)。然后当前线程就会进⼊AQS的等待队列。
CountDownLatch
计数递减屏障。
假设某个线程在执行任务之前,需要等待其他线程完成前置任务,必须等待所有的前置任务都完成,才能开始执行本线程的任务。
有这些方法:
// 构造⽅法: public CountDownLatch(int count) public void await() // 等待 public boolean await(long timeout, TimeUnit unit) // 超时等待 public void countDown() // count - 1 public long getCount() // 获取当前还有多少count
举个栗子:,玩游戏的时候,在游戏真正开始之前,⼀般会等待⼀些前置任务完成, ⽐如“加载地图数据”,“加载⼈物模型”,“加载背景⾳乐”等等。只有当所有的东⻄都 加载完成后,玩家才能真正进⼊游戏。下⾯我们就来模拟⼀下这个demo。
public class CountDownDemo {
static class PreTaskThread implements Runnable{
private String task;
private CountDownLatch countDownLatch;
public PreTaskThread(String task, CountDownLatch countDownLatch){
this.task = task;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
Random random = new Random();
try {
Thread.sleep(random.nextInt(1000));
System.out.println(task + "- 任务完成");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(()->{
try {
System.out.println("等待数据加载...");
System.out.println(String.format("还有%d个前置任务",countDownLatch.getCount()));
countDownLatch.await();
System.out.println("加载数据完成,开始游戏!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
//前置任务
new Thread(new PreTaskThread("加载地图数据",countDownLatch)).start();
new Thread(new PreTaskThread("加载人物模型",countDownLatch)).start();
new Thread(new PreTaskThread("加载背景音乐",countDownLatch)).start();
}
} 结果:

CountDownLatch原理,内部同样依赖了一个AQS的实现类Sync,
构造器里的计数器count实际上是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。
下面源码分析一波:
//构造器 构建一个state为count数量的AQS同步队列
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
//等待 会调用acquireSharedInterruptibly
public void await() throws InterruptedException {
//允许抛出中断异常的请求共享锁
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//若当前锁不空闲,也就是计数count不为0
if (tryAcquireShared(arg) < 0)
//挂起该线程
doAcquireSharedInterruptibly(arg);
}
//当state为0(当前锁空闲),返回1;否则返回-1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//这里的实现跟Reentrenlock的acquireQueue差不多
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//添加进阻塞队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) { //线程被唤醒后,会在这里醒过来,去抢锁。
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//计数递减 释放锁
public void countDown() {
sync.releaseShared(1);
}
//
public final boolean releaseShared(int arg) {
//当递减到count为0时,返回true
if (tryReleaseShared(arg)) {
//唤醒所有等待线程
doReleaseShared();
return true;
}
return false;
}
//递减锁的重入次数
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
//唤醒等待线程操作
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { //节点是否在等待唤醒状态
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //将头结点修改为无线程在等待
continue; // loop to recheck cases
unparkSuccessor(h); //成功则唤醒所有线程
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
} CyclicBarrier
从名字上理解是“循环的屏障”。CountDownLatch一旦计数值count被降为0后,就不能重新设置了。而CyclicBarrier可以使用reset()重置。
如果在参与者(线程)在等待的过程中,Barrier被破坏,就会抛出BrokenBarrierException。可以⽤ isBroken() ⽅法检测Barrier是否被破坏。
如果有线程已经处于等待状态,调⽤reset⽅***导致已经在等待的线程出现BrokenBarrierException异常。并且由于出现了BrokenBarrierException,将会 导致始终⽆法等待。
如果在等待的过程中,线程被中断,也会抛出BrokenBarrierException异常, 并且这个异常会传播到其他所有的线程。
如果在执⾏屏障操作过程中发⽣异常,则该异常将传播到当前线程中,其他线 程会抛出BrokenBarrierException,屏障被损坏。
如果超出指定的等待时间,当前线程会抛出 TimeoutException 异常,其他线 程会抛出BrokenBarrierException异常。
举个栗子:
我们同样⽤玩游戏的例⼦。如果玩⼀个游戏有多个“关卡”,那使⽤CountDownLatch显然不太合适,那需要为每个关卡都创建⼀个实例。那我们可以 使⽤CyclicBarrier来实现每个关卡的数据加载等待功能。
public class CyclicBarrierDemo {
static class PreTaskThread implements Runnable {
private String task;
private CyclicBarrier cyclicBarrier;
public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
this.task = task;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
// 假设总共三个关卡
for (int i = 1; i < 4; i++) {
try {
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("关卡%d的任务%s完成", i, tas
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
cyclicBarrier.reset(); // 重置屏障
}
}
}
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
System.out.println("本关卡所有前置任务完成,开始游戏...");
});
new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();
new Thread(new PreTaskThread("加载⼈物模型", cyclicBarrier)).start();
new Thread(new PreTaskThread("加载背景⾳乐", cyclicBarrier)).start();
}
} 输出:
关卡1的任务加载地图数据完成
关卡1的任务加载背景⾳乐完成
关卡1的任务加载⼈物模型完成
本关卡所有前置任务完成,开始游戏...
关卡2的任务加载地图数据完成
关卡2的任务加载背景⾳乐完成
关卡2的任务加载⼈物模型完成
本关卡所有前置任务完成,开始游戏...
关卡3的任务加载⼈物模型完成
关卡3的任务加载地图数据完成
关卡3的任务加载背景⾳乐完成
本关卡所有前置任务完成,开始游戏...
CyclicBarrier虽说功能与CountDownLatch类似,但是实现原理却完全不同,CyclicBarrier内部使⽤的是Lock + Condition实现的等待/通知模式。

查看9道真题和解析