go的semaphore
TL;DR
- 用法:
semaphore.NewWeighted(n); weighted.Acquire(context.Background(), n); weighted.Release(n)
- 信号量相当于一个长度固定的通道,初始化后可被多个进程占有,每个进程申请的长度(信号量)可以不等。当剩余信号量不足时,企图占有的进程便会排队等待。进程放弃占用时应该恢复适当的信号量。当剩余信号量恢复到能够满足队头的进程时,队伍才会前移
- 常用于实现“同一时间只允许限定数量进程运行”的需求
与其他同步原语对比一下
- 与sync.WaitGroup对比:WaitGroup可用于一个goroutine等待多个goroutine,如果用semaphore来做,就是初始化一个容量N的信号量,然后N个goroutine分别占用1,一个goroutine企图占用N,等待前N个goroutine释放各自的1个信号量。但WaitGroup也可以用于M个goroutine等待N个goroutine,而且WaitGroup是用完即弃的,semaphore则可以重复使用
- 与sync.Mutex对比:semaphore的实现含有Mutex,用于排他地更新信号量的占用状态,并且通过close(channel)来使等待中的goroutine重新运行
- 与channel对比:对带buffer的channel先写入再读出,也能实现“同一时间只允许限定数量进程运行”的需求,但无法做到“每个进程申请的信号量可以不等”
- 与sync.Cond对比:Cond用于多个(或一个)goroutine等待一个goroutine的情况
资料
- 《go语言设计与实现》6.2.2.2
- semaphone实现分析 | GoCN社区
源码分析
// 代表一个被阻塞的goroutine,其所需的信号量和触发运行的channel
type waiter struct {
n int64
ready chan<- struct{} // Closed when semaphore acquired.
}
// 创建信号量,需指定最大信号数(也是初始信号数),必须大于零
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
size int64 // 最大信号数
cur int64
mu sync.Mutex // 因获取和释放信号量涉及size、cur、waiters多个属性的操作,需加锁
waiters list.List
}
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
// 如果想占用的信号量能被满足,且没有等待中的goroutine,则增加占用数,并放行
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
// 如果想占用的信号量大于最大信号数,则阻塞直到context被cancel
if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
// 创建一个waiter表示当前goroutine正在等待多少个信号量
// 并加入到等待队列的尾部
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()
// 等待context被cancel或者信号量得到满足
select {
case <-ctx.Done():
err := ctx.Err()
s.mu.Lock()
select {
case <-ready:
// 如果context被cancel时刚好也有被通知有信号量,则也只当作是因为信号量得到满足而放行
err = nil
default:
isFront := s.waiters.Front() == elem
s.waiters.Remove(elem)
// 如果当前goroutine因超时而不再需要抢夺信号量,并且现存的信号量有多余
// 则尝试唤醒排在后面的goroutine,说不定他能因此放行
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
case <-ready:
return nil
}
}
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
func (s *Weighted) Release(n int64) {
s.mu.Lock()
// 将占用数降低,但不允许变成负数
// 就是说,初始size必须大于零,有Acquire过才能Release
s.cur -= n
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
s.notifyWaiters()
s.mu.Unlock()
}
func (s *Weighted) notifyWaiters() {
for {
next := s.waiters.Front()
if next == nil {
break // No more waiters blocked.
}
w := next.Value.(waiter)
if s.size-s.cur < w.n {
// 如果队列中这个goroutine所需的信号量得不到满足,即使其后的goroutine满足也得先等这个通行了才能唤醒
// 这样做是为了避免信号量需求大的goroutine发生饥饿
break
}
// 占用信号量、移出等待队列、关闭阻塞channel使对应的goroutine重新运行
s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
}
}