深入java虚拟机:原子操作ParkEvent和Parker

基础设施

HotSpot VM并发的基础设施主要是原子操作、ParkEvent和Parker,后面两个功能的重合度很高,未来可能合并为一个ParkEvent。在笔者写这本书的时候(jdk-12+31),它们两个还是独立的个体,所以文中会分别讨论。

 原子操作

原子操作即普通意义上的不可打断的操作。HotSpot VM的原子模块位于runtime/atomic,它实现了原子性的递增值、交换值、比较并交换等操作,其底层实现依赖于CPU指令。举个例子,x86提供lock指令前缀,以保证一个CPU在执行被修饰的指令期间互斥地拥有对应的Cache Line的所有权。这个保证是并发的基础,并发离不开线程,线程离不开锁,如果多个线程在同一时刻抢锁(互斥量/同步量),锁内部就必须有一条只能互斥执行的代码,这便是原子指令。

ParkEvent

使用ParkEvent可以使线程睡眠与唤醒。一个ParkEvent与一个线程的生命周期绑定,当线程结束时,ParkEvent会移到一个EventFreeList链表,而新创建的线程会在EventFreeList中查找ParkEvent,如果没有就分配新的ParkEvent。ParkEvent本身只有分配和释放接口,但是它继承了平台相关的PlaformEvent,因此它就有了PlatformEvent提供的park、unpark接口,如代码清单6-9所示:

代码清单6-9 POSIX PlatformEvent的实现

void os::PlatformEvent::park() {
// CAS递减_event的值
int v;
for (;;) {
v = _event;
if (Atomic::cmpxchg(v - 1, &_event, v) == v) break;
}
if (v == 0) {
int status = pthread_mutex_lock(_mutex);
// 阻塞线程加一
++_nParked;
// 如果递减之后event为-1则阻塞,否则立刻返回
while (_event < 0) {
status = pthread_cond_wait(_cond, _mutex);
}
// 阻塞线程减一
--_nParked;
_event = 0;
status = pthread_mutex_unlock(_mutex);
OrderAccess::fence();
}
}void os::PlatformEvent::unpark() {
// 如果_event大于等于0,则设置为1并返回
if (Atomic::xchg(1, &_event) >= 0) return;
// 仅当_event为-1时,即另一个线程阻塞住时才执行后面的唤醒另一个线程的操作
int status = pthread_mutex_lock(_mutex);
int anyWaiters = _nParked;
status = pthread_mutex_unlock(_mutex);
if (anyWaiters != 0) {
status = pthread_cond_signal(_cond);
}
}

ParkEvent依赖的假设是它只被当前绑定的线程park,但是允许多个线程unpark。了解Windows的读者都知道,Windows内核有可以作为同步工具的Event内核对象。当一个操作完成时,可以将Event对象设置为触发状态,此时等待Event事件的线程将得到通知。ParkEvent在Windows上是通过Event内核对象实现的,由于内核的原生支持,其实现也比POSIX简单不少,如代码清单6-10所示:

代码清单6-10 Windows PlatformEvent的实现

void os::PlatformEvent::park() {
int v;
for (;;) {v = _Event;
if (Atomic::cmpxchg(v-1, &_Event, v) == v) break;
}
if (v != 0) return;
while (_Event < 0) {
DWORD rv = ::WaitForSingleObject(_ParkHandle, INFINITE);
}
_Event = 0;
OrderAccess::fence();
}
void os::PlatformEvent::unpark() {
if (Atomic::xchg(1, &_Event) >= 0) return;
::SetEvent(_ParkHandle);
}

ParkEvent广泛应用于HotSpot VM内部,实现synchronized同步代码块、对象锁语意以及JVM内部用的Mutex等功能,更多关于它的内容将在本章后面提到。

Parker

除了ParkEvent,HotSpot VM还有个与之功能重合的Parker,如代码清单6-11所示:

代码清单6-11 Parker

void Parker::park(bool isAbsolute, jlong time) {
// 如果count为1,当前park调用直接返回
if (Atomic::xchg(0, &_counter) > 0) return;
...
if (time == 0) {
_cur_index = REL_INDEX;
status = pthread_cond_wait(&_cond[_cur_index], _mutex);
}
else {
// 如果时间不为零,根据isAbsolute来选择毫秒还是微秒
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
status = pthread_cond_timedwait(...);
}
_cur_index = -1;
_counter = 0;
status = pthread_mutex_unlock(_mutex);
OrderAccess::fence();
...
}
void Parker::unpark() {
int status = pthread_mutex_lock(_mutex);
const int s = _counter;
_counter = 1;
int index = _cur_index;status = pthread_mutex_unlock(_mutex);
// 线程肯定是park的,唤醒它;对于没有park的线程,调用unpark是安全的,因为此时unpark
// 只会把counter设置为可获得然后返回。
if (s < 1 && index != -1) {
status = pthread_cond_signal(&_cond[index]);
}
}

