Sidekiq architecture
TL;DR
Sidekiq::Launcher#run triggers Sidekiq::Scheduled::Poller#start, Sidekiq::Manager#start, and the heartbeat.
Sidekiq::Manager creates Sidekiq::Processor instances according to the configured concurrency (default 10) and, in its own start, calls each processor's start.
Sidekiq::Processor#start creates a thread that runs process_one until @done.
Sidekiq::Processor#process_one pops a job from Redis according to the configured queue priorities and executes it.
Sidekiq::Scheduled::Poller#start creates a thread that loops, pulling jobs whose scheduled time is earlier than now out of the retry and schedule sorted sets and pushing them into the corresponding queues.
graph TB
%% Define styles
classDef application fill:#f9f,stroke:#333,stroke-width:2px
classDef client fill:#bbf,stroke:#333,stroke-width:2px
classDef storage fill:#f66,stroke:#333,stroke-width:2px
classDef processing fill:#bfb,stroke:#333,stroke-width:2px
classDef webui fill:#d9f,stroke:#333,stroke-width:2px
classDef integration fill:#ff9,stroke:#333,stroke-width:2px
%% Application Layer
subgraph Application
App[Ruby/Rails Application]
Rails[Rails Integration]:::integration
AJ[ActiveJob Adapter]:::integration
end
%% Client Layer
subgraph ClientLayer
Client[Job Client]:::client
JobImpl[Core Job Implementation]:::client
Middleware[Middleware Chain]:::client
end
%% Storage Layer
subgraph StorageLayer
Redis[(Redis Storage)]:::storage
RedisConn[Redis Connection]:::storage
end
%% Processing Layer
subgraph ProcessingLayer
Launch[Launcher]:::processing
Manager[Manager]:::processing
ProcPool[Processor Pool]:::processing
Scheduler[Scheduler]:::processing
Fetcher[Job Fetcher]:::processing
RetryHandler[Retry Handler]:::processing
DeadJobs[Dead Job Handler]:::processing
Logger[Job Logger]:::processing
Metrics[Metrics System]:::processing
Capsule[Capsule System]:::processing
end
%% Web Interface Layer
subgraph WebInterface
WebUI[Web UI Interface]:::webui
end
%% Relationships
App --> Rails
App --> AJ
Rails --> Client
AJ --> Client
Client --> JobImpl
Client --> Middleware
Middleware --> RedisConn
RedisConn --> Redis
Launch --> Manager
Manager --> ProcPool
Manager --> Scheduler
Manager --> Fetcher
Fetcher --> Redis
ProcPool --> RetryHandler
RetryHandler --> Redis
RetryHandler --> DeadJobs
ProcPool --> Logger
ProcPool --> Metrics
Capsule --> ProcPool
Redis --> WebUI
%% Click events
click Client "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/client.rb"
click ProcPool "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/processor.rb"
click RedisConn "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/redis_connection.rb"
click WebUI "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/web.rb"
click Rails "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/rails.rb"
click Manager "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/manager.rb"
click Scheduler "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/scheduled.rb"
click Fetcher "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/fetch.rb"
click RetryHandler "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/job_retry.rb"
click DeadJobs "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/job_retry.rb"
click Middleware "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/middleware/chain.rb"
click Metrics "https://github.com/sidekiq/sidekiq/tree/main/lib/sidekiq/metrics"
click Capsule "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/capsule.rb"
click Logger "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/job_logger.rb"
click JobImpl "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/job.rb"
click Launch "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/launcher.rb"
%% Legend
subgraph Legend
L1[Application Components]:::application
L2[Client Components]:::client
L3[Storage Components]:::storage
L4[Processing Components]:::processing
L5[Web UI Components]:::webui
L6[Integration Components]:::integration
end
Inspecting perform's call stack
Following [[sidekiq环境]], print the call stack inside perform:
require "sidekiq"
class PlainOldRuby
include Sidekiq::Job
def perform(how_hard = "super hard", how_long = 1)
pp caller
sleep how_long
puts "Workin' #{how_hard}"
end
end
which yields:
[".../sidekiq-6.5.7/lib/sidekiq/processor.rb:202:in `execute_job'",
".../sidekiq-6.5.7/lib/sidekiq/processor.rb:170:in `block (2 levels) in process'",
".../sidekiq-6.5.7/lib/sidekiq/middleware/chain.rb:172:in `invoke'",
".../sidekiq-6.5.7/lib/sidekiq/processor.rb:169:in `block in process'",
".../sidekiq-6.5.7/lib/sidekiq/processor.rb:136:in `block (6 levels) in dispatch'",
".../sidekiq-6.5.7/lib/sidekiq/job_retry.rb:113:in `local'",
".../sidekiq-6.5.7/lib/sidekiq/processor.rb:135:in `block (5 levels) in dispatch'",
".../sidekiq-6.5.7/lib/sidekiq.rb:44:in `block in '",
".../sidekiq-6.5.7/lib/sidekiq/processor.rb:131:in `block (4 levels) in dispatch'",
".../sidekiq-6.5.7/lib/sidekiq/processor.rb:263:in `stats'",
".../sidekiq-6.5.7/lib/sidekiq/processor.rb:126:in `block (3 levels) in dispatch'",
".../sidekiq-6.5.7/lib/sidekiq/job_logger.rb:13:in `call'",
".../sidekiq-6.5.7/lib/sidekiq/processor.rb:125:in `block (2 levels) in dispatch'",
".../sidekiq-6.5.7/lib/sidekiq/job_retry.rb:80:in `global'",
".../sidekiq-6.5.7/lib/sidekiq/processor.rb:124:in `block in dispatch'",
".../sidekiq-6.5.7/lib/sidekiq/job_logger.rb:39:in `prepare'",
".../sidekiq-6.5.7/lib/sidekiq/processor.rb:123:in `dispatch'",
".../sidekiq-6.5.7/lib/sidekiq/processor.rb:168:in `process'",
".../sidekiq-6.5.7/lib/sidekiq/processor.rb:78:in `process_one'",
".../sidekiq-6.5.7/lib/sidekiq/processor.rb:68:in `run'",
".../sidekiq-6.5.7/lib/sidekiq/component.rb:8:in `watchdog'",
".../sidekiq-6.5.7/lib/sidekiq/component.rb:17:in `block in safe_thread'"]
Queue priorities
Queues can be assigned priorities (weights).
Via the command line:
# with weights
sidekiq -q critical,2 -q default
# without weights
sidekiq -q critical -q default -q low
Via the config file:
# with weights
:queues:
- [critical, 2]
- default
# without weights
:queues:
- critical
- default
- low
If any queue is configured with a weight greater than 0, "strict ordering" is set to false; each queue name is also pushed onto the queue list [weight, 1].max times:
# sidekiq-6.5.7/lib/sidekiq/cli.rb
[weight.to_i, 1].max.times { opts[:queues] << queue.to_s }
opts[:strict] = false if weight.to_i > 0
When fetching a job from Redis, strict ordering issues brpop queue:a queue:b 2 directly; otherwise the queue list is first shuffled and deduplicated, giving higher-weight queues a better chance of landing near the front of the argument list, and then the brpop is issued.
# sidekiq-6.5.7/lib/sidekiq/fetch.rb
module Sidekiq
class BasicFetch
def initialize(config)
# ..
@strictly_ordered_queues = !!@config[:strict]
@queues = @config[:queues].map { |q| "queue:#{q}" }
if @strictly_ordered_queues
@queues.uniq!
@queues << {timeout: TIMEOUT}
end
end
def retrieve_work
qs = queues_cmd
# ...
queue, job = redis { |conn| conn.brpop(*qs) }
UnitOfWork.new(queue, job, config) if queue
end
def queues_cmd
if @strictly_ordered_queues
@queues
else
permute = @queues.shuffle
permute.uniq!
permute << {timeout: TIMEOUT}
permute
end
end
end
end
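The effect of weight-based duplication followed by shuffle + uniq! can be seen in isolation. This is a standalone sketch (queue names and weights are illustrative, mirroring a `-q critical,2 -q default` configuration):

```ruby
# Simulate Sidekiq's weighted ordering: a queue configured with weight N
# appears N times in the list, so after shuffle + uniq! it is more likely
# to survive as the first element of the BRPOP argument list.
queues = ["queue:critical"] * 2 + ["queue:default"] # -q critical,2 -q default

first_counts = Hash.new(0)
10_000.times do
  permute = queues.shuffle
  permute.uniq!
  first_counts[permute.first] += 1
end

# "queue:critical" should come first roughly 2 out of 3 times.
pp first_counts
```

Because BRPOP scans its key arguments left to right, landing first in the argument list translates directly into being polled first.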
Job processing flow
Sidekiq::Processor#start creates a thread that calls run (process_one until @done). process_one pops a job from Redis according to the configured queue priorities, then calls process:
# sidekiq-6.5.7/lib/sidekiq/processor.rb
def process(uow)
jobstr = uow.job
queue = uow.queue_name
job_hash = nil
begin
job_hash = Sidekiq.load_json(jobstr)
rescue => ex
# ...
now = Time.now.to_f
config.redis.zadd("dead", now.to_s, jobstr)
# ...
end
begin
# look up the job's Sidekiq::Job subclass and instantiate it
dispatch(job_hash, queue, jobstr) do |inst|
# run the server middleware chain
@config.server_middleware.invoke(inst, job_hash, queue) do
# pass the args to the Sidekiq::Job instance's perform method
execute_job(inst, job_hash["args"])
end
end
# ...
rescue Exception => ex
# ...
raise ex
ensure
# ...
end
end
def execute_job(inst, cloned_args)
inst.perform(*cloned_args)
end
def dispatch(job_hash, queue, jobstr)
# set up the thread-local logging context for this job
@job_logger.prepare(job_hash) do
# catch errors and push the job onto the retry set
@retrier.global(jobstr, queue) do
# log before and after job execution
@job_logger.call(job_hash, queue) do
# track this process's processed count (PROCESSED), failure count (FAILURE), and in-flight jobs (WORK_STATE)
# another thread (the heartbeat) writes these to Redis
stats(jobstr, queue) do
# a hot reloader can be passed via the :reloader option to load fresh code, reconnect to the database, etc. before the job runs
@reloader.call do
# look up the job's Sidekiq::Job subclass and instantiate it
klass = constantize(job_hash["class"])
inst = klass.new
inst.jid = job_hash["jid"]
@retrier.local(inst, jobstr, queue) do
# back to the block in process above
yield inst
end
end
end
end
end
end
end
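The nesting in dispatch can be modeled with plain blocks to make the execution order visible. The layer names below follow the real call chain; the `layer` lambda itself is only an illustration:

```ruby
# A stripped-down model of Processor#dispatch: each layer wraps the next
# via a block, so setup happens outside-in and perform runs innermost,
# with teardown unwinding in reverse order.
trace = []
layer = lambda do |name, &inner|
  trace << "#{name}:before"
  result = inner.call
  trace << "#{name}:after"
  result
end

result =
  layer.call("job_logger.prepare") {
    layer.call("retrier.global") {
      layer.call("job_logger.call") {
        layer.call("stats") {
          layer.call("reloader") {
            layer.call("retrier.local") {
              trace << "perform"
              :ok
            }
          }
        }
      }
    }
  }

pp trace.first # "job_logger.prepare:before"
pp result      # :ok
```

This is why retrier.global sees exceptions from every inner layer, while retrier.local only sees exceptions raised by the job instance itself.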
Retries and scheduled jobs
Since retries are also delayed by an interval, their logic is similar to scheduled jobs, so both live in the Sidekiq::Scheduled module. Sidekiq::Scheduled::Poller#start creates a thread that repeatedly runs Sidekiq::Scheduled::Enq#enqueue_jobs:
# sidekiq-6.5.7/lib/sidekiq/scheduled.rb
module Sidekiq
module Scheduled
class Poller
include Sidekiq::Component
def initialize(options)
@enq = (options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new
# ...
end
def start
@thread ||= safe_thread("scheduler") {
# ...
until @done
enqueue
wait
end
}
end
def enqueue
@enq.enqueue_jobs
# ...
end
end
end
end
Sidekiq::Scheduled::Enq uses a Lua script to pop jobs whose scheduled time is earlier than now from the retry and schedule sorted sets, then pushes them into the corresponding queue:xxx lists:
# sidekiq-6.5.7/lib/sidekiq/scheduled.rb
module Sidekiq
module Scheduled
SETS = %w[retry schedule]
class Enq
LUA_ZPOPBYSCORE = <<~LUA
local key, now = KEYS[1], ARGV[1]
local jobs = redis.call("zrangebyscore", key, "-inf", now, "limit", 0, 1)
if jobs[1] then
redis.call("zrem", key, jobs[1])
return jobs[1]
end
LUA
def initialize
@done = false
@lua_zpopbyscore_sha = nil
end
def enqueue_jobs(sorted_sets = SETS)
Sidekiq.redis do |conn|
sorted_sets.each do |sorted_set|
while !@done && (job = zpopbyscore(conn, keys: [sorted_set], argv: [Time.now.to_f.to_s]))
Sidekiq::Client.push(Sidekiq.load_json(job))
Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
end
end
end
end
private
def zpopbyscore(conn, keys: nil, argv: nil)
if @lua_zpopbyscore_sha.nil?
raw_conn = conn.respond_to?(:redis) ? conn.redis : conn
@lua_zpopbyscore_sha = raw_conn.script(:load, LUA_ZPOPBYSCORE)
end
conn.evalsha(@lua_zpopbyscore_sha, keys, argv)
rescue RedisConnection.adapter::CommandError => e
# ...
retry
end
end
end
end
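What LUA_ZPOPBYSCORE does can be modeled with a plain Hash standing in for the sorted set. The helper name and job payloads here are illustrative, not Sidekiq API:

```ruby
# Model of LUA_ZPOPBYSCORE: atomically pop one member of the sorted set
# whose score (the scheduled run time) is <= now. A Hash of
# payload => score stands in for the Redis zset.
def zpopbyscore(zset, now)
  member, _score = zset.select { |_m, s| s <= now }.min_by { |_m, s| s }
  zset.delete(member) if member
  member
end

now = Time.now.to_f
schedule = {
  "job-due"    => now - 10, # scheduled 10s ago: eligible
  "job-future" => now + 60  # not due yet: left in the set
}

popped = []
while (job = zpopbyscore(schedule, now))
  popped << job
end

pp popped        # ["job-due"]
pp schedule.keys # ["job-future"]
```

Doing the pop inside a Lua script keeps the zrangebyscore and zrem atomic, so two Sidekiq processes polling the same set cannot both enqueue the same job.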
heartbeat
The heartbeat thread writes the process's running state to Redis every 5 seconds:
module Sidekiq
class Launcher
include Sidekiq::Component
STATS_TTL = 5 * 365 * 24 * 60 * 60 # 5 years
def run
@thread = safe_thread("heartbeat", &method(:start_heartbeat))
# ...
end
BEAT_PAUSE = 5
def start_heartbeat
loop do
heartbeat
sleep BEAT_PAUSE
end
logger.info("Heartbeat stopping...")
end
def heartbeat
# ...
❤
end
def ❤
key = identity
fails = procd = 0
begin
fails = Processor::FAILURE.reset
procd = Processor::PROCESSED.reset
curstate = Processor::WORK_STATE.dup
nowdate = Time.now.utc.strftime("%Y-%m-%d")
redis do |conn|
# record total processed, per-day processed, total failed, and per-day failed counts
# per-day counters are kept for five years
conn.multi do |transaction|
transaction.incrby("stat:processed", procd)
transaction.incrby("stat:processed:#{nowdate}", procd)
transaction.expire("stat:processed:#{nowdate}", STATS_TTL)
transaction.incrby("stat:failed", fails)
transaction.incrby("stat:failed:#{nowdate}", fails)
transaction.expire("stat:failed:#{nowdate}", STATS_TTL)
end
# in the "#{host+pid+rand}:work" hash, record this process's
# running threads (keys) and the job each is executing (values)
# the hash expires after 60 seconds
work_key = "#{key}:work"
conn.pipelined do |transaction|
transaction.unlink(work_key)
curstate.each_pair do |tid, hash|
transaction.hset(work_key, tid, Sidekiq.dump_json(hash))
end
transaction.expire(work_key, 60)
end
end
rtt = check_rtt
fails = procd = 0
kb = memory_usage(::Process.pid)
# add host+pid+rand to the processes set
# and in the host+pid+rand hash, record the number of in-flight jobs (busy), heartbeat time (beat),
# Redis round-trip time (rtt_us), whether quieted (quiet), and RSS memory read from /proc (rss)
_, exists, _, _, msg = redis { |conn|
conn.multi { |transaction|
transaction.sadd("processes", [key])
transaction.exists?(key)
transaction.hmset(key, "info", to_json,
"busy", curstate.size,
"beat", Time.now.to_f,
"rtt_us", rtt,
"quiet", @done.to_s,
"rss", kb)
transaction.expire(key, 60)
transaction.rpop("#{key}-signals")
}
}
# first heartbeat or recovering from an outage and need to reestablish our heartbeat
fire_event(:heartbeat) unless exists
fire_event(:beat, oneshot: false)
return unless msg
::Process.kill(msg, ::Process.pid)
rescue => e
# ignore all redis/network issues
logger.error("heartbeat: #{e}")
# don't lose the counts if there was a network issue
Processor::PROCESSED.incr(procd)
Processor::FAILURE.incr(fails)
end
end
end
end
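The reset/incr pairing around the Redis write (and in the rescue) is what keeps counts from being lost across network errors. A minimal stand-in for the Processor counter shows the pattern; this Counter class is a sketch, not Sidekiq's actual implementation:

```ruby
# Drain-and-restore: reset atomically returns the current count and
# zeroes it before the heartbeat writes to Redis; if the write fails,
# the drained amount is credited back via incr so nothing is lost.
class Counter
  def initialize
    @value = 0
    @lock = Mutex.new
  end

  def incr(amount = 1)
    @lock.synchronize { @value += amount }
  end

  def reset
    @lock.synchronize do
      drained = @value
      @value = 0
      drained
    end
  end
end

processed = Counter.new
3.times { processed.incr }

procd = processed.reset     # drain: procd == 3, counter back to 0
begin
  raise "redis unreachable" # simulate a failed heartbeat write
rescue
  processed.incr(procd)     # restore the drained count for the next beat
end
```

The counters are only incremented by processor threads and drained by the heartbeat thread, so the mutex keeps the read-and-zero step atomic between them.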
Example values:
Overall process state:
> hgetall z-VirtualBox:3952:c200d7ce2ae0
> 1) "rtt_us"
> 2) "266"
> 3) "quiet"
> 4) "false"
> 5) "info"
> 6) "{\"hostname\":\"z-VirtualBox\",\"started_at\":1666511019.355878,\"pid\":3952,\"tag\":\"\",\"concurrency\":5,\"queues\":[\"misc\",\"big\"],\"labels\":[],\"identity\":\"z-VirtualBox:3952:c200d7ce2ae0\"}"
> 7) "beat"
> 8) "1666513260.3950865"
> 9) "busy"
> 10) "3"
> 11) "rss"
> 12) "42720"
Jobs the process is currently executing:
> hgetall z-VirtualBox:3952:c200d7ce2ae0:work
> 1) "200"
> 2) "{\"queue\":\"big\",\"payload\":\"{\\\"retry\\\":true,\\\"queue\\\":\\\"big\\\",\\\"args\\\":[],\\\"class\\\":\\\"Jobs::Big0\\\",\\\"jid\\\":\\\"63a64a7a42fe7f44c9196f11\\\",\\\"created_at\\\":1666513524.5409067,\\\"enqueued_at\\\":1666513524.5409853}\",\"run_at\":1666513524}"
> 3) "218"
> 4) "{\"queue\":\"big\",\"payload\":\"{\\\"retry\\\":true,\\\"queue\\\":\\\"big\\\",\\\"args\\\":[],\\\"class\\\":\\\"Jobs::Big1\\\",\\\"jid\\\":\\\"9a60f9bf3d97fd3f79ee4cd6\\\",\\\"created_at\\\":1666513521.5368056,\\\"enqueued_at\\\":1666513521.5368824}\",\"run_at\":1666513521}"
> 5) "228"
> 6) "{\"queue\":\"misc\",\"payload\":\"{\\\"retry\\\":true,\\\"queue\\\":\\\"misc\\\",\\\"args\\\":[],\\\"class\\\":\\\"Jobs::Misc0\\\",\\\"jid\\\":\\\"3802c6c4369a0ce00553d2cc\\\",\\\"created_at\\\":1666513525.5429494,\\\"enqueued_at\\\":1666513525.5430243}\",\"run_at\":1666513525}"
> 7) "21w"
> 8) "{\"queue\":\"big\",\"payload\":\"{\\\"retry\\\":true,\\\"queue\\\":\\\"big\\\",\\\"args\\\":[],\\\"class\\\":\\\"Jobs::Big0\\\",\\\"jid\\\":\\\"c4bfb708f1063e32f920942b\\\",\\\"created_at\\\":1666513522.5381534,\\\"enqueued_at\\\":1666513522.5382252}\",\"run_at\":1666513522}"