AQS源码解析 + 面试题

AQS源码解析 + 面试题

题目都基于面试题。搞定源码才能应对各种各样的面试题。因为万变不离其宗。

什么是AQS?

AQS是AbstractQueuedSynchronizer的简称。直译就是抽象队列同步器

它定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如ReentrantLock,ReentrantReadWriteLock,Semaphone,CountDownLatch,CyclicBarrier等。

image-20201228090138167

Sync继承了AQS,而Sync是上面同步类的静态类。

AQS定义的资源共享方式有几种?分别是什么?

  • Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁。
    • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁。
    • 非公平锁:一来就抢锁,无视队列顺序,抢不到才乖乖排队。
  • Share(共享):多个线程可同时执行,如Semaphore、CountDwonLatch、CyclicBarrier、ReadWriteLock的Read锁。

AQS的实现原理是什么?

image-20201228090842150

image-20201228090855381

image-20201228090904165

简单介绍下,AQS里有一个节点Node类,通过Node类可以实现两个队列。一个是通过prev和next实现CLH队列,二是nextWaiter实现Condition条件上的等待队列

AQS是基于模板方法模式的,所以会有一些方法需要子类去实现。因此,我们通过ReentrantLock这个类的源码分析就能搞懂AQS的实现原理。

主要看三大块逻辑的代码实现:

  • ReentrantLock实例构造
  • 加锁 lock() 过程
  • 解锁 unlock() 过程

ReentrantLock实例构造

image-20201228092718650

多线程抢锁概览(非公平锁)

image-20201228094523676

加锁 lock() 源码流程

image-20201228100619988

先来看公平锁和非公平锁里lock()的实现:

image-20201228101713158

image-20201228102047559

    /**
     * The current owner of exclusive mode synchronization.
     */
    //当前持有锁的线程
    private transient Thread exclusiveOwnerThread;

里面有个acquire方法,非公平锁和公平锁加锁过程都会调用到。

来看下实现:

image-20201228103333179

首先看 tryAcquire(arg)方法

  • 公平锁

image-20201228104712660

  • 非公平锁

image-20201228105143696

对应的逻辑脑图:

image-20201228150237794

接着看addWaiter()方法:

image-20201228110625430

     /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * Creates and enqueues node for current thread and given mode.
     * 为代表当前线程并指定模式为mode的节点创建/进入队列
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 指定的模式
     * @return the new node
     *
     */
    private Node addWaiter(Node mode) {
        //为当前线程创建一个节点
        Node node = new Node(Thread.currentThread(), mode);

        Node pred = tail;
        //如果目前有线程在队列中等待
        if (pred != null) {
            node.prev = pred; /**[原尾结点pred] <-- [新节点node].prev**/
            //CAS更新尾结点
            if (compareAndSetTail(pred, node)) {
                pred.next = node; /**[原尾结点pred].next --> [新节点node]**/
                return node; /**上面相当于做了双向链表的节点关联**/
            }
        }
        // 假设一种情况:线程A在执行,当前线程B就会抢不到锁,接着来到这个方法 tail = null,不会进入if
        //返回值为首节点承载null的node
        enq(node);
        return node;
    }

    /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        //this: 当前AQS实例对象(工作内存),尾部的相对偏移量,期望值(主内存),要更新的值
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

enq(node)

image-20201228112157769

对应的逻辑脑图:
image-20201228150327598

再来看acquireQueued(final Node node, int arg)方法:

 /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     * 会先判断当前传入的Node对应的前置节点是否为head,如果是则尝试加锁。加锁成功则将当前节点设置为head节点,然后空置之前的head节点,方便被垃圾回收掉。
     *如果加锁失败或前置节点不是head节点,就会通过shouldParkAfterFailedAcquire方法,将head节点的waitStatus变成了SIGNAL = -1 (即后继结点需要被唤醒),最后执行parkAndCheckInterrupt方法,调用LockSupport.park()挂起当前线程。
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;

            //假设线程B是node:会一直循环,直到拿到锁
            for (;;) {
                final Node p = node.predecessor();
                //如果node的前驱节点p是head,表示node是第二个节点,就可以尝试去获取锁了
                if (p == head && tryAcquire(arg)) {
                    //获取锁后,将head指向该节点,为了推进队列前移
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //shouldParkAfterFailedAcquire方法,
                // 如果线程B的前置节点waitStatus = Node.SIGNAL 表示需要挂起线程B
                //parkAndCheckInterrupt方法,挂起线程B,判断线程是否执行过interrupt,并且清除掉interrupt的值
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    /**
     * 当获取锁失败后,判断是否要挂起该线程
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus; /** 例如线程B的前节点是空内容,那ws = 0,等主程序第二次循环时,才会进入下面第一个判断 **/
        if (ws == Node.SIGNAL)  /** 当前节点处于 “当前节点的后面节点需要被唤醒” 的状态 **/
            /*
             * 返回 可以挂起该线程 的信号
             */
            return true;
        if (ws > 0) { /** 前节点处于取消状态 **/
            do {
                node.prev = pred = pred.prev; /** 将所有处于取消状态去除掉 **/
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL); /** 当ws = 0 或 -3时,将它置为 -1 **/
        }
        return false;
    }
    //一旦调用该方法,当前线程就会被挂起,也就是停在这个方法里
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); /** 阻塞该线程,至此该线程进入等待状态,等着unpark和interrupt叫醒他 **/
        return Thread.interrupted(); /** 叫醒之后返回该线程是否在中断状态, 并会清除中断记号 **/
    }

