ReentrantLock 源码分析
ReentrantLock 怎么使用
ReentrantLock 通常作为类的成员变量,用于加锁和释放锁,下面举个例子
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock();
}
}
}
ReentrantLock
一般作为成员变量使用,通过 lock
方法获取锁,如果没有获取到锁会阻塞,获取到锁之后,执行业务逻辑,在 finally
代码中释放锁,防止死锁。
ReentrantLock 构造方法
我们首先看下 ReentrantLock
类的构造方法
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
// 省略部分代码
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
// 省略部分代码
}
ReentrantLock 提供了两个构造方法,默认的构造方法内部实现的是一个非公平锁 NonfairSync
,也可以通过参数指定要创建的是公平锁 FairSync
还是非公平锁 NonfairSync
。
静态内部类 Sync
我们先看下静态内部类 Sync
的结构
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// 省略部分代码
}
Sync
是一个抽象类,默认提供了两个实现,分别是我们上面提到的公平锁 FairSync
和非公平锁 NonfairSync
,公平锁和非公平锁获取锁的方式不同,公平锁会先判断等待队列中是否有其他线程在排队,如果有,则加入等待队列的尾部,如果没有则通过 CAS
去尝试获取锁,如果获取不到锁则加入等待队列的尾部。而非公平锁获取锁的时候直接通过 CAS
尝试获取锁,不管等待队列中是否有其他线程在排队。
Sync
继承了 AbstractQueuedSynchronizer
,AbstractQueuedSynchronizer
就是我们常说的 AQS
类。
AQS (AbstractQueuedSynchronizer)
内部就是一个 int
状态字段 state
和一个FIFO等待队列(双向链表),state
等于0 表示没有线程持有该锁,state
大于0表示有线程持有该锁。
ReentrantLock#lock 非公平锁的实现
下面我们通过 ReentrantLock#lock
方法一步一步的跟进代码
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
// 省略部分代码
/**
* Acquires the lock.
*
* <p>Acquires the lock if it is not held by another thread and returns
* immediately, setting the lock hold count to one.
*
* <p>If the current thread already holds the lock then the hold
* count is incremented by one and the method returns immediately.
*
* <p>If the lock is held by another thread then the
* current thread becomes disabled for thread scheduling
* purposes and lies dormant until the lock has been acquired,
* at which time the lock hold count is set to one.
*/
public void lock() {
sync.lock();
}
// 省略部分代码
}
我们可以看到 lock
方法内部调用了 Sync
一个子类的 lock
方法,我们先看下非公平锁 NonfairSync
的实现
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
// 省略部分代码
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// 省略部分代码
public void lock() {
sync.lock();
}
// 省略部分代码
}
非公平锁首先通过 compareAndSetState
方法尝试将 state
的值从0改为1,如果修改成功,则将当前线程设置为持有锁的线程。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
否在执行父类 AQS
的 acquire
方法,我们继续跟进到 AbstractQueuedSynchronizer
类中的 acquire
方法
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先调用了 tryAcquire
方法,可以看到在 AbstractQueuedSynchronizer
类中的tryAcquire
方法是一个 protected
修饰的方法,并且内部直接抛出了异常,那么很明显这个方法需要子类去实现(模版方法模式)
/**
* Attempts to acquire in exclusive mode. This method should query
* if the state of the object permits it to be acquired in the
* exclusive mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread. This can be used
* to implement method {@link Lock#tryLock()}.
*
* <p>The default
* implementation throws {@link UnsupportedOperationException}.
*
* @param arg the acquire argument. This value is always the one
* passed to an acquire method, or is the value saved on entry
* to a condition wait. The value is otherwise uninterpreted
* and can represent anything you like.
* @return {@code true} if successful. Upon success, this object has
* been acquired.
* @throws IllegalMonitorStateException if acquiring would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
那么我们去 NonfairSync
类中看到 tryAcquire
方法调用了父类 Sync
的 nonfairTryAcquire
方法
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 当前没有线程持有锁
if (compareAndSetState(0, acquires)) { // 通过 CAS 去抢锁
// 抢到锁了,抢持有锁的线程 exclusiveOwnerThread 设置为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 当前已经有线程获取到了锁,判断持有锁到线程是不是自己
else if (current == getExclusiveOwnerThread()) {
// 持有锁的线程是自己(可重入锁)
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 将 state 值+1
setState(nextc);
return true;
}
// 没有获取到锁,返回 false
return false;
}
至此,AQS
类中的 tryAcquire
实现已经分析完了,如果没有获取到锁,会继续执行 if 中的条件判断,我们继续分析 AQS
类 addWaiter
方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter
方法实现,在没有获取到锁的情况下,需要将当前线程加入到等待队列
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// 将当前线程封装到 Node 实例
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; // 链表尾指针
if (pred != null) {
// 新node.prev 指向前面node
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 通过 CAS 将 tail 指向新加入的 node
pred.next = node; // 将前面node.next 指向新node
return node;
}
}
// 尾指针为 null 的情况
enq(node);
return node;
}
AQS
类中 compareAndSetTail
方法
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
AQS
类中 enq
方法
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) { // for 循环,通过 CAS 将新node加入到链表尾部
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) // 通过 CAS 初始化 head 结点
tail = head; // 将 tail 指向 head
} else {
// 以下逻辑与 addWaiter 方法上面实现一致
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
至此,没有获取到锁的线程已经成功加入到等待队列的尾部了,AQS
类 addWaiter
方法会返回新加入到等待队列中的 node
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我们继续分析 acquireQueued
方法
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 自旋
final Node p = node.predecessor(); // 新加入node的前一个结点
// p == head 说明新加入的node是链表中的第一个数据结点,尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获取锁成功,将链表中的第一个结点设置为头结点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
通过 for 循环自旋,获取当前结点的前一个结点,如果前一个结点是头结点,然后尝试获取锁,如果获取锁成功。更新队列的头结点为当前结点,之前 head 指向一个空的 Node 结点(new Node()),这里将当前结点 node 的thread 和 prev 设置为 null,将 head 指向当前结点。
/**
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*
* @param node the node
*/
private void setHead(Node node) {
head = node; // 将当前结点设置为头结点
node.thread = null;
node.prev = null;
}
如果新加入等待队列的结点不是第一个结点,或者是第一个结点但是尝试获取锁失败了,会执行shouldParkAfterFailedAcquire
方法,这个方法用于判断当前线程在尝试获取锁失败后是否需要 park(挂起)。
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
如果当前线程需要挂起,则执行 parkAndCheckInterrupt
方法
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
ReentrantLock#unlock 非公平锁的实现
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the hold
* count is decremented. If the hold count is now zero then the lock
* is released. If the current thread is not the holder of this
* lock then {@link IllegalMonitorStateException} is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock() {
sync.release(1);
}
调用了 AQS
的 release
方法
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 释放锁成功,获取头结点
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒等待队列中的其他线程
unparkSuccessor(h);
return true;
}
return false;
}
调用 AQS
的 tryRelease
方法尝试释放锁,同样的 AQS
中的 tryRelease
方法并没有具体实现,这个方法需要子类去实现,我们进入到 Sync
类中查看具体实现
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
// 判断持有锁的线程是否是当前线程,如果不是抛异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 如果state 的值为0,表示锁释放,对于可重入锁,加锁和解锁要成对出现
free = true;
// 将持有锁的线程设置为 null
setExclusiveOwnerThread(null);
}
// 更新 state 的值为0
setState(c);
return free;
}
释放锁成功,获取等待队列头结点,唤醒等待队列中的其他线程
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
本文主要介绍了 ReentrantLock 非公平锁的底层实现原理,公平锁的实现大家可自行查看源码。