如有指定时间,则丢进schedule这个sorted set;否则,lpush进"queue:#{q}",并且在queues这个set中记录队列名

# Push a batch of jobs in one shot. Jobs carrying an 'at' timestamp go
# into the 'schedule' sorted set scored by that timestamp; immediate
# jobs are stamped with enqueued_at, lpush'd onto their queue list, and
# the queue name is registered in the 'queues' set.
def atomic_push(conn, payloads)
  if payloads.first['at']
    scheduled = payloads.map do |job|
      timestamp = job.delete('at').to_s
      [timestamp, Sidekiq.dump_json(job)]
    end
    conn.zadd('schedule', scheduled)
  else
    queue = payloads.first['queue']
    enqueued_at = Time.now.to_f
    entries = payloads.map do |job|
      job['enqueued_at'] = enqueued_at
      Sidekiq.dump_json(job)
    end
    conn.sadd('queues', queue)
    conn.lpush("queue:#{queue}", entries)
  end
end

队列是可以设置权重的。如果设置了,权重高的队列名会在内部队列列表中出现多次;构造brpop参数时先shuffle再uniq,因此每个队列名只出现一次,但权重高的队列更有可能排在前面,所以它们更容易被取到

# lib/sidekiq/cli.rb
# Register every (name, weight) entry from the CLI/config by
# delegating each pair to parse_queue.
def parse_queues(opts, queues_and_weights)
  queues_and_weights.each do |pair|
    parse_queue(opts, *pair)
  end
end

# Add a queue to opts[:queues]. A queue may only be declared once; a
# weight of N pushes the name N times so brpop favors it, and any
# explicit weight turns off strict (in-order) queue processing.
def parse_queue(opts, q, weight=nil)
  queues = (opts[:queues] ||= [])
  raise ArgumentError, "queues: #{q} cannot be defined twice" if queues.include?(q)
  w = weight.to_i
  [w, 1].max.times { queues << q }
  opts[:strict] = false if w > 0
end

Processor默认会使用以下BasicFetch策略去从"queue:#{q}"中取任务

# lib/sidekiq/fetch.rb
module Sidekiq
  class BasicFetch
    # We want the fetch operation to timeout every few seconds so the thread
    # can check if the process is shutting down.
    TIMEOUT = 2

    # Normalize the queue option into Redis list keys ("queue:<name>").
    # Under strict ordering the list is deduplicated once and the brpop
    # timeout appended up front, because the command never changes.
    def initialize(options)
      @strictly_ordered_queues = options[:strict] ? true : false
      @queues = options[:queues].map { |name| "queue:#{name}" }
      if @strictly_ordered_queues
        @queues = @queues.uniq << TIMEOUT
      end
    end

    # Block on Redis waiting for a job (bounded by the brpop timeout so
    # the thread can notice shutdown). Returns a UnitOfWork wrapping the
    # (queue, payload) pair, or nil when the wait timed out.
    def retrieve_work
      response = Sidekiq.redis { |redis| redis.brpop(*queues_cmd) }
      response && UnitOfWork.new(*response)
    end

    # Creating the Redis#brpop command takes into account any
    # configured queue weights. By default Redis#brpop returns
    # data from the first queue that has pending elements. We
    # recreate the queue command each time we invoke Redis#brpop
    # to honor weights and avoid queue starvation.
    # Build the brpop argument list. Strict mode returns the precomputed
    # list. Otherwise the weight-duplicated names are shuffled FIRST and
    # deduplicated SECOND — heavier queues, present multiple times, are
    # more likely to survive near the front — then the blocking timeout
    # is appended last. Rebuilt on every call to honor weights and avoid
    # queue starvation.
    def queues_cmd
      return @queues if @strictly_ordered_queues
      @queues.shuffle.uniq.push(TIMEOUT)
    end

一个Processor就是一个线程,线程的多少根据启动参数(也可以是配置文件里的)concurrency而定。(多进程是企业版才有)

module Sidekiq
  class Manager
    include Util

    attr_reader :workers
    attr_reader :options

    # Set up the worker pool.
    #
    # options[:concurrency] - number of Processor threads (default 25);
    #                         must be >= 1 or ArgumentError is raised.
    def initialize(options={})
      logger.debug { options.inspect }
      @options = options
      @count = options[:concurrency] || 25
      raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1

      @done = false
      @plock = Mutex.new
      @workers = Set.new
      @count.times { @workers << Processor.new(self) }
    end

    # Kick off every Processor thread in the pool.
    def start
      @workers.each(&:start)
    end

