Go sync.WaitGroup 源码分析
以下为 sync 包中 WaitGroup 的实现代码及注释分析:
// WaitGroup waits for a collection of goroutines to finish: Add/Done adjust
// a counter, and Wait blocks until that counter is back at zero.
type WaitGroup struct {
// noCopy is a marker field; its type is not defined in this excerpt.
// The copylocks analyzer (golang.org/x/tools/go/analysis/passes/copylock),
// run by `go vet`, reports values that are copied while containing a type
// with Lock and Unlock methods. noCopy exists so that copying a WaitGroup
// trips that check (presumably it defines no-op Lock/Unlock — not shown
// here). This is a static vet diagnostic, not a compiler error, and it has
// no runtime effect.
noCopy noCopy
// state1 packs two things into 12 bytes:
//   - a 64-bit state word: the high 32 bits are the counter and the low
//     32 bits are the waiter count (Add and Wait split it with state>>32
//     and uint32(state));
//   - a 32-bit semaphore that Wait sleeps on and Add releases.
// 64-bit atomic operations need an 8-byte-aligned address, which a
// [3]uint32 field is not guaranteed to have (e.g. on 32-bit platforms).
// state() therefore uses whichever 8 bytes of state1 are 8-byte aligned as
// the state word, and the remaining 4 bytes as the semaphore.
state1 [3]uint32
}
// state returns pointers to the two pieces packed into wg.state1:
//
//   - statep: the 64-bit state word (counter in the high 32 bits, waiter
//     count in the low 32 bits). Counter and waiter count must be adjacent
//     so a single 64-bit atomic operation can update them together.
//   - semap: the 32-bit semaphore used to park and wake waiters.
//
// 64-bit atomics require an 8-byte-aligned address, but state1 is only
// guaranteed 4-byte alignment. If &state1[0] is 8-byte aligned, state1[0:2]
// is the state word and state1[2] is the semaphore. Otherwise &state1[1] is
// 8-byte aligned, so state1[1:3] is the state word and state1[0] is the
// semaphore.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	base := unsafe.Pointer(&wg.state1)
	if uintptr(base)%8 != 0 {
		// Misaligned start: skip the first word to reach an 8-byte boundary.
		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
	}
	// Aligned start: the first two words form the state; the last is spare.
	return (*uint64)(base), &wg.state1[2]
}
// Add adds delta (which may be negative) to the WaitGroup counter.
//
// The counter lives in the high 32 bits of the state word and the number of
// goroutines blocked in Wait lives in the low 32 bits. When the counter drops
// to zero while waiters are present, the state word is reset to 0 and the
// semaphore is released once per waiter so every blocked Wait can return.
//
// Panics if the counter becomes negative, or if it detects Add racing with
// Wait: either a 0 -> positive transition while waiters are registered, or
// the state word changing between the decrement to zero and the wake-up.
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
// Atomically add delta to the counter in the high 32 bits. A negative delta
// converts to a huge uint64; after the shift and a wrapping add, this
// subtracts |delta| from the high half and leaves the low half (the waiter
// count) untouched.
state := atomic.AddUint64(statep, uint64(delta)<<32)
// New counter value.
v := int32(state >> 32)
// Current waiter count (low 32 bits).
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
race.Read(unsafe.Pointer(semap))
}
// More decrements than increments: the counter went below zero.
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// With delta > 0, v == delta means the counter just went from 0 to positive.
// Wait only registers waiters while the counter is positive, and they are
// cleared when it returns to zero, so seeing waiters at this point means the
// Add overlapped a Wait cycle.
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Nothing more to do if the counter is still positive, or if it reached zero
// with no goroutine waiting.
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0. The counter is already 0 here (v == 0), so a
// plain store clears the whole state word. Per the reasoning above, no other
// goroutine may be mutating it at this point.
*statep = 0
// Release the semaphore once per registered waiter so each blocked Wait
// wakes up (runtime_Semrelease is defined outside this excerpt).
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
// Done records that one unit of work has finished by decrementing the
// WaitGroup counter. It is exactly wg.Add(-1), so it shares Add's panics
// and its wake-up of waiters when the counter reaches zero.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}
// Wait blocks until the WaitGroup counter is zero.
//
// If the counter is already zero, Wait returns immediately. Otherwise it
// registers itself by incrementing the waiter count (low 32 bits of the
// state word) with a CAS. It then sleeps on the semaphore until the Add that
// brings the counter to zero releases it.
//
// Panics if the state word is non-zero after waking: that means the WaitGroup
// was reused (Add called again) before this Wait returned.
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
for {
// Snapshot the counter (high 32 bits) and the waiter count (low 32 bits).
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
// No outstanding work: return without waiting.
if v == 0 {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// Try to register as a waiter by incrementing the low 32 bits. The CAS fails
// if the state changed since the load (for example, the counter just reached
// zero). In that case the loop re-reads the state instead of sleeping on
// stale information.
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(semap))
}
// Sleep until Add releases the semaphore. If the counter reaches zero
// between the successful CAS and this call, no wake-up is lost: the Add
// that zeroes it sees the registered waiter (w != 0) and releases the
// semaphore. Assuming runtime_Semacquire/runtime_Semrelease act as a
// counting semaphore (they are defined outside this excerpt), this acquire
// then returns without blocking.
runtime_Semacquire(semap)
// Add resets the state word to 0 before releasing waiters, so a non-zero
// value here means Add was called again before this Wait returned.
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}