TL;DR

sync.Mutex使用32位int记录锁定状态,从高到低:29位记录等待数、1位记录“是否有饥饿goroutine”、1位记录“是否在无饥饿goroutine的情况下被解锁”、1位记录“是否锁定”,比较省空间

使用时应注意访问相同的锁实例(应使用指针传递,或者在闭包中使用)

sync.Mutex本身并不记录被哪个goroutine锁定,可以由一个goroutine锁定,在由另一个goroutine解锁

也不会记录锁定次数,所以不能重入

实现方式:尝试能否自旋,如果不能自旋获取到锁,则加入信号量的队列中,等待叫醒

PAUSE指令

http://www.luo666.com/?p=55
https://stackoverflow.com/questions/12894078/what-is-the-purpose-of-the-pause-instruction-in-x86

结构

type Mutex struct {
    state int32
    sema  uint32
}

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota
}

Lock

func (m *Mutex) Lock() {
    // 如果没其它goroutine锁住,就直接锁定,然后返回
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // 如果已被锁,则轮询抢锁
    m.lockSlow()
}

lockSlow

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false // 一开始这个goroutine是不饿的
    awoke := false // 一开始这个goroutine是没被叫醒过的
    iter := 0 // 暂停次数
    old := m.state // 取当前状态
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        // 如果已被锁且没有饥饿的goroutine,而且runtime_canSpin(iter < active_spin && ncpu > 1 && runqempty),则暂停一次(runtime_doSpin和continue)
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexWoken flag to inform Unlock
            // to not wake other blocked goroutines.
            // 如果自己没被叫醒过,而且别人都没叫醒过锁,而且有等待锁的goroutine(3位以上用于记录等待数),而且成功叫醒(锁设置mutexWoken位)
            // 则也标记自己是“那个叫醒锁的goroutine”
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            // 暂停CPU,避免另一goroutine解锁时重排当前goroutine的一系列读操作
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        new := old
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        // 如果没有其它饥饿的goroutine,则尝试加锁
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        // 如果已被锁或者另有饥饿的goroutine,则等待数加一(3位以上用于记录等待数)
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case.
        // 如果已经饿了,而且已被锁了,则尝试将锁标记为“有饥饿goroutine”
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        // 如果被叫醒过,则mutexWoken位清零
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        // 如果锁状态一致……
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 且原本没加锁也没有饥饿goroutine,则说明已从“未锁”改为“已锁”,锁定成功,结束抢锁
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            // 首次循环为false,之后为true(更早被叫醒)
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // 尝试获取一个信号量(挂起,当另一goroutine调用unlock释出一个信号量时,才返回)
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            // 如果已经有饥饿的goroutine,且自己就是被叫醒的那个(等得最久,最迟才被加到队头的那个),则有权马上设定锁定状态
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // 1减8等于-7,最低位为1
                delta := int32(mutexLocked - 1<<mutexwaitershift)  =""  =""  =""  =""  =""  =""  =""  ="" 如果被叫醒的goroutine并不是饥饿状态,或者饥饿的就只有自己(没有其他等待中的goroutine了),就解除锁的饥饿标识=""                 if="" !starving="" ||="" old="">>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                // 此时state的最低位为0,加上delta,会设置最低位mutexLocked为1,即锁定。然后可以结束抢锁
                atomic.AddInt32(&m.state, delta)
                break
            }
            // 如果仅仅被叫醒,但并没有很饿,那么
            awoke = true
            iter = 0
        } else {
            // 获取最新状态并继续抢锁
            old = m.state
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}</mutexwaitershift)>

Unlock

func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // 将mutexLocked位清零
    new := atomic.AddInt32(&m.state, -mutexLocked)
    // 如果有等待中的goroutine
    if new != 0 {
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)
    }
}

unlockSlow

func (m *Mutex) unlockSlow(new int32) {
    // 如果解锁了一个未被锁定的锁,报错……
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    // 如果没有饥饿的goroutine
    if new&mutexStarving == 0 {
        old := new
        for {
            // If there are no waiters or a goroutine has already
            // been woken or grabbed the lock, no need to wake anyone.
            // In starvation mode ownership is directly handed off from unlocking
            // goroutine to the next waiter. We are not part of this chain,
            // since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.
            // 如果没有等待中的goroutine,或者抢锁的goroutine已在刚才一瞬间抢到锁,则无需叫醒任何goroutine
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // 随便叫醒一个goroutine
            new = (old - 1<<mutexwaitershift) |="" mutexwoken=""             if="" atomic.compareandswapint32(&m.state,="" old,="" new)="" {=""                 runtime_semrelease(&m.sema,="" false,="" 1)=""                 return=""             }=""             old="m.state"         }=""     }="" else="" {=""         ="" starving="" mode:="" handoff="" mutex="" ownership="" to="" the="" next="" waiter,="" and="" yield=""         ="" our="" time="" slice="" so="" that="" the="" next="" waiter="" can="" start="" to="" run="" immediately.=""         ="" note:="" mutexlocked="" is="" not="" set,="" the="" waiter="" will="" set="" it="" after="" wakeup.=""         ="" but="" mutex="" is="" still="" considered="" locked="" if="" mutexstarving="" is="" set,=""         ="" so="" new="" coming="" goroutines="" won't="" acquire="" it.=""  =""  =""  =""   ="" 如果有饥饿的goroutine,叫醒队头那个=""         runtime_semrelease(&m.sema,="" true,="" 1)=""     }="" }<="" code=""></mutexwaitershift)>