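How Sidekiq jobs get in and out of Redis
If a run time is specified (the payload has an 'at' key), the job goes into the schedule sorted set, scored by that timestamp. Otherwise it is lpush-ed onto the list "queue:#{q}", and the queue name is recorded in the queues set.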
def atomic_push(conn, payloads)
  if payloads.first['at']
    conn.zadd('schedule', payloads.map do |hash|
      at = hash.delete('at').to_s
      [at, Sidekiq.dump_json(hash)]
    end)
  else
    q = payloads.first['queue']
    now = Time.now.to_f
    to_push = payloads.map do |entry|
      entry['enqueued_at'] = now
      Sidekiq.dump_json(entry)
    end
    conn.sadd('queues', q)
    conn.lpush("queue:#{q}", to_push)
  end
end
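To make the two branches concrete, here is a minimal sketch that runs the same branching logic against an in-memory stand-in for Redis. FakeRedis and route are hypothetical names invented for this illustration, and JSON.generate stands in for Sidekiq.dump_json; none of this is Sidekiq's own API.

```ruby
require 'json'
require 'set'

# Hypothetical in-memory stand-in for the three Redis commands used above.
class FakeRedis
  attr_reader :zsets, :sets, :lists

  def initialize
    @zsets = Hash.new { |h, k| h[k] = [] }      # key => [[score, member], ...]
    @sets  = Hash.new { |h, k| h[k] = Set.new } # key => Set of members
    @lists = Hash.new { |h, k| h[k] = [] }      # key => list, index 0 is the head
  end

  def zadd(key, pairs)
    @zsets[key].concat(pairs)
  end

  def sadd(key, member)
    @sets[key] << member
  end

  # LPUSH inserts each value at the head of the list, one after another.
  def lpush(key, values)
    Array(values).each { |v| @lists[key].unshift(v) }
  end
end

# Same branching as atomic_push, with JSON.generate in place of Sidekiq.dump_json.
def route(conn, payloads)
  if payloads.first['at']
    conn.zadd('schedule', payloads.map { |h| [h.delete('at').to_s, JSON.generate(h)] })
  else
    q = payloads.first['queue']
    now = Time.now.to_f
    conn.sadd('queues', q)
    conn.lpush("queue:#{q}", payloads.map { |h| JSON.generate(h.merge('enqueued_at' => now)) })
  end
end

redis = FakeRedis.new
route(redis, [{ 'class' => 'HardJob', 'queue' => 'default', 'args' => [1] }])
route(redis, [{ 'class' => 'HardJob', 'queue' => 'default', 'args' => [2], 'at' => 1_700_000_000.0 }])

puts redis.lists.keys.inspect            # only the immediate job reached a list
puts redis.sets['queues'].to_a.inspect   # queue name recorded for the immediate job
puts redis.zsets['schedule'].first.first # score of the scheduled job
```

Because jobs are pushed at the head with lpush and fetched from the tail with brpop, each list behaves as a FIFO queue.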
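Queues can be weighted. When weights are set, a queue name is repeated weight times in the internal queue list, and strict ordering is turned off. Each fetch shuffles that list and removes duplicates before passing it to brpop, so every queue appears only once in the command, but a queue with a higher weight is more likely to land at the front and is therefore more likely to be served first.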
# lib/sidekiq/cli.rb
def parse_queues(opts, queues_and_weights)
  queues_and_weights.each { |queue_and_weight| parse_queue(opts, *queue_and_weight) }
end

def parse_queue(opts, q, weight=nil)
  opts[:queues] ||= []
  raise ArgumentError, "queues: #{q} cannot be defined twice" if opts[:queues].include?(q)
  [weight.to_i, 1].max.times { opts[:queues] << q }
  opts[:strict] = false if weight.to_i > 0
end
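As a quick check of what the weight expansion produces, the snippet below copies the parse_queue logic and runs it on sample input. The queue names and weights are made up for illustration; they correspond roughly to starting Sidekiq with -q critical,3 -q default,2 -q low.

```ruby
# Copy of the parse_queue logic shown above, run on sample input.
def parse_queue(opts, q, weight = nil)
  opts[:queues] ||= []
  raise ArgumentError, "queues: #{q} cannot be defined twice" if opts[:queues].include?(q)
  [weight.to_i, 1].max.times { opts[:queues] << q }
  opts[:strict] = false if weight.to_i > 0
end

opts = {}
[['critical', 3], ['default', 2], ['low']].each { |qw| parse_queue(opts, *qw) }

p opts[:queues]
# => ["critical", "critical", "critical", "default", "default", "low"]
p opts[:strict]
# => false
```

A queue without a weight still appears once, because [weight.to_i, 1].max never drops below 1.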
By default, a Processor uses the following BasicFetch strategy to fetch jobs from "queue:#{q}".
# lib/sidekiq/fetch.rb
module Sidekiq
  class BasicFetch
    # We want the fetch operation to timeout every few seconds so the thread
    # can check if the process is shutting down.
    TIMEOUT = 2

    def initialize(options)
      @strictly_ordered_queues = !!options[:strict]
      @queues = options[:queues].map { |q| "queue:#{q}" }
      if @strictly_ordered_queues
        @queues = @queues.uniq
        @queues << TIMEOUT
      end
    end

    def retrieve_work
      work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
      UnitOfWork.new(*work) if work
    end

    # Creating the Redis#brpop command takes into account any
    # configured queue weights. By default Redis#brpop returns
    # data from the first queue that has pending elements. We
    # recreate the queue command each time we invoke Redis#brpop
    # to honor weights and avoid queue starvation.
    def queues_cmd
      if @strictly_ordered_queues
        @queues
      else
        queues = @queues.shuffle.uniq
        queues << TIMEOUT
        queues
      end
    end

    # ....
  end
end
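The effect of shuffle.uniq on weighted queues can be estimated with a small simulation. This is a sketch under assumed weights (critical 3, default 1); the fixed seed and the trial count are arbitrary choices made so that the run is repeatable.

```ruby
# Expanded queue list, as parse_queue would build it for weights 3 and 1.
queues = %w[queue:critical queue:critical queue:critical queue:default]

rng = Random.new(42) # fixed seed so the run is repeatable
trials = 10_000
first_counts = Hash.new(0)

trials.times do
  # Same expression as in queues_cmd, with an explicit random source.
  cmd = queues.shuffle(random: rng).uniq
  first_counts[cmd.first] += 1
end

first_counts.each do |name, n|
  puts format('%-15s first in %.1f%% of fetches', name, 100.0 * n / trials)
end
```

Since three of the four list entries are queue:critical, it should come first in about three quarters of the fetches. brpop serves the first non-empty list in its argument order, so this ratio only matters when both queues have pending jobs.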
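Each Processor is one thread. The number of threads is set by the concurrency option, given either on the command line or in the config file; the code below falls back to 25 when it is not set. (Built-in multi-process support is an Enterprise-only feature.)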
module Sidekiq
  class Manager
    include Util

    attr_reader :workers
    attr_reader :options

    def initialize(options={})
      logger.debug { options.inspect }
      @options = options
      @count = options[:concurrency] || 25
      raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1

      @done = false
      @workers = Set.new
      @count.times do
        @workers << Processor.new(self)
      end
      @plock = Mutex.new
    end

    def start
      @workers.each do |x|
        x.start
      end
    end

    # ....
  end
end
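The one-thread-per-Processor idea can be sketched in plain Ruby. MiniManager below is a hypothetical miniature written for this note, not Sidekiq's Manager: it starts one thread per unit of concurrency and lets them drain a shared in-memory queue instead of Redis.

```ruby
# Hypothetical miniature of the Manager/Processor relationship.
class MiniManager
  attr_reader :threads

  def initialize(concurrency: 25)
    raise ArgumentError, "Concurrency of #{concurrency} is not supported" if concurrency < 1

    @concurrency = concurrency
    @jobs = Queue.new    # thread-safe FIFO from Ruby's core library
    @results = Queue.new
    @threads = []
  end

  def push(job)
    @jobs << job
  end

  # Start one thread per unit of concurrency. Each thread loops until
  # the job queue has been closed and drained (pop then returns nil).
  def start
    @threads = Array.new(@concurrency) do
      Thread.new do
        while (job = @jobs.pop)
          @results << job.call
        end
      end
    end
  end

  # Close the queue, wait for all threads, and collect the results.
  def stop
    @jobs.close
    @threads.each(&:join)
    Array.new(@results.size) { @results.pop }
  end
end

manager = MiniManager.new(concurrency: 3)
5.times { |i| manager.push(-> { i * i }) }
manager.start
results = manager.stop
p results.sort
# => [0, 1, 4, 9, 16]
```

The results are sorted before printing because the order in which threads finish is not deterministic.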
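A failed job is retried by being put into the 'retry' sorted set, scored by its next run time. By default there are at most 25 retries, and the interval grows with each attempt.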
# lib/sidekiq/job_retry.rb
def initialize(options = {})
  @max_retries = Sidekiq.options.merge(options).fetch(:max_retries, DEFAULT_MAX_RETRY_ATTEMPTS)
end

def attempt_retry(worker, msg, queue, exception)
  max_retry_attempts = retry_attempts_from(msg['retry'], @max_retries)

  msg['queue'] = if msg['retry_queue']
    msg['retry_queue']
  else
    queue
  end
  # ....
  count = if msg['retry_count']
    msg['retried_at'] = Time.now.to_f
    msg['retry_count'] += 1
  else
    msg['failed_at'] = Time.now.to_f
    msg['retry_count'] = 0
  end
  # ....
  if count < max_retry_attempts
    delay = delay_for(worker, count, exception)
    logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
    retry_at = Time.now.to_f + delay
    payload = Sidekiq.dump_json(msg)
    Sidekiq.redis do |conn|
      conn.zadd('retry', retry_at.to_s, payload)
    end
  else
    # Goodbye dear message, you (re)tried your best I'm sure.
    retries_exhausted(worker, msg, exception)
  end
end

def delay_for(worker, count, exception)
  if worker && worker.sidekiq_retry_in_block
    custom_retry_in = retry_in(worker, count, exception).to_i
    return custom_retry_in if custom_retry_in > 0
  end
  seconds_to_delay(count)
end

# delayed_job uses the same basic formula
def seconds_to_delay(count)
  (count ** 4) + 15 + (rand(30)*(count+1))
end
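To see how quickly the interval grows, the formula in seconds_to_delay can be tabulated. The sketch below splits it into its deterministic part and the largest possible jitter (rand(30) returns 0..29) and sums both over the default 25 retries. delay_bounds is a helper name made up for this example.

```ruby
MAX_RETRIES = 25 # default retry limit mentioned above

# Smallest and largest value seconds_to_delay(count) can return.
def delay_bounds(count)
  base = (count**4) + 15
  [base, base + 29 * (count + 1)] # rand(30) returns 0..29
end

min_total = 0
max_total = 0
MAX_RETRIES.times do |count|
  lo, hi = delay_bounds(count)
  min_total += lo
  max_total += hi
  puts format('retry %2d: %7d..%7d s', count, lo, hi) if [0, 1, 5, 10, 24].include?(count)
end

puts format('total wait: %.1f to %.1f days', min_total / 86_400.0, max_total / 86_400.0)
```

The first retry happens within a minute, while the last one waits close to four days, so a job that keeps failing stays in the retry set for roughly 20 days in total before retries_exhausted is called.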
Jobs in the retry and schedule sorted sets are pulled out by Sidekiq::Scheduled::Poller once they are due and pushed onto their target queue.
module Sidekiq
  module Scheduled
    SETS = %w(retry schedule)

    class Enq
      def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
        Sidekiq.redis do |conn|
          sorted_sets.each do |sorted_set|
            while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do
              if conn.zrem(sorted_set, job)
                Sidekiq::Client.push(Sidekiq.load_json(job))
                Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
              end
            end
          end
        end
      end
    end

    class Poller
      include Util

      INITIAL_WAIT = 10

      def initialize
        @enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new
        @sleeper = ConnectionPool::TimedStack.new
        @done = false
        @thread = nil
      end

      def start
        @thread ||= safe_thread("scheduler") do
          initial_wait

          while !@done
            enqueue
            wait
          end
          Sidekiq.logger.info("Scheduler exiting...")
        end
      end

      def enqueue
        begin
          @enq.enqueue_jobs
        rescue => ex
          # Most likely a problem with redis networking.
          # Punt and try again at the next interval
          logger.error ex.message
          handle_exception(ex)
        end
      end

      # ....
    end
  end
end
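The loop in enqueue_jobs can be imitated without Redis to show which jobs move and which stay. In this sketch the sorted sets are plain arrays of [score, payload] pairs and the queues are arrays; enqueue_due, the sample jobs, and the timestamps are all invented for illustration.

```ruby
require 'json'

# Hypothetical in-memory stand-ins: a sorted set is an array of
# [score, payload] pairs, a queue is an array with its head at index 0.
sorted_sets = {
  'retry'    => [[100.0, JSON.generate('class' => 'A', 'queue' => 'default')]],
  'schedule' => [[150.0, JSON.generate('class' => 'B', 'queue' => 'mailers')],
                 [900.0, JSON.generate('class' => 'C', 'queue' => 'default')]]
}
queues = Hash.new { |h, k| h[k] = [] }

# Move every job whose score is <= now to its queue, one at a time,
# mirroring the zrangebyscore / zrem / push loop in enqueue_jobs.
def enqueue_due(sorted_sets, queues, now)
  sorted_sets.each_value do |zset|
    while (entry = zset.select { |score, _| score <= now }.min_by(&:first))
      # Like zrem: enqueue only if this caller actually removed the entry.
      next unless zset.delete(entry)

      job = JSON.parse(entry.last)
      queues["queue:#{job['queue']}"].unshift(entry.last)
    end
  end
end

enqueue_due(sorted_sets, queues, 200.0)
p queues.keys.sort
# => ["queue:default", "queue:mailers"]
p sorted_sets['schedule'].size
# => 1
```

The check on the removal matters in the real code because several Sidekiq processes may poll the same sorted sets: only the process whose zrem succeeds pushes the job, so a due job is not enqueued twice.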