逻辑脑图:

image-20201228150337322

线程A, B, C的加锁流程

image-20201228162651665

image-20201228162659203

image-20201228162705237

解锁 unlock() 源码流程

image-20201228162851617

image-20201228163256381

image-20201228164106699

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        //获取头节点的等待状态
        int ws = node.waitStatus;
        if (ws < 0)
            //将头节点的等待状态置为0,表示后面无线程等待
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        //获得头节点的后续节点
        Node s = node.next;
        //若s为空 或者 等待状态为取消
        if (s == null || s.waitStatus > 0) {
            //从尾结点开始向头节点遍历,遍历到整个队列最前面的等待状态 <= 0,
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    //赋值给s,用于后续的唤醒操作
                    s = t;
        }
        if (s != null)
            //唤醒该节点,促使再次执行抢锁操作
            //这时候这个节点会在加锁的第三部分逻辑acquirequeue那里循环里尝试抢锁。
            LockSupport.unpark(s.thread);
}

image-20201228165704891

image-20201228165714135

线程A, B, C解锁过程

image-20201228165753591

image-20201228165802637

image-20201228165809654

依赖AQS的一些通信工具类

Semaphore

作用是限制线程的数量。

最主要的⽅法是acquire⽅法和release⽅法。acquire()⽅***申请⼀个permit,⽽release⽅***释放⼀个permit。当然,你也可以申请多个acquire(int permits)或者 释放多个release(int permits)。

每次acquire,permits就会减少⼀个或者多个。如果减少到了0,再有其他线程来acquire,那就要阻塞这个线程直到有其它线程release permit为⽌。

举个栗子:

我想限制同时只有3个线程在工作:

public class SemaphoreDemo {
    public static void main(String[] args) {
        //线程数量
        Semaphore semaphore = new Semaphore(3);
        for(int i = 1; i <= 6; i++){
            new Thread(()->{
                try {
                    semaphore.acquire();
                    System.out.println("当前线程"+Thread.currentThread().getName()+", 还剩"
                    +semaphore.availablePermits()+"个资源,"+"还有"+semaphore.getQueueLength()
                    +"个线程在等待");
                    Random random = new Random();
                    TimeUnit.SECONDS.sleep(5);

                    System.out.println("线程"+Thread.currentThread().getName()+"释放了资源");
                    semaphore.release();

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            },String.valueOf(i)).start();
        }
    }
}

结果:

image-20210115135712180

可以看出当线程6正在执行时,资源还剩0个,其他线程默认进入等待队列

当然,Semaphore默认的acquire⽅法是会让线程进⼊等待队列,且会抛出中断异 常。但它还有⼀些⽅法可以忽略中断或不进⼊阻塞队列

// 忽略中断
public void acquireUninterruptibly()
public void acquireUninterruptibly(int permits)
// 不进⼊等待队列,底层使⽤CAS
public boolean tryAcquire
public boolean tryAcquire(int permits)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
 throws InterruptedException
public boolean tryAcquire(long timeout, TimeUnit unit)

Semaphore内部有⼀个继承了AQS的同步器Sync,重写了 tryAcquireShared ⽅ 法。在这个⽅法⾥,会去尝试获取资源。

如果获取失败(想要的资源数量⼩于⽬前已有的资源数量),就会返回⼀个负数 (代表尝试获取资源失败)。然后当前线程就会进⼊AQS的等待队列。

CountDownLatch

计数递减屏障。

假设某个线程在执行任务之前,需要等待其他线程完成前置任务,必须等待所有的前置任务都完成,才能开始执行本线程的任务。

有这些方法:

// 构造⽅法:
public CountDownLatch(int count)
public void await() // 等待
public boolean await(long timeout, TimeUnit unit) // 超时等待
public void countDown() // count - 1
public long getCount() // 获取当前还有多少count

举个栗子:,玩游戏的时候,在游戏真正开始之前,⼀般会等待⼀些前置任务完成, ⽐如“加载地图数据”,“加载⼈物模型”,“加载背景⾳乐”等等。只有当所有的东⻄都 加载完成后,玩家才能真正进⼊游戏。下⾯我们就来模拟⼀下这个demo。

public class CountDownDemo {
    static class PreTaskThread implements Runnable{

        private String task;
        private CountDownLatch countDownLatch;

