源码:ixti/sidekiq-throttled: Concurrency and rate-limit throttling for Sidekiq

TL;DR

配置:

  • 各任务类调用sidekiq_throttle注册限流策略(Strategy)到Registry
  • Strategy内含两个StrategyCollection,每个StrategyCollection内含一组Concurrency或一组Threshold
  • 对应zset名称的模式:throttled:{任务类名}:{concurrency.v2或threshold}:{分区ID}
运行:

  • 弹出任务时,会经过prepend过的ThrottledRetriever#queues_cmd去选取可用队列
  • 弹出任务后,会经过prepend过的ThrottledRetriever#retrieve_work,经Registry调用任务对应类的Strategy#throttled?,执行两个StrategyCollection里两组ConcurrencyThreshold的的lua脚本,检查任一策略的执行记录是否达到限制
  • 如果达到了,就requeue_throttled(work)推回到原队列里,并返回nil给sidekiq
  • 否则执行任务,并在执行完以后,经中间件Serverensure块,执行任务对应类的Strategy#finalize!,清理两组ConcurrencyThreshold限制的任务记录
Concurrency

每个分区ID占用一个zset,其中以任务ID为键,预计结束时间为score

local key = KEYS[1]
local jid = ARGV[1]
local lmt = tonumber(ARGV[2])
local ttl = tonumber(ARGV[3])
local now = tonumber(ARGV[4])

-- 删除分区ID过往的执行记录
redis.call("ZREMRANGEBYSCORE", key, "-inf", "(" .. now)

-- 如果分区ID现有的执行记录数超出限制,且这些执行记录里并没有当前任务ID,就表示达到限制了
if lmt <= redis.call("ZCARD", key) and not redis.call("ZSCORE", key, jid) then
  return 1
end

-- 将当前任务ID加入分区,预计其将执行ttl秒
redis.call("ZADD", key, now + ttl, jid)

-- 延长分区存活时间
redis.call("EXPIRE", key, ttl)

-- 未达到限制
return 0

执行完成后,删除分区中的任务ID

def finalize!(jid, *job_args)
  Sidekiq.redis { |conn| conn.zrem(key(job_args), jid.to_s) }
end