go sync.Mutex
TL;DR
sync.Mutex使用32位int记录锁定状态,从高到低:29位记录等待数、1位记录“是否有饥饿goroutine”、1位记录“是否在无饥饿goroutine的情况下被解锁”、1位记录“是否锁定”,比较省空间
使用时应注意访问相同的锁实例(应使用指针传递,或者在闭包中使用)
sync.Mutex本身并不记录被哪个goroutine锁定,可以由一个goroutine锁定,在由另一个goroutine解锁
也不会记录锁定次数,所以不能重入
实现方式:尝试能否自旋,如果不能自旋获取到锁,则加入信号量的队列中,等待叫醒
PAUSE指令
结构
type Mutex struct {
state int32
sema uint32
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
}
Lock
func (m *Mutex) Lock() {
// 如果没其它goroutine锁住,就直接锁定,然后返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 如果已被锁,则轮询抢锁
m.lockSlow()
}
lockSlow
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false // 一开始这个goroutine是不饿的
awoke := false // 一开始这个goroutine是没被叫醒过的
iter := 0 // 暂停次数
old := m.state // 取当前状态
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
// 如果已被锁且没有饥饿的goroutine,而且runtime_canSpin(iter < active_spin && ncpu > 1 && runqempty),则暂停一次(runtime_doSpin和continue)
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
// 如果自己没被叫醒过,而且别人都没叫醒过锁,而且有等待锁的goroutine(3位以上用于记录等待数),而且成功叫醒(锁设置mutexWoken位)
// 则也标记自己是“那个叫醒锁的goroutine”
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 暂停CPU,避免另一goroutine解锁时重排当前goroutine的一系列读操作
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
// 如果没有其它饥饿的goroutine,则尝试加锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 如果已被锁或者另有饥饿的goroutine,则等待数加一(3位以上用于记录等待数)
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
// 如果已经饿了,而且已被锁了,则尝试将锁标记为“有饥饿goroutine”
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 如果被叫醒过,则mutexWoken位清零
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
// 如果锁状态一致……
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 且原本没加锁也没有饥饿goroutine,则说明已从“未锁”改为“已锁”,锁定成功,结束抢锁
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
// 首次循环为false,之后为true(更早被叫醒)
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 尝试获取一个信号量(挂起,当另一goroutine调用unlock释出一个信号量时,才返回)
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果已经有饥饿的goroutine,且自己就是被叫醒的那个(等得最久,最迟才被加到队头的那个),则有权马上设定锁定状态
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 1减8等于-7,最低位为1
delta := int32(mutexLocked - 1<<mutexwaitershift) ="" ="" ="" ="" ="" ="" ="" ="" 如果被叫醒的goroutine并不是饥饿状态,或者饥饿的就只有自己(没有其他等待中的goroutine了),就解除锁的饥饿标识="" if="" !starving="" ||="" old="">>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
// 此时state的最低位为0,加上delta,会设置最低位mutexLocked为1,即锁定。然后可以结束抢锁
atomic.AddInt32(&m.state, delta)
break
}
// 如果仅仅被叫醒,但并没有很饿,那么
awoke = true
iter = 0
} else {
// 获取最新状态并继续抢锁
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}</mutexwaitershift)>
Unlock
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 将mutexLocked位清零
new := atomic.AddInt32(&m.state, -mutexLocked)
// 如果有等待中的goroutine
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
unlockSlow
func (m *Mutex) unlockSlow(new int32) {
// 如果解锁了一个未被锁定的锁,报错……
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 如果没有饥饿的goroutine
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
// 如果没有等待中的goroutine,或者抢锁的goroutine已在刚才一瞬间抢到锁,则无需叫醒任何goroutine
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 随便叫醒一个goroutine
new = (old - 1<<mutexwaitershift) |="" mutexwoken="" if="" atomic.compareandswapint32(&m.state,="" old,="" new)="" {="" runtime_semrelease(&m.sema,="" false,="" 1)="" return="" }="" old="m.state" }="" }="" else="" {="" ="" starving="" mode:="" handoff="" mutex="" ownership="" to="" the="" next="" waiter,="" and="" yield="" ="" our="" time="" slice="" so="" that="" the="" next="" waiter="" can="" start="" to="" run="" immediately.="" ="" note:="" mutexlocked="" is="" not="" set,="" the="" waiter="" will="" set="" it="" after="" wakeup.="" ="" but="" mutex="" is="" still="" considered="" locked="" if="" mutexstarving="" is="" set,="" ="" so="" new="" coming="" goroutines="" won't="" acquire="" it.="" ="" ="" ="" ="" 如果有饥饿的goroutine,叫醒队头那个="" runtime_semrelease(&m.sema,="" true,="" 1)="" }="" }<="" code=""></mutexwaitershift)>