TL;DR

  • 用法:semaphore.NewWeighted(n); weighted.Acquire(context.Background(), n); weighted.Release(n)
  • 信号量相当于一个长度固定的通道,初始化后可被多个进程占有,每个进程申请的长度(信号量)可以不等。当剩余信号量不足时,企图占有的进程便会排队等待。进程放弃占用时应该恢复适当的信号量。当剩余信号量恢复到能够满足队头的进程时,队伍才会前移
  • 常用于实现“同一时间只允许限定数量进程运行”的需求
与其他同步原语对比一下

  • 与sync.WaitGroup对比:WaitGroup可用于一个goroutine等待多个goroutine,如果用semaphore来做,就是初始化一个容量N的信号量,然后N个goroutine分别占用1,一个goroutine企图占用N,等待前N个goroutine释放各自的1个信号量。但WaitGroup也可以用于M个goroutine等待N个goroutine,而且WaitGroup是用完即弃的,semaphore则可以重复使用
  • 与sync.Mutex对比:semaphore的实现含有Mutex,用于排他地更新信号量的占用状态,并且通过close(channel)来使等待中的goroutine重新运行
  • 与channel对比:对带buffer的channel先写入再读出,也能实现“同一时间只允许限定数量进程运行”的需求,但无法做到“每个进程申请的信号量可以不等”
  • 与sync.Cond对比:Cond用于多个(或一个)goroutine等待一个goroutine的情况
资料

源码分析

// 代表一个被阻塞的goroutine,其所需的信号量和触发运行的channel
type waiter struct {
    n     int64
    ready chan<- struct{} // Closed when semaphore acquired.
}

// 创建信号量,需指定最大信号数(也是初始信号数),必须大于零
func NewWeighted(n int64) *Weighted {
    w := &Weighted{size: n}
    return w
}

// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
    size    int64 // 最大信号数
    cur     int64
    mu      sync.Mutex // 因获取和释放信号量涉及size、cur、waiters多个属性的操作,需加锁
    waiters list.List
}

func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    s.mu.Lock()

    // 如果想占用的信号量能被满足,且没有等待中的goroutine,则增加占用数,并放行
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
        s.cur += n
        s.mu.Unlock()
        return nil
    }

    // 如果想占用的信号量大于最大信号数,则阻塞直到context被cancel
    if n > s.size {
        // Don't make other Acquire calls block on one that's doomed to fail.
        s.mu.Unlock()
        <-ctx.Done()
        return ctx.Err()
    }

    // 创建一个waiter表示当前goroutine正在等待多少个信号量
    // 并加入到等待队列的尾部
    ready := make(chan struct{})
    w := waiter{n: n, ready: ready}
    elem := s.waiters.PushBack(w)
    s.mu.Unlock()

    // 等待context被cancel或者信号量得到满足
    select {
    case <-ctx.Done():
        err := ctx.Err()
        s.mu.Lock()
        select {
        case <-ready:
            // 如果context被cancel时刚好也有被通知有信号量,则也只当作是因为信号量得到满足而放行
            err = nil
        default:
            isFront := s.waiters.Front() == elem
            s.waiters.Remove(elem)
            // 如果当前goroutine因超时而不再需要抢夺信号量,并且现存的信号量有多余
            // 则尝试唤醒排在后面的goroutine,说不定他能因此放行
            if isFront && s.size > s.cur {
                s.notifyWaiters()
            }
        }
        s.mu.Unlock()
        return err
    case <-ready:
        return nil
    }
}

func (s *Weighted) TryAcquire(n int64) bool {
    s.mu.Lock()
    success := s.size-s.cur >= n && s.waiters.Len() == 0
    if success {
        s.cur += n
    }
    s.mu.Unlock()
    return success
}

func (s *Weighted) Release(n int64) {
    s.mu.Lock()
    // 将占用数降低,但不允许变成负数
    // 就是说,初始size必须大于零,有Acquire过才能Release
    s.cur -= n
    if s.cur < 0 {
        s.mu.Unlock()
        panic("semaphore: released more than held")
    }
    s.notifyWaiters()
    s.mu.Unlock()
}

func (s *Weighted) notifyWaiters() {
    for {
        next := s.waiters.Front()
        if next == nil {
            break // No more waiters blocked.
        }

        w := next.Value.(waiter)
        if s.size-s.cur < w.n {
            // 如果队列中这个goroutine所需的信号量得不到满足,即使其后的goroutine满足也得先等这个通行了才能唤醒
            // 这样做是为了避免信号量需求大的goroutine发生饥饿
            break
        }

        // 占用信号量、移出等待队列、关闭阻塞channel使对应的goroutine重新运行
        s.cur += w.n
        s.waiters.Remove(next)
        close(w.ready)
    }
}