「Golang」 sync.Mutex源码讲解

sync.Mutex概述

后期更新:下面的代码中会多次出现
if race.Enabled { //todo }
这种代码,经过理解这种判断是进行判断当前程序是否开启了race竞态检测模式 ,即在运行go程序时是否采用
go run race xxx.go
这种进行进行竞态检测运行模式

在进行源码解析之前,先看一下sync.Mutex是做什么的,首先看一下sync/mutex.go中的标注

A Mutex is a mutual exclusion lock.The zero value for a Mutex is an unlocked mutex. A Mutex must not be copied after first use.
译注:Mutex是互斥锁,Mutex的零值是解锁的Mutex。Mutex在第一次使用后不得复制。

从sync.Mutex的注释中可以看出,Mutex是一个互斥锁,根据百度百科给到的互斥锁的意义:

在编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为" 互斥锁" 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。

也就是说,当某个struct中存在一个Mutex字段时,在某些方法进行调用时,如果Mutex已经被其他go程占用,那么本次调用会进行阻塞直至该Mutex被解锁。

sync.Mutex解析

接下来看一下sync.Mutex结构体的字段内容:

type Mutex struct {
   
	state int32 //当前锁的状态,该int32字段通过位移操作使之可以包含不同意义
	sema  uint32 //是一个信号变量用于负责go程的唤醒和阻塞休眠
}


const (
	mutexLocked = 1 << iota // 锁是否被持有 即是否已经锁住
	mutexWoken // 是否有被唤醒的go程
	mutexStarving //是否处于饥饿状态,此标记可以确保某些go程不会长久获取不到锁
	mutexWaiterShift = iota // 目前等待锁的go程数量
	
	starvationThresholdNs = 1e6 //进入饥饿状态的阈值时间 1ms
)

Lock()

关于饥饿状态这个问题等之后在进行讲解,首先先说一下对Mutex加锁的过程

func (m *Mutex) Lock() {
   
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
   
		if race.Enabled {
   
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}

Lock方法被分成了两个情况进行加锁,首先第一种情况 即Fast Path快速加锁方式,此方式会首先进行CAS1 判断,首先进行判断当前的state字段是否为0,如果为0则代表当前互斥锁未被占用,因此可以直接对其进行加锁操作然后直接返回,所以此情况被称为Fast Path,在判断语句中的

if race.Enabled {
   
			race.Acquire(unsafe.Pointer(m))
		}

是go提供的竟态检测器,用于检测当前是否有其他操作同时操纵此Mutex对象。

注:因为race包不甚了解因此解释可能有出入,如果有更好的解释,请多指教。

之后来看一下第二种情况,即Slow Path,也就是说当前锁已经被人占用,需要进行一些其他操作。因为代码较长,可能会难读一些。我会尽量在每个语句中做注释讲解。

func (m *Mutex) lockSlow() {
   
   // 记录等待开始时间,用于与starvationThresholdNs搭配判断是否开启饥饿状态
	var waitStartTime int64
	//是否是饥饿状态
	starving := false
	//是否是唤醒状态
	awoke := false
	//自旋次数累计
	iter := 0
	//下面操作的都是这个old 其为当前Mutex对象状态字段的copy
	old := m.state
	//持续循环
	for {
   
		// 判断锁是不是非饥饿状态,并且锁还没被释放,如果是那就开始尝试自旋,多次尝试获取锁
		//不能再饥饿模式下进行自旋,因为可能会造成某些等待的很早的go程长时间获取不到锁。
		//所以当处于饥饿状态时,不能自旋,并且如果锁已经被释放了就需要竞争锁
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
   
		//如果非唤醒状态,并且mutexWoken标记位为0并且其他go程也在等待锁的时候
		//尝试设置mutexWoken标记位,用于告知在Unlock的时候不需要去唤醒其他等待go程
		//设置唤醒标记的意义在于可以告知Unlock有go程正在自旋等待锁,让其无需唤醒其他正在等待的go程
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
   
				awoke = true
			}
			//开始自旋 自旋代码是runtime在编译期间动态生成
			runtime_doSpin()
			//自旋次数+1
			iter++
			//更新锁的状态
			old = m.state
			continue
		}
		//检查的状态
		new := old
		//如果不是饥饿状态,则获取锁
		if old&mutexStarving == 0 {
   
			new |= mutexLocked
		}
		//等待锁go程的数量+1
		if old&(mutexLocked|mutexStarving) != 0 {
   
			new += 1 << mutexWaiterShift
		}
		// 如果是饥饿状态 并且有人占用了锁 就切换到饥饿模式
		if starving && old&mutexLocked != 0 {
   
			new |= mutexStarving
		}
		//看看当前是否是唤醒go程
		if awoke {
   
				//如果不是唤醒go程 但是awoke变量被设置为true 则代表出错了,,
				if new&mutexWoken == 0 {
   
				throw("sync: inconsistent mutex state")
			}
			//go程已从睡眠中唤醒,因此,无论哪种情况,都需要重置标志。
			new &^= mutexWoken
		}
		//cas 判断当前对象旧状态和old标记的状态是否相同,相同就换成new的,不相同就用当前对象的
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
   
		//old已经被解锁并释放并且不是饥饿状态 就代表正常的获取到了锁 则返回
			if old&(mutexLocked|mutexStarving) == 0 {
   
				break 
			}
			//等待时间不等于0就代表之前就开始等待了
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
   
				waitStartTime = runtime_nanotime()
			}
			//如果之前就开始等待了那就放到队列前面
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			//如果等待时间超过1ms 即starvationThresholdNs常量中的设定值,则开启饥饿模式
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			//如果当前状态非饥饿模式
			if old&mutexStarving != 0 {
   
				//如果此goroutine被唤醒,并且互斥锁处于饥饿模式,则所有权已移交给我们,但互斥锁处于某种不一致的状态:MutexLocked未设置,我们仍被视为等待。即状态设置有误
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
   
					throw("sync: inconsistent mutex state")
				}
				//减去一个等待者数量并且加锁
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				//如果不是饥饿模式或等待数量为1的话就代表是最后一个等待者并且非饥饿状态
				if !starving || old>>mutexWaiterShift == 1 {
   
					//那就解除饥饿标记
					delta -= mutexStarving
				}
				//更新状态
				atomic.AddInt32(&m.state, delta)
				break
			}
			//设为唤醒状态
			awoke = true
			//重新开始循环
			iter = 0
		} else {
   
			old = m.state
		}
	}
	//竟态检测
	if race.Enabled {
   
		race.Acquire(unsafe.Pointer(m))
	}
}