        public PreTaskThread(String task, CountDownLatch countDownLatch){
            this.task = task;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            Random random = new Random();
            try {
                Thread.sleep(random.nextInt(1000));
                System.out.println(task + "- 任务完成");
                countDownLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(3);

        new Thread(()->{

            try {
                System.out.println("等待数据加载...");
                System.out.println(String.format("还有%d个前置任务",countDownLatch.getCount()));
                countDownLatch.await();
                System.out.println("加载数据完成,开始游戏!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        //前置任务
        new Thread(new PreTaskThread("加载地图数据",countDownLatch)).start();
        new Thread(new PreTaskThread("加载人物模型",countDownLatch)).start();
        new Thread(new PreTaskThread("加载背景音乐",countDownLatch)).start();
    }
}

结果:

image-20210115143000789

CountDownLatch原理,内部同样依赖了一个AQS的实现类Sync,

构造器里的计数器count实际上是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。

下面源码分析一波:

//构造器 构建一个state为count数量的AQS同步队列
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
Sync(int count) {
    setState(count);
}

//等待  会调用acquireSharedInterruptibly
public void await() throws InterruptedException {
    //允许抛出中断异常的请求共享锁
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //若当前锁不空闲,也就是计数count不为0
    if (tryAcquireShared(arg) < 0)
        //挂起该线程
        doAcquireSharedInterruptibly(arg);
}

//当state为0(当前锁空闲),返回1;否则返回-1
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

//这里的实现跟Reentrenlock的acquireQueue差不多
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //添加进阻塞队列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {                            //线程被唤醒后,会在这里醒过来,去抢锁。
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

//计数递减 释放锁
public void countDown() {
        sync.releaseShared(1);
}

//
public final boolean releaseShared(int arg) {
    //当递减到count为0时,返回true
    if (tryReleaseShared(arg)) {
        //唤醒所有等待线程
        doReleaseShared();
        return true;
    }
    return false;
}

//递减锁的重入次数
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
//唤醒等待线程操作
private void doReleaseShared() {

    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {    //节点是否在等待唤醒状态
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))    //将头结点修改为无线程在等待
                    continue;            // loop to recheck cases
                unparkSuccessor(h); //成功则唤醒所有线程
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }

CyclicBarrier

从名字上理解是“循环的屏障”。CountDownLatch一旦计数值count被降为0后,就不能重新设置了。而CyclicBarrier可以使用reset()重置。

如果在参与者(线程)在等待的过程中,Barrier被破坏,就会抛出BrokenBarrierException。可以⽤ isBroken() ⽅法检测Barrier是否被破坏。

  1. 如果有线程已经处于等待状态,调⽤reset⽅***导致已经在等待的线程出现BrokenBarrierException异常。并且由于出现了BrokenBarrierException,将会 导致始终⽆法等待。

  2. 如果在等待的过程中,线程被中断,也会抛出BrokenBarrierException异常, 并且这个异常会传播到其他所有的线程。

  3. 如果在执⾏屏障操作过程中发⽣异常,则该异常将传播到当前线程中,其他线 程会抛出BrokenBarrierException,屏障被损坏。

  4. 如果超出指定的等待时间,当前线程会抛出 TimeoutException 异常,其他线 程会抛出BrokenBarrierException异常。

举个栗子:

我们同样⽤玩游戏的例⼦。如果玩⼀个游戏有多个“关卡”,那使⽤CountDownLatch显然不太合适,那需要为每个关卡都创建⼀个实例。那我们可以 使⽤CyclicBarrier来实现每个关卡的数据加载等待功能。

public class CyclicBarrierDemo {
 static class PreTaskThread implements Runnable {
 private String task;
 private CyclicBarrier cyclicBarrier;
 public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
 this.task = task;
 this.cyclicBarrier = cyclicBarrier;
 }
 @Override
 public void run() {
 // 假设总共三个关卡
 for (int i = 1; i < 4; i++) {
 try {
 Random random = new Random();
 Thread.sleep(random.nextInt(1000));
 System.out.println(String.format("关卡%d的任务%s完成", i, tas
 cyclicBarrier.await();
 } catch (InterruptedException | BrokenBarrierException e) {
 e.printStackTrace();
 }
 cyclicBarrier.reset(); // 重置屏障
 }
 }
 }
 public static void main(String[] args) {
 CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
 System.out.println("本关卡所有前置任务完成,开始游戏...");
 });
 new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();
 new Thread(new PreTaskThread("加载⼈物模型", cyclicBarrier)).start();
 new Thread(new PreTaskThread("加载背景⾳乐", cyclicBarrier)).start();
 }
}

输出:

关卡1的任务加载地图数据完成

关卡1的任务加载背景⾳乐完成

关卡1的任务加载⼈物模型完成

本关卡所有前置任务完成,开始游戏...

关卡2的任务加载地图数据完成

关卡2的任务加载背景⾳乐完成

关卡2的任务加载⼈物模型完成

本关卡所有前置任务完成,开始游戏...

关卡3的任务加载⼈物模型完成

关卡3的任务加载地图数据完成

关卡3的任务加载背景⾳乐完成

本关卡所有前置任务完成,开始游戏...

CyclicBarrier虽说功能与CountDownLatch类似,但是实现原理却完全不同,CyclicBarrier内部使⽤的是Lock + Condition实现的等待/通知模式。

全部评论

相关推荐

头像
11-07 01:12
重庆大学 Java
精致的小松鼠人狠话不多:签哪了哥
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务