失败重试,丢到'retry'这个sorted set中。默认最多25次。每次时间间隔都会加长。

# lib/sidekiq/job_retry.rb
# Resolve the retry ceiling: per-instance options override the global
# Sidekiq options; falls back to DEFAULT_MAX_RETRY_ATTEMPTS.
def initialize(options = {})
  merged = Sidekiq.options.merge(options)
  @max_retries = merged.fetch(:max_retries, DEFAULT_MAX_RETRY_ATTEMPTS)
end

# Push a failed job onto the 'retry' sorted set, scored by the time at
# which it should be retried, or give up once the retry budget is used.
#
# worker    - the worker instance (may supply a custom retry delay)
# msg       - the parsed job hash; mutated in place with bookkeeping
#             fields (queue, retry_count, retried_at / failed_at)
# queue     - the queue the job came from
# exception - the error that caused the failure
def attempt_retry(worker, msg, queue, exception)
  max_retry_attempts = retry_attempts_from(msg['retry'], @max_retries)

  # Retries may be redirected to a dedicated queue via 'retry_queue'.
  msg['queue'] = if msg['retry_queue']
    msg['retry_queue']
  else
    queue
  end

  # ....

  # First failure stamps failed_at and starts the counter at 0; later
  # failures stamp retried_at and bump the counter. The branch's value
  # becomes `count` itself (`+=` evaluates to the incremented value).
  count = if msg['retry_count']
    msg['retried_at'] = Time.now.to_f
    msg['retry_count'] += 1
  else
    msg['failed_at'] = Time.now.to_f
    msg['retry_count'] = 0
  end

  # ....

  if count < max_retry_attempts
    delay = delay_for(worker, count, exception)
    logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
    retry_at = Time.now.to_f + delay
    payload = Sidekiq.dump_json(msg)
    # Score is the retry timestamp; the scheduled Poller later moves
    # due entries back onto their queue.
    Sidekiq.redis do |conn|
      conn.zadd('retry', retry_at.to_s, payload)
    end
  else
    # Goodbye dear message, you (re)tried your best I'm sure.
    retries_exhausted(worker, msg, exception)
  end
end

# Seconds to wait before the next retry. A worker-supplied
# sidekiq_retry_in block wins when it yields a positive integer;
# otherwise the default backoff formula applies.
def delay_for(worker, count, exception)
  custom = worker && worker.sidekiq_retry_in_block ? retry_in(worker, count, exception).to_i : 0
  custom > 0 ? custom : seconds_to_delay(count)
end

# delayed_job uses the same basic formula
# Default backoff: count^4 plus a 15-second floor plus a random jitter
# that grows with the retry count.
def seconds_to_delay(count)
  base = (count ** 4) + 15
  jitter = rand(30) * (count + 1)
  base + jitter
end

retry和schedule这两个sorted set里的任务由Sidekiq::Scheduled::Poller取出,丢到对应queue

module Sidekiq
  module Scheduled
    SETS = %w(retry schedule)

    # Drains due entries from the given sorted sets ('retry' and
    # 'schedule' by default) and re-enqueues them onto their queues.
    class Enq
      def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
        Sidekiq.redis do |conn|
          sorted_sets.each do |sorted_set|
            # Pop one due job at a time; only the caller whose zrem
            # actually removed the member pushes the job, so the same
            # entry cannot be enqueued twice.
            loop do
              job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first
              break unless job
              next unless conn.zrem(sorted_set, job)
              Sidekiq::Client.push(Sidekiq.load_json(job))
              Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
            end
          end
        end
      end
    end

    class Poller
      include Util

      INITIAL_WAIT = 10

      # The enqueuer implementation is pluggable via the
      # :scheduled_enq option; defaults to Sidekiq::Scheduled::Enq.
      def initialize
        enq_class = Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq
        @enq = enq_class.new
        @sleeper = ConnectionPool::TimedStack.new
        @thread = nil
        @done = false
      end

      # Spawn the scheduler thread at most once: wait a bit before the
      # first poll, then alternate enqueue/wait until shutdown.
      def start
        @thread ||= safe_thread("scheduler") do
          initial_wait

          until @done
            enqueue
            wait
          end
          Sidekiq.logger.info("Scheduler exiting...")
        end
      end

      # One polling pass. Errors are logged and swallowed so the
      # scheduler thread survives until the next interval.
      def enqueue
        @enq.enqueue_jobs
      rescue => ex
        # Most likely a problem with redis networking.
        # Punt and try again at the next interval
        logger.error ex.message
        handle_exception(ex)
      end