Parker的核心是_counter值的变化,_coutner也叫permit。如果permit可获得(为1),那么调用park的线程立刻返回,否则可能阻塞。调用unpark使permit可获得;调用park使permit不可获得。与之不同的是,信号量(Semaphore)的permit可以累加,而Parker只有可获得、不可获得两种状态,它可以被看作受限的信号量。

Parker主要用于实现JSR166的并发组件。之前提到过JDK有个Unsafe类,该类允许Java层做一些底层的工作,如插入内存屏障,Parker也是通过Unsafe类暴露API的,如代码清单6-12所示:

代码清单6-12 Unsafe.park/Unsafe.unpark

public final class Unsafe {
/**
* 对调用了park的指定线程解除阻塞。如果指定线程没有调用park,即没有阻塞,
* 那么当前调用unpark会导致下一次调用park线程的线程不阻塞。
* 注意,该操作是不安全的,因为它必须确保指定线程没有销毁,通常在Java层调
* 用时不需要关心这个问题,但是在native code调用的时候需要。* @param thread 需要解除阻塞(park)状态的线程
*/
@HotSpotIntrinsicCandidate
public native void unpark(Object thread);
/**
* 阻塞当前线程,直到unpark被调用才解除。如果调用park前已经调用过unpark
* 那么park立即返回。如果当前线程已经中断,也立即返回。如果absolute
* 为false且time不为0,那么当过了指定的微秒时间后park立即返回。如果
* absolute为true且time不为0,那么当过了指定的毫秒时间后park立即返
* 回。如果操作系统伪唤醒,park也立即返回。该操作之所以放到Unsafe类里只是
* 因为unpark也在这里,把它放在其地方比较奇怪。
*/
@HotSpotIntrinsicCandidate
public native void park(boolean isAbsolute, long time);
...
}

注释描述的所有行为都和Parker源码一致。不过Unsafe.park/Unsafe.unpark不会被java.util.concurrent(JUC)的并发组件直接调用,而是会被JUC的LockSupport简单包装后再调用,如代码清单6-13所示:

代码清单6-13 LockSupport

package java.util.concurrent.locks;public class LockSupport {
private static final Unsafe U = Unsafe.getUnsafe();
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, 0L);
setBlocker(t, null);
}
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}
...
}

JUC的“基石”——
AbstractQueuedSynchronizer就使用了LockSupport,并以此构造出整个JUC体系。除此之外,LockSupport也被ConcurrentHashMap、ForkJoinPool、StampedLock等JUC组件直接使用,具体请参见JDK中的JUC类库源码。

Monitor

HotSpot VM内部的并发主要依赖包装了ParkEvent并高度优化的Monitor。Monitor是一个复杂的实现,如代码清单6-14所示:

代码清单6-14 Monitor实现细节class Monitor :

public CHeapObj<mtInternal> {
protected:
SplitWord _LockWord ; // 竞争队列(cxq)
Thread * volatile _owner; // 锁的owner
ParkEvent * volatile _EntryList ; // 等待线程列表
ParkEvent * volatile _OnDeck ; // 假定继承锁的线程
volatile intptr_t _WaitLock [1] ; // 保护_WaitSet
ParkEvent * volatile _WaitSet ; // 等待集合
volatile bool _snuck; // 用于sneaky locking
char _name[MONITOR_NAME_LEN]; // Monitor名称
...
};

为了追求性能和可扩展性的平衡,Monitor实现了fast/slow惯例。在fast路径时,线程会原子性地修改Lock_Word中的Lock_Byte,如果没有线程竞争Monitor,则线程成功加锁,否则进入slow路径。

slow路径的设计思想围绕扩展性。Monitor为锁竞争的线程准备了cxq和EntryList两个队列,并包含了OnDeck和owner两种锁状态。最近达到的线程会进入cxq,owner表示持有当前锁的线程,OnDeck是由owner选择的、作为继承者将要获取到锁的线程。同时,owner也负责将EntryList的线程移动到OnDeck,如果EntryList为空,那么owner会将cxq的所有线程移动到EntryList。

