ReentrantLock 源码分析

ReentrantLock 怎么使用

ReentrantLock 通常作为类的成员变量,用于加锁和释放锁,下面举个例子

class X {
    private final ReentrantLock lock = new ReentrantLock();
    // ...

    public void m() {
        lock.lock();  // block until condition holds
        try {
            // ... method body
        } finally {
            lock.unlock();
        }
    }
}

ReentrantLock 一般作为成员变量使用,通过 lock 方法获取锁,如果没有获取到锁会阻塞,获取到锁之后,执行业务逻辑,在 finally 代码中释放锁,防止死锁。

ReentrantLock 构造方法

我们首先看下 ReentrantLock 类的构造方法

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    // 省略部分代码

    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
  	
    // 省略部分代码
}

ReentrantLock 提供了两个构造方法,默认的构造方法内部实现的是一个非公平锁 NonfairSync,也可以通过参数指定要创建的是公平锁 FairSync 还是非公平锁 NonfairSync

静态内部类 Sync

我们先看下静态内部类 Sync 的结构

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();

        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    // 省略部分代码
}

Sync 是一个抽象类,默认提供了两个实现,分别是我们上面提到的公平锁 FairSync 和非公平锁 NonfairSync,公平锁和非公平锁获取锁的方式不同,公平锁会先判断等待队列中是否有其他线程在排队,如果有,则加入等待队列的尾部,如果没有则通过 CAS 去尝试获取锁,如果获取不到锁则加入等待队列的尾部。而非公平锁获取锁的时候直接通过 CAS 尝试获取锁,不管等待队列中是否有其他线程在排队。

Sync 继承了 AbstractQueuedSynchronizerAbstractQueuedSynchronizer就是我们常说的 AQS 类。

image-20230608185540531.png

AQS (AbstractQueuedSynchronizer)

内部就是一个 int 状态字段 state 和一个FIFO等待队列(双向链表),state 等于0 表示没有线程持有该锁,state 大于0表示有线程持有该锁。

ReentrantLock#lock 非公平锁的实现

下面我们通过 ReentrantLock#lock 方法一步一步的跟进代码

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    // 省略部分代码
  
    /**
     * Acquires the lock.
     *
     * <p>Acquires the lock if it is not held by another thread and returns
     * immediately, setting the lock hold count to one.
     *
     * <p>If the current thread already holds the lock then the hold
     * count is incremented by one and the method returns immediately.
     *
     * <p>If the lock is held by another thread then the
     * current thread becomes disabled for thread scheduling
     * purposes and lies dormant until the lock has been acquired,
     * at which time the lock hold count is set to one.
     */
    public void lock() {
        sync.lock();
    }

    // 省略部分代码
}

我们可以看到 lock 方法内部调用了 Sync 一个子类的 lock方法,我们先看下非公平锁 NonfairSync 的实现

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    // 省略部分代码

    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    // 省略部分代码
  
    public void lock() {
        sync.lock();
    }

    // 省略部分代码
} 

非公平锁首先通过 compareAndSetState 方法尝试将 state 的值从0改为1,如果修改成功,则将当前线程设置为持有锁的线程。

final void lock() {
  if (compareAndSetState(0, 1))
    setExclusiveOwnerThread(Thread.currentThread());
  else
    acquire(1);
}

否在执行父类 AQSacquire 方法,我们继续跟进到 AbstractQueuedSynchronizer 类中的 acquire 方法

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