现在进行一下Lock过程的总结:总的来说过程就是,如果当前锁没被其他go程获取,那么就直接获取锁,这也是最直接的方式,如果当前锁被其他go程占用,并且还没有进入饥饿状态时,进行自旋等待,并且通知UnLock有正在自旋go程正在等待锁,在释放锁时就不要唤醒其他go程,然后判断是否需要进入饥饿状态,进入饥饿状态的条件是,当前go程等待锁的时间已经超出starvationThresholdNs常量所设定的时间即1ms,此时进入饥饿状态,饥饿状态会把当前go程放入等待锁的队列的最前端,使得其能在UnLock后立刻获得锁,防止该go程被饿死。退出饥饿模式需要符合一下两个条件中的任意一条:

  1. 此 go程已经是队列中的最后一个 waiter 了,没有其它的等待锁的 goroutine 了;
  2. 此 go程的等待时间小于 1 毫秒。

那么当有很多的go程都在争相获取锁的时候,会按照什么顺序获取锁呢?

等待的goroutine们是以FIFO排队的
1)当Mutex处于正常模式时,若此时没有新goroutine与队头goroutine竞争,则队头goroutine获得。若有新goroutine竞争大概率新goroutine获得。
2)当队头goroutine竞争锁失败1ms后,它会将Mutex调整为饥饿模式。进入饥饿模式后,锁的所有权会直接从解锁goroutine移交给队头goroutine,此时新来的goroutine直接放入队尾。
3)当一个goroutine获取锁后,如果发现自己满足下列条件中的任何一个#1它是队列中最后一个#2它等待锁的时间少于1ms,则将锁切换回正常模式

那么问题来了,为什么正常模式下会让新来的go程获取到锁呢?因为新来的go程当前正在占用cpu的时间片,那么如果我们能够把锁交给正在占用 cpu 时间片的 go程 的话,那就不需要做上下文的切换,在高并发的情况下,可能会有更好的性能。所以在正常模式下会把锁交给新来的go程。

至此Lock()的过程就讲完了。接下来说Unlock。

Unlock()

首先看Unlock的代码

func (m *Mutex) Unlock() {
   
	if race.Enabled {
   
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
   
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

Unlock与Lock一样分为两个情况,Fast Path直接将state标记位-1进行解锁,如果返回值不为0那么就代表还有等待锁的go程这样就需要对其进行一些处理,就进入了Slow Path情况:

func (m *Mutex) unlockSlow(new int32) {
   
	//首选判断是不是被解锁过了,因为不可二次解锁
	if (new+mutexLocked)&mutexLocked == 0 {
   
		throw("sync: unlock of unlocked mutex")
	}
	//如果不是饥饿状态则做处理 否则就唤醒处于等待队列第一个的go程
	if new&mutexStarving == 0 {
   
		old := new
		for {
   
		//判断是否是最后一个go程或者唤醒标记处于已标记状态则不唤醒任何等待go程
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
   
				return
			}
			//否则就等待队列-1 并且设置唤醒标记
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			//然后进行CAS处理
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
   
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
   
			runtime_Semrelease(&m.sema, true, 1)
	}
}

Unlock的代码要比Lock少很多,也好理解的多,总的来说就是首选判断是否是重复解锁,如果是就报错,否则判断是否是饥饿状态如果是就直接唤醒当前等待锁队列的第一个go程,否则就判断是否是还有没有等待的go程或者是否已经被Lock标记不需要唤醒其他go程,如果是的话就直接返回,否则就将等待数量-1并且设置唤醒标记然后唤醒一个等待锁队列中的go程,让其得到锁。

总结

至此sync.Mutex的源码解析就解析完了,可能有些地方有些理解上的错误,请各位谅解并且帮忙指出修改意见,如果这篇文章能帮到你,这是我的荣幸。

参考链接:

[^1] CAS比较与交换:https://baike.baidu.com/item/CAS/1329876#viewPageContent

全部评论

相关推荐

程序员鼠鼠_春招版:都很烂大街,rpc也基本没人问,考研吧,不然就包装一段实习再去
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客企业服务