就效率来说,slow路径也是同样优秀的。它会限制并发获取锁的线程数目。比如把GC线程放入WaitSet,当条件成立后GC线程不会立即唤醒竞争锁,因为Monitor会将WaitSet转移到cxq中。另外,位于cxq和EntryList中的阻塞态线程也不允许竞争锁。因此,任何时候只有三类线程可以竞争锁:OnDeck、刚释放锁的owner和刚到达但是未被放入cxq的线程。

LockWord是一个机器字,它可以表示cxq,存放线程指针作为一个竞争队列,也可以作为一个整型变量扮演锁的角色。线程加锁、解锁实际上是将cxq最低有效字节(LSB,Least Significant Byte)分别置0、置1。Monitor的加锁、解锁过程比较复杂,其加锁逻辑如代码清单6-15所示:

代码清单6-15 Monitor::ILock

void Monitor::ILock(Thread * Self) {
// 尝试获取cxq锁,如果没加锁(没竞争)则快速加锁并结束
if (TryFast()) {
Exeunt:
return;
}
// 否则有竞争
ParkEvent * const ESelf = Self->_MutexEvent;
if (TrySpin(Self)) goto Exeunt;
ESelf->reset();
OrderAccess::fence();
// 尝试获取cxq锁,如果成功则结束,否则将Self线程放入cxq
if (AcquireOrPush(ESelf)) goto Exeunt;// 任何时刻只有一个线程位于OnDeck,如果Self线程没有位于OnDeck,那么阻塞等待
while (OrderAccess::load_acquire(&_OnDeck) != ESelf) {
ParkCommon(ESelf, 0);
}
// 此时Self位于OnDeck,直到获取到锁,否则它一直待在OnDeck
for (;;) {
if (TrySpin(Self)) break;
ParkCommon(ESelf,0);
}
_OnDeck = NULL;
goto Exeunt;
}

假设有多条线程同时调用Monitor::ILock,只有一条线程成功执行CAS,将cxq锁的LSB置为1,当其他线程发现竞争后,CAS失败表示发现锁存在竞争,进入竞争逻辑。

竞争逻辑是一个复杂且漫长的过程。线程先入cxq,然后阻塞,当它们被唤醒后会先检查自己是否在OnDeck,如果没有则再次阻塞。如果在OnDeck中,除非抢到锁才能退出去,否则一直待在OnDeck中。解锁更为复杂,不仅需要解除锁定,还需要寻找下一个待解锁的线程,如代码清单6-16所示:

代码清单6-16 Monitor::IUnlock

void Monitor::IUnlock(bool RelaxAssert) {// 将LSB置为0,释放cxq锁
OrderAccess::release_store(&_LockWord.Bytes[_LSBINDEX], 0);
OrderAccess::storeload();
// 如果OnDeck不为空,唤醒OnDeck线程
ParkEvent * const w = _OnDeck;
if (w != NULL) {
if ((UNS(w) & _LBIT) == 0) w->unpark();
return;
}
// OnDeck为空,如果cxq和EntryList都为空,没有Unlock的线程,直接退出
intptr_t cxq = _LockWord.FullWord;
if (((cxq & ~_LBIT)|UNS(_EntryList)) == 0) {
return;
}
// 如果在第一行代码(释放cxq)到此处期间有线程获取到锁,那么当前Unlock返回
// 寻找Succession的任务并交给那个线程
if (cxq & _LBIT) {
return;
}
// 寻找Succession(寻找下一个放到OnDeck做Unlock的线程)
Succession:
if (!Atomic::replace_if_null((ParkEvent*)_LBIT, &_OnDeck)) {
return;
}
// 如果EntryList不为空,则将第一个元素从EntryList放入OnDeck
ParkEvent * List = _EntryList;if (List != NULL) {
WakeOne:
ParkEvent * const w = List;
_EntryList = w->ListNext;
OrderAccess::release_store(&_OnDeck, w);
OrderAccess::storeload();
cxq = _LockWord.FullWord;
if (cxq & _LBIT) return;
w->unpark();
return;
}
// 否则EntryList为空
cxq = _LockWord.FullWord;
if ((cxq & ~_LBIT) != 0) {
// 从cxq获取元素,放到EntryList
for (;;) {
// 如果从第一行解释代码到此处有其他线程加锁了,
// 则寻找Succession的任务并交给那个线程
if (cxq & _LBIT) goto Punt;
intptr_t vfy = Atomic::cmpxchg(...);
if (vfy == cxq) break;
cxq = vfy;
}
// 否则从cxq拿出线程放入EntryList,然后把那个线程从EntryList拿出
// 放到OnDeck,并唤醒
_EntryList = List = (ParkEvent *)(cxq & ~_LBIT);
goto WakeOne;}
Punt:
_OnDeck = NULL;
OrderAccess::storeload();
cxq = _LockWord.FullWord;
if ((cxq & ~_LBIT) != 0 && (cxq & _LBIT) == 0) {
goto Succession;
}
return;
}