首先调用了 tryAcquire 方法,可以看到在 AbstractQueuedSynchronizer 类中的tryAcquire 方法是一个 protected 修饰的方法,并且内部直接抛出了异常,那么很明显这个方法需要子类去实现(模版方法模式)

    /**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire.  If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread. This can be used
     * to implement method {@link Lock#tryLock()}.
     *
     * <p>The default
     * implementation throws {@link UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.  The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return {@code true} if successful. Upon success, this object has
     *         been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

那么我们去 NonfairSync 类中看到 tryAcquire 方法调用了父类 SyncnonfairTryAcquire 方法

/**
* Performs non-fair tryLock.  tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
  final Thread current = Thread.currentThread();
  int c = getState();
  if (c == 0) { // 当前没有线程持有锁
    if (compareAndSetState(0, acquires)) { // 通过 CAS 去抢锁
      // 抢到锁了,抢持有锁的线程 exclusiveOwnerThread 设置为当前线程
      setExclusiveOwnerThread(current);
      return true;
    }
  }
  //  当前已经有线程获取到了锁,判断持有锁到线程是不是自己
  else if (current == getExclusiveOwnerThread()) {
    // 持有锁的线程是自己(可重入锁)
    int nextc = c + acquires;
    if (nextc < 0) // overflow
      throw new Error("Maximum lock count exceeded");
    // 将 state 值+1
    setState(nextc);
    return true;
  }
  // 没有获取到锁,返回 false
  return false;
}

至此,AQS 类中的 tryAcquire 实现已经分析完了,如果没有获取到锁,会继续执行 if 中的条件判断,我们继续分析 AQSaddWaiter 方法

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

addWaiter 方法实现,在没有获取到锁的情况下,需要将当前线程加入到等待队列

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        // 将当前线程封装到 Node 实例
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail; // 链表尾指针
        if (pred != null) {
          	// 新node.prev 指向前面node
            node.prev = pred;
            if (compareAndSetTail(pred, node)) { // 通过 CAS 将 tail 指向新加入的 node
                pred.next = node; // 将前面node.next 指向新node
                return node;
            }
        }
      	// 尾指针为 null 的情况
        enq(node);
        return node;
    }

AQS 类中 compareAndSetTail 方法

    /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

AQS 类中 enq 方法

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) { // for 循环,通过 CAS 将新node加入到链表尾部
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node())) // 通过 CAS 初始化 head 结点
                    tail = head; // 将 tail 指向 head
            } else {
              	// 以下逻辑与 addWaiter 方法上面实现一致
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

至此,没有获取到锁的线程已经成功加入到等待队列的尾部了,AQSaddWaiter 方法会返回新加入到等待队列中的 node

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

我们继续分析 acquireQueued 方法

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) { // 自旋
                final Node p = node.predecessor(); // 新加入node的前一个结点
              	// p == head 说明新加入的node是链表中的第一个数据结点,尝试获取锁
                if (p == head && tryAcquire(arg)) {
                  	// 获取锁成功,将链表中的第一个结点设置为头结点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

通过 for 循环自旋,获取当前结点的前一个结点,如果前一个结点是头结点,然后尝试获取锁,如果获取锁成功。更新队列的头结点为当前结点,之前 head 指向一个空的 Node 结点(new Node()),这里将当前结点 node 的thread 和 prev 设置为 null,将 head 指向当前结点。

    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods.  Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node; // 将当前结点设置为头结点
        node.thread = null;
        node.prev = null;
    }
image-20230609094045774.png

如果新加入等待队列的结点不是第一个结点,或者是第一个结点但是尝试获取锁失败了,会执行shouldParkAfterFailedAcquire 方法,这个方法用于判断当前线程在尝试获取锁失败后是否需要 park(挂起)。

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

如果当前线程需要挂起,则执行 parkAndCheckInterrupt 方法

    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

ReentrantLock#unlock 非公平锁的实现

    /**
     * Attempts to release this lock.
     *
     * <p>If the current thread is the holder of this lock then the hold
     * count is decremented.  If the hold count is now zero then the lock
     * is released.  If the current thread is not the holder of this
     * lock then {@link IllegalMonitorStateException} is thrown.
     *
     * @throws IllegalMonitorStateException if the current thread does not
     *         hold this lock
     */
    public void unlock() {
        sync.release(1);
    }

调用了 AQSrelease 方法

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
          	// 释放锁成功,获取头结点
            Node h = head;
            if (h != null && h.waitStatus != 0)
              	// 唤醒等待队列中的其他线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

调用 AQStryRelease 方法尝试释放锁,同样的 AQS 中的 tryRelease 方法并没有具体实现,这个方法需要子类去实现,我们进入到 Sync 类中查看具体实现

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                // 判断持有锁的线程是否是当前线程,如果不是抛异常
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
              	// 如果state 的值为0,表示锁释放,对于可重入锁,加锁和解锁要成对出现
                free = true;
                // 将持有锁的线程设置为 null
                setExclusiveOwnerThread(null);
            }
          	// 更新 state 的值为0
            setState(c);
            return free;
        }

释放锁成功,获取等待队列头结点,唤醒等待队列中的其他线程

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

本文主要介绍了 ReentrantLock 非公平锁的底层实现原理,公平锁的实现大家可自行查看源码。

全部评论

相关推荐

喜欢走神的孤勇者练习时长两年半:池是池,发是发,我曾池,我现黑
点赞 评论 收藏
分享
牛客101244697号:这个衣服和发型不去投偶像练习生?
点赞 评论 收藏
分享
评论
点赞
收藏
分享
牛客网
牛客企业服务