sidekiq-throttled
TL;DR
配置:
- 各任务类调用
sidekiq_throttle
注册限流策略(Strategy
)到Registry
Strategy
内含两个StrategyCollection
,每个StrategyCollection
内含一组Concurrency
或一组Threshold
- 对应zset名称的模式:
throttled:{任务类名}:{concurrency.v2或threshold}:{分区ID}
运行:
- 弹出任务时,会经过
prepend
过的ThrottledRetriever#queues_cmd
去选取可用队列 - 弹出任务后,会经过
prepend
过的ThrottledRetriever#retrieve_work
,经Registry
调用任务对应类的Strategy#throttled?
,执行两个StrategyCollection
里两组Concurrency
和Threshold
的的lua脚本,检查任一策略的执行记录是否达到限制 - 如果达到了,就
requeue_throttled(work)
推回到原队列里,并返回nil
给sidekiq - 否则执行任务,并在执行完以后,经中间件
Server
的ensure
块,执行任务对应类的Strategy#finalize!
,清理两组Concurrency
和Threshold
限制的任务记录
Concurrency
每个分区ID占用一个zset,其中以任务ID为键,预计结束时间为score
local key = KEYS[1]
local jid = ARGV[1]
local lmt = tonumber(ARGV[2])
local ttl = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
-- 删除分区ID过往的执行记录
redis.call("ZREMRANGEBYSCORE", key, "-inf", "(" .. now)
-- 如果分区ID现有的执行记录数超出限制,且这些执行记录里并没有当前任务ID,就表示达到限制了
if lmt <= redis.call("ZCARD", key) and not redis.call("ZSCORE", key, jid) then
return 1
end
-- 将当前任务ID加入分区,预计其将执行ttl秒
redis.call("ZADD", key, now + ttl, jid)
-- 延长分区存活时间
redis.call("EXPIRE", key, ttl)
-- 未达到限制
return 0
执行完成后,删除分区中的任务ID
def finalize!(jid, *job_args)
Sidekiq.redis { |conn| conn.zrem(key(job_args), jid.to_s) }
end