之前假设了多个线程执行Monitor::ILock,其中一个成功加锁并继续执行代码,其他线程都阻塞并等待自己被放入OnDeck。结合之前的场景,IUnlock先将LSB置为0释放cxq锁。如果OnDeck存在线程则解除阻塞。如果OnDeck为空但是EntryList存在线程,则将第一个元素从EntryList移到OnDeck,再解除锁定。如果EntryList为空但是cxq不为空,这和我们假设的情景一致,此时将cxq的线程移动到EntryList,然后再将EntryList的线程放入OnDeck,最后解除锁定。

回想条件等待的一般情景:首先线程抢锁,抢到锁后线程进入等待队列然后释放锁,接着立刻阻塞等待。线程可能因为被其他线程通知,或者等待超时,或者伪唤醒而解除等待状态,此时如果要接着执行wait()后面的代码,需要线程能再次抢到外部的锁。

Monitor::IWait()的实现与上述场景大致相同,只是多了些细节,如代码清单6-17所示:

代码清单6-17 Monitor::IWait

int Monitor::IWait(Thread * Self, jlong timo) {
// 执行wait的线程必须已经获得了锁
assert(ILocked(), "invariant");
ParkEvent * const ESelf = Self->_MutexEvent;
ESelf->Notified = 0;
ESelf->reset();
OrderAccess::fence();
// 将Self线程加入等待集合
Thread::muxAcquire(_WaitLock, "wait:WaitLock:Add");
ESelf->ListNext = _WaitSet;
_WaitSet = ESelf;
Thread::muxRelease(_WaitLock);
// 释放外部的锁(即执行wait前加的锁)
IUnlock(true);
// 线程阻塞等待,直到收到另一个线程的通知,或者超时唤醒,或者伪唤醒
for (;;) {
if (ESelf->Notified) break;
int err = ParkCommon(ESelf, timo);
if (err == OS_TIMEOUT) break;
}
OrderAccess::fence();
// 现在线程从wait状态唤醒了,需要将它移出等待集合WaitSetint WasOnWaitSet = 0;
if (ESelf->Notified == 0) {
Thread::muxAcquire(_WaitLock, "wait:WaitLock:remove");
if (ESelf->Notified == 0) {
ParkEvent * p = _WaitSet;
ParkEvent * q = NULL;
while (p != NULL && p != ESelf) {
q = p;
p = p->ListNext;
}
if (p == _WaitSet) {
_WaitSet = p->ListNext;
} else {
q->ListNext = p->ListNext;
}
WasOnWaitSet = 1;
}
Thread::muxRelease(_WaitLock);
}
// 尝试重新获取锁
if (WasOnWaitSet) {
// 如果Self线程是因为wait超时被唤醒,那么它还在等待集合里面,可直接获得锁
ILock(Self);
} else {
// 否则Self线程是因为其他线程通知而被唤醒,尝试抢锁,抢不到就阻塞等待
for (;;) {if (OrderAccess::load_acquire(&_OnDeck) == ESelf
&& TrySpin(Self)) break;
ParkCommon(ESelf, 0);
}
_OnDeck = NULL;
}
// 醒来后可继续执行的线程必须抢到了外部锁
assert(ILocked(), "invariant");
return WasOnWaitSet != 0;
}

最全学习笔记:大厂真题+微服务+MySQL+分布式+SSM框架+Java+Redis+数据结构与算法+网络+Linux+Spring全家桶+JVM+高并发+各大学习思维脑图+面试集合

全部评论

相关推荐

双非硬上算法:洗洗睡吧,说多了都是遗憾,说白了都是扯淡
点赞 评论 收藏
分享
头像
11-07 01:12
重庆大学 Java
精致的小松鼠人狠话不多:签哪了哥
点赞 评论 收藏
分享
评论
点赞
收藏
分享
牛客网
牛客企业服务