AQS源码解析 + 面试题
AQS源码解析 + 面试题
题目都基于面试题。搞定源码才能应对各种各样的面试题。因为万变不离其宗。
什么是AQS?
AQS是AbstractQueuedSynchronizer的简称。直译就是抽象队列同步器。
它定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如ReentrantLock,ReentrantReadWriteLock,Semaphone,CountDownLatch,CyclicBarrier等。
是Sync
继承了AQS
,而Sync
是上面同步类的静态类。
AQS定义的资源共享方式有几种?分别是什么?
- Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁。
- 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁。
- 非公平锁:一来就抢锁,无视队列顺序,抢不到才乖乖排队。
- Share(共享):多个线程可同时执行,如Semaphore、CountDwonLatch、CyclicBarrier、ReadWriteLock的Read锁。
AQS的实现原理是什么?
简单介绍下,AQS里有一个节点Node类,通过Node类可以实现两个队列。一个是通过prev和next实现CLH队列,二是nextWaiter实现Condition条件上的等待队列。
AQS是基于模板方法模式的,所以会有一些方法需要子类去实现。因此,我们通过ReentrantLock这个类的源码分析就能搞懂AQS的实现原理。
主要看三大块逻辑的代码实现:
- ReentrantLock实例构造
- 加锁 lock() 过程
- 解锁 unlock() 过程
ReentrantLock实例构造
多线程抢锁概览(非公平锁)
加锁 lock() 源码流程
先来看公平锁和非公平锁里lock()的实现:
/** * The current owner of exclusive mode synchronization. */ //当前持有锁的线程 private transient Thread exclusiveOwnerThread;
里面有个acquire方法,非公平锁和公平锁加锁过程都会调用到。
来看下实现:
首先看 tryAcquire(arg)方法
- 公平锁
- 非公平锁
对应的逻辑脑图:
接着看addWaiter()方法:
/** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail; /** * Creates and enqueues node for current thread and given mode. * 为代表当前线程并指定模式为mode的节点创建/进入队列 * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 指定的模式 * @return the new node * */ private Node addWaiter(Node mode) { //为当前线程创建一个节点 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; //如果目前有线程在队列中等待 if (pred != null) { node.prev = pred; /**[原尾结点pred] <-- [新节点node].prev**/ //CAS更新尾结点 if (compareAndSetTail(pred, node)) { pred.next = node; /**[原尾结点pred].next --> [新节点node]**/ return node; /**上面相当于做了双向链表的节点关联**/ } } // 假设一种情况:线程A在执行,当前线程B就会抢不到锁,接着来到这个方法 tail = null,不会进入if //返回值为首节点承载null的node enq(node); return node; } /** * CAS tail field. Used only by enq. */ private final boolean compareAndSetTail(Node expect, Node update) { //this: 当前AQS实例对象(工作内存),尾部的相对偏移量,期望值(主内存),要更新的值 return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
enq(node):
对应的逻辑脑图:
再来看acquireQueued(final Node node, int arg)方法:
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * 会先判断当前传入的Node对应的前置节点是否为head,如果是则尝试加锁。加锁成功则将当前节点设置为head节点,然后空置之前的head节点,方便被垃圾回收掉。 *如果加锁失败或前置节点不是head节点,就会通过shouldParkAfterFailedAcquire方法,将head节点的waitStatus变成了SIGNAL = -1 (即后继结点需要被唤醒),最后执行parkAndCheckInterrupt方法,调用LockSupport.park()挂起当前线程。 * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //假设线程B是node:会一直循环,直到拿到锁 for (;;) { final Node p = node.predecessor(); //如果node的前驱节点p是head,表示node是第二个节点,就可以尝试去获取锁了 if (p == head && tryAcquire(arg)) { //获取锁后,将head指向该节点,为了推进队列前移 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //shouldParkAfterFailedAcquire方法, // 如果线程B的前置节点waitStatus = Node.SIGNAL 表示需要挂起线程B //parkAndCheckInterrupt方法,挂起线程B,判断线程是否执行过interrupt,并且清除掉interrupt的值 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
/** * 当获取锁失败后,判断是否要挂起该线程 * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; /** 例如线程B的前节点是空内容,那ws = 0,等主程序第二次循环时,才会进入下面第一个判断 **/ if (ws == Node.SIGNAL) /** 当前节点处于 “当前节点的后面节点需要被唤醒” 的状态 **/ /* * 返回 可以挂起该线程 的信号 */ return true; if (ws > 0) { /** 前节点处于取消状态 **/ do { node.prev = pred = pred.prev; /** 将所有处于取消状态去除掉 **/ } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); /** 当ws = 0 或 -3时,将它置为 -1 **/ } return false; } //一旦调用该方法,当前线程就会被挂起,也就是停在这个方法里 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); /** 阻塞该线程,至此该线程进入等待状态,等着unpark和interrupt叫醒他 **/ return Thread.interrupted(); /** 叫醒之后返回该线程是否在中断状态, 并会清除中断记号 **/ }
逻辑脑图:
线程A, B, C的加锁流程
解锁 unlock() 源码流程
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ //获取头节点的等待状态 int ws = node.waitStatus; if (ws < 0) //将头节点的等待状态置为0,表示后面无线程等待 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //获得头节点的后续节点 Node s = node.next; //若s为空 或者 等待状态为取消 if (s == null || s.waitStatus > 0) { //从尾结点开始向头节点遍历,遍历到整个队列最前面的等待状态 <= 0, s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) //赋值给s,用于后续的唤醒操作 s = t; } if (s != null) //唤醒该节点,促使再次执行抢锁操作 //这时候这个节点会在加锁的第三部分逻辑acquirequeue那里循环里尝试抢锁。 LockSupport.unpark(s.thread); }
线程A, B, C解锁过程
依赖AQS的一些通信工具类
Semaphore
作用是限制线程的数量。
最主要的⽅法是acquire
⽅法和release
⽅法。acquire()⽅***申请⼀个permit,⽽release⽅***释放⼀个permit。当然,你也可以申请多个acquire(int permits)或者 释放多个release(int permits)。
每次acquire,permits就会减少⼀个或者多个。如果减少到了0,再有其他线程来acquire,那就要阻塞这个线程直到有其它线程release permit为⽌。
举个栗子:
我想限制同时只有3个线程在工作:
public class SemaphoreDemo { public static void main(String[] args) { //线程数量 Semaphore semaphore = new Semaphore(3); for(int i = 1; i <= 6; i++){ new Thread(()->{ try { semaphore.acquire(); System.out.println("当前线程"+Thread.currentThread().getName()+", 还剩" +semaphore.availablePermits()+"个资源,"+"还有"+semaphore.getQueueLength() +"个线程在等待"); Random random = new Random(); TimeUnit.SECONDS.sleep(5); System.out.println("线程"+Thread.currentThread().getName()+"释放了资源"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } },String.valueOf(i)).start(); } } }
结果:
可以看出当线程6正在执行时,资源还剩0个,其他线程默认进入等待队列
当然,Semaphore默认的acquire⽅法是会让线程进⼊等待队列,且会抛出中断异 常。但它还有⼀些⽅法可以忽略中断或不进⼊阻塞队列
// 忽略中断 public void acquireUninterruptibly() public void acquireUninterruptibly(int permits) // 不进⼊等待队列,底层使⽤CAS public boolean tryAcquire public boolean tryAcquire(int permits) public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException public boolean tryAcquire(long timeout, TimeUnit unit)
Semaphore内部有⼀个继承了AQS的同步器Sync,重写了 tryAcquireShared ⽅ 法。在这个⽅法⾥,会去尝试获取资源。
如果获取失败(想要的资源数量⼩于⽬前已有的资源数量),就会返回⼀个负数 (代表尝试获取资源失败)。然后当前线程就会进⼊AQS的等待队列。
CountDownLatch
计数递减屏障。
假设某个线程在执行任务之前,需要等待其他线程完成前置任务,必须等待所有的前置任务都完成,才能开始执行本线程的任务。
有这些方法:
// 构造⽅法: public CountDownLatch(int count) public void await() // 等待 public boolean await(long timeout, TimeUnit unit) // 超时等待 public void countDown() // count - 1 public long getCount() // 获取当前还有多少count
举个栗子:,玩游戏的时候,在游戏真正开始之前,⼀般会等待⼀些前置任务完成, ⽐如“加载地图数据”,“加载⼈物模型”,“加载背景⾳乐”等等。只有当所有的东⻄都 加载完成后,玩家才能真正进⼊游戏。下⾯我们就来模拟⼀下这个demo。
public class CountDownDemo { static class PreTaskThread implements Runnable{ private String task; private CountDownLatch countDownLatch; public PreTaskThread(String task, CountDownLatch countDownLatch){ this.task = task; this.countDownLatch = countDownLatch; } @Override public void run() { Random random = new Random(); try { Thread.sleep(random.nextInt(1000)); System.out.println(task + "- 任务完成"); countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { CountDownLatch countDownLatch = new CountDownLatch(3); new Thread(()->{ try { System.out.println("等待数据加载..."); System.out.println(String.format("还有%d个前置任务",countDownLatch.getCount())); countDownLatch.await(); System.out.println("加载数据完成,开始游戏!"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); //前置任务 new Thread(new PreTaskThread("加载地图数据",countDownLatch)).start(); new Thread(new PreTaskThread("加载人物模型",countDownLatch)).start(); new Thread(new PreTaskThread("加载背景音乐",countDownLatch)).start(); } }
结果:
CountDownLatch
原理,内部同样依赖了一个AQS的实现类Sync,
构造器里的计数器count
实际上是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch
没有提供任何机制去重新设置这个计数值。
下面源码分析一波:
//构造器 构建一个state为count数量的AQS同步队列 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); } //等待 会调用acquireSharedInterruptibly public void await() throws InterruptedException { //允许抛出中断异常的请求共享锁 sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //若当前锁不空闲,也就是计数count不为0 if (tryAcquireShared(arg) < 0) //挂起该线程 doAcquireSharedInterruptibly(arg); } //当state为0(当前锁空闲),返回1;否则返回-1 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //这里的实现跟Reentrenlock的acquireQueue差不多 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //添加进阻塞队列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //线程被唤醒后,会在这里醒过来,去抢锁。 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } //计数递减 释放锁 public void countDown() { sync.releaseShared(1); } // public final boolean releaseShared(int arg) { //当递减到count为0时,返回true if (tryReleaseShared(arg)) { //唤醒所有等待线程 doReleaseShared(); return true; } return false; } //递减锁的重入次数 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } //唤醒等待线程操作 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //节点是否在等待唤醒状态 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //将头结点修改为无线程在等待 continue; // loop to recheck cases unparkSuccessor(h); //成功则唤醒所有线程 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; }
CyclicBarrier
从名字上理解是“循环的屏障”。CountDownLatch
一旦计数值count被降为0后,就不能重新设置了。而CyclicBarrier
可以使用reset()
重置。
如果在参与者(线程)在等待的过程中,Barrier被破坏,就会抛出BrokenBarrierException。可以⽤ isBroken() ⽅法检测Barrier是否被破坏。
如果有线程已经处于等待状态,调⽤reset⽅***导致已经在等待的线程出现BrokenBarrierException异常。并且由于出现了BrokenBarrierException,将会 导致始终⽆法等待。
如果在等待的过程中,线程被中断,也会抛出BrokenBarrierException异常, 并且这个异常会传播到其他所有的线程。
如果在执⾏屏障操作过程中发⽣异常,则该异常将传播到当前线程中,其他线 程会抛出BrokenBarrierException,屏障被损坏。
如果超出指定的等待时间,当前线程会抛出 TimeoutException 异常,其他线 程会抛出BrokenBarrierException异常。
举个栗子:
我们同样⽤玩游戏的例⼦。如果玩⼀个游戏有多个“关卡”,那使⽤CountDownLatch显然不太合适,那需要为每个关卡都创建⼀个实例。那我们可以 使⽤CyclicBarrier来实现每个关卡的数据加载等待功能。
public class CyclicBarrierDemo { static class PreTaskThread implements Runnable { private String task; private CyclicBarrier cyclicBarrier; public PreTaskThread(String task, CyclicBarrier cyclicBarrier) { this.task = task; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { // 假设总共三个关卡 for (int i = 1; i < 4; i++) { try { Random random = new Random(); Thread.sleep(random.nextInt(1000)); System.out.println(String.format("关卡%d的任务%s完成", i, tas cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } cyclicBarrier.reset(); // 重置屏障 } } } public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { System.out.println("本关卡所有前置任务完成,开始游戏..."); }); new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start(); new Thread(new PreTaskThread("加载⼈物模型", cyclicBarrier)).start(); new Thread(new PreTaskThread("加载背景⾳乐", cyclicBarrier)).start(); } }
输出:
关卡1的任务加载地图数据完成
关卡1的任务加载背景⾳乐完成
关卡1的任务加载⼈物模型完成
本关卡所有前置任务完成,开始游戏...
关卡2的任务加载地图数据完成
关卡2的任务加载背景⾳乐完成
关卡2的任务加载⼈物模型完成
本关卡所有前置任务完成,开始游戏...
关卡3的任务加载⼈物模型完成
关卡3的任务加载地图数据完成
关卡3的任务加载背景⾳乐完成
本关卡所有前置任务完成,开始游戏...
CyclicBarrier虽说功能与CountDownLatch类似,但是实现原理却完全不同,CyclicBarrier内部使⽤的是Lock + Condition实现的等待/通知模式。