TL;DR

  • Sidekiq::Launcher#run kicks off Sidekiq::Scheduled::Poller#start, Sidekiq::Manager#start, and the heartbeat
  • Sidekiq::Manager creates a number of Sidekiq::Processors according to the configured concurrency (default 10) and calls their start from its own start
  • Sidekiq::Processor#start creates a thread that runs process_one until @done
  • Sidekiq::Processor#process_one pops one job from Redis, honoring the queue priority configuration, and executes it
  • Sidekiq::Scheduled::Poller#start creates a thread that repeatedly pops jobs scheduled earlier than the current time from the retry and schedule sorted sets and pushes them onto their queues
GitDiagram

graph TB
    %% Define styles
    classDef application fill:#f9f,stroke:#333,stroke-width:2px
    classDef client fill:#bbf,stroke:#333,stroke-width:2px
    classDef storage fill:#f66,stroke:#333,stroke-width:2px
    classDef processing fill:#bfb,stroke:#333,stroke-width:2px
    classDef webui fill:#d9f,stroke:#333,stroke-width:2px
    classDef integration fill:#ff9,stroke:#333,stroke-width:2px

    %% Application Layer
    subgraph Application
        App[Ruby/Rails Application]
        Rails[Rails Integration]:::integration
        AJ[ActiveJob Adapter]:::integration
    end

    %% Client Layer
    subgraph ClientLayer
        Client[Job Client]:::client
        JobImpl[Core Job Implementation]:::client
        Middleware[Middleware Chain]:::client
    end

    %% Storage Layer
    subgraph StorageLayer
        Redis[(Redis Storage)]:::storage
        RedisConn[Redis Connection]:::storage
    end

    %% Processing Layer
    subgraph ProcessingLayer
        Launch[Launcher]:::processing
        Manager[Manager]:::processing
        ProcPool[Processor Pool]:::processing
        Scheduler[Scheduler]:::processing
        Fetcher[Job Fetcher]:::processing
        RetryHandler[Retry Handler]:::processing
        DeadJobs[Dead Job Handler]:::processing
        Logger[Job Logger]:::processing
        Metrics[Metrics System]:::processing
        Capsule[Capsule System]:::processing
    end

    %% Web Interface Layer
    subgraph WebInterface
        WebUI[Web UI Interface]:::webui
    end

    %% Relationships
    App --> Rails
    App --> AJ
    Rails --> Client
    AJ --> Client
    Client --> JobImpl
    Client --> Middleware
    Middleware --> RedisConn
    RedisConn --> Redis
    Launch --> Manager
    Manager --> ProcPool
    Manager --> Scheduler
    Manager --> Fetcher
    Fetcher --> Redis
    ProcPool --> RetryHandler
    RetryHandler --> Redis
    RetryHandler --> DeadJobs
    ProcPool --> Logger
    ProcPool --> Metrics
    Capsule --> ProcPool
    Redis --> WebUI

    %% Click events
    click Client "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/client.rb"
    click ProcPool "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/processor.rb"
    click RedisConn "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/redis_connection.rb"
    click WebUI "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/web.rb"
    click Rails "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/rails.rb"
    click Manager "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/manager.rb"
    click Scheduler "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/scheduled.rb"
    click Fetcher "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/fetch.rb"
    click RetryHandler "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/job_retry.rb"
    click DeadJobs "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/job_retry.rb"
    click Middleware "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/middleware/chain.rb"
    click Metrics "https://github.com/sidekiq/sidekiq/tree/main/lib/sidekiq/metrics"
    click Capsule "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/capsule.rb"
    click Logger "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/job_logger.rb"
    click JobImpl "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/job.rb"
    click Launch "https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/launcher.rb"

    %% Legend
    subgraph Legend
        L1[Application Components]:::application
        L2[Client Components]:::client
        L3[Storage Components]:::storage
        L4[Processing Components]:::processing
        L5[Web UI Components]:::webui
        L6[Integration Components]:::integration
    end

Inspecting perform's call stack

Following [[sidekiq环境]], print the call stack inside perform:

 require "sidekiq"

 class PlainOldRuby
   include Sidekiq::Job

   def perform(how_hard = "super hard", how_long = 1)
+    pp caller
     sleep how_long
     puts "Workin' #{how_hard}"
   end
 end


[".../sidekiq-6.5.7/lib/sidekiq/processor.rb:202:in `execute_job'",
 ".../sidekiq-6.5.7/lib/sidekiq/processor.rb:170:in `block (2 levels) in process'",
 ".../sidekiq-6.5.7/lib/sidekiq/middleware/chain.rb:172:in `invoke'",
 ".../sidekiq-6.5.7/lib/sidekiq/processor.rb:169:in `block in process'",
 ".../sidekiq-6.5.7/lib/sidekiq/processor.rb:136:in `block (6 levels) in dispatch'",
 ".../sidekiq-6.5.7/lib/sidekiq/job_retry.rb:113:in `local'",
 ".../sidekiq-6.5.7/lib/sidekiq/processor.rb:135:in `block (5 levels) in dispatch'",
 ".../sidekiq-6.5.7/lib/sidekiq.rb:44:in `block in '",
 ".../sidekiq-6.5.7/lib/sidekiq/processor.rb:131:in `block (4 levels) in dispatch'",
 ".../sidekiq-6.5.7/lib/sidekiq/processor.rb:263:in `stats'",
 ".../sidekiq-6.5.7/lib/sidekiq/processor.rb:126:in `block (3 levels) in dispatch'",
 ".../sidekiq-6.5.7/lib/sidekiq/job_logger.rb:13:in `call'",
 ".../sidekiq-6.5.7/lib/sidekiq/processor.rb:125:in `block (2 levels) in dispatch'",
 ".../sidekiq-6.5.7/lib/sidekiq/job_retry.rb:80:in `global'",
 ".../sidekiq-6.5.7/lib/sidekiq/processor.rb:124:in `block in dispatch'",
 ".../sidekiq-6.5.7/lib/sidekiq/job_logger.rb:39:in `prepare'",
 ".../sidekiq-6.5.7/lib/sidekiq/processor.rb:123:in `dispatch'",
 ".../sidekiq-6.5.7/lib/sidekiq/processor.rb:168:in `process'",
 ".../sidekiq-6.5.7/lib/sidekiq/processor.rb:78:in `process_one'",
 ".../sidekiq-6.5.7/lib/sidekiq/processor.rb:68:in `run'",
 ".../sidekiq-6.5.7/lib/sidekiq/component.rb:8:in `watchdog'",
 ".../sidekiq-6.5.7/lib/sidekiq/component.rb:17:in `block in safe_thread'"]

Queue priorities

Queues can be given priorities (weights).

Via the command line:

# with priorities
sidekiq -q critical,2 -q default

# without priorities
sidekiq -q critical -q default -q low

Via the config file:

# with priorities
:queues:
  - [critical, 2]
  - default

# without priorities
:queues:
  - critical
  - default
  - low

If any queue is configured with an explicit weight (> 0), "strict ordering" is set to false; in addition, each queue name is pushed onto the queue list as many times as its weight (at least once)

# sidekiq-6.5.7/lib/sidekiq/cli.rb
[weight.to_i, 1].max.times { opts[:queues] << queue.to_s }
opts[:strict] = false if weight.to_i > 0

When fetching a job from Redis: under strict ordering, Sidekiq simply runs brpop queue:a queue:b 2; otherwise it first shuffles and dedupes the queue-name list, giving higher-weight queues a better chance of landing near the front of the argument list, and then issues the brpop

# sidekiq-6.5.7/lib/sidekiq/fetch.rb
module Sidekiq
  class BasicFetch
    def initialize(config)
      # ..
      @strictly_ordered_queues = !!@config[:strict]
      @queues = @config[:queues].map { |q| "queue:#{q}" }
      if @strictly_ordered_queues
        @queues.uniq!
        @queues << {timeout: TIMEOUT}
      end
    end

    def retrieve_work
      qs = queues_cmd
      # ...
      queue, job = redis { |conn| conn.brpop(*qs) }
      UnitOfWork.new(queue, job, config) if queue
    end

    def queues_cmd
      if @strictly_ordered_queues
        @queues
      else
        permute = @queues.shuffle
        permute.uniq!
        permute << {timeout: TIMEOUT}
        permute
      end
    end
  end
end
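The claim that higher-weight queues are more likely to land first can be checked with a small pure-Ruby simulation of queues_cmd (no Redis involved); the queue names and weights here are illustrative:

```ruby
# Simulate the non-strict fetch order for `sidekiq -q critical,2 -q default`:
# the CLI expands weights by duplicating queue names, and each fetch
# shuffles then dedupes, so each queue wins the first slot in proportion
# to its weight (critical ~2/3 of the time here).
queues = %w[critical critical default] # "critical" carries weight 2

firsts = Hash.new(0)
10_000.times do
  permute = queues.shuffle
  permute.uniq!
  firsts[permute.first] += 1
end

puts firsts # critical should lead roughly twice as often as default
```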

Job processing flow

Sidekiq::Processor#start creates a thread that calls run (process_one until @done).

process_one pops one job from Redis according to the queue priority configuration, then calls process:

# sidekiq-6.5.7/lib/sidekiq/processor.rb
def process(uow)
  jobstr = uow.job
  queue = uow.queue_name

  job_hash = nil
  begin
    job_hash = Sidekiq.load_json(jobstr)
  rescue => ex
    # ...
    now = Time.now.to_f
    config.redis.zadd("dead", now.to_s, jobstr)
    # ...
  end

  begin
    # look up the job's Sidekiq::Job subclass and instantiate it
    dispatch(job_hash, queue, jobstr) do |inst|
      # run the server_middleware chain
      @config.server_middleware.invoke(inst, job_hash, queue) do
        # pass the args to the Sidekiq::Job instance's perform method
        execute_job(inst, job_hash["args"])
      end
    end
    # ...
  rescue Exception => ex
    # ...
    raise ex
  ensure
    # ...
  end
end

def execute_job(inst, cloned_args)
  inst.perform(*cloned_args)
end

def dispatch(job_hash, queue, jobstr)
  # set up the logging context for this job (jid, class, any per-job log level)
  @job_logger.prepare(job_hash) do
    # catch errors and push the job onto the retry set
    @retrier.global(jobstr, queue) do
      # log before and after job execution
      @job_logger.call(job_hash, queue) do
        # track this process's processed count (PROCESSED), failure count (FAILURE),
        # and in-flight jobs (WORK_STATE); another thread (heartbeat) writes them to redis
        stats(jobstr, queue) do
          # a reloader can be passed via the :reloader option to load the latest code,
          # reconnect to the database, etc. before the job runs
          @reloader.call do
            # look up the job's Sidekiq::Job subclass and instantiate it
            klass = constantize(job_hash["class"])
            inst = klass.new
            inst.jid = job_hash["jid"]
            @retrier.local(inst, jobstr, queue) do
              # back to the block in the process method
              yield inst
            end
          end
        end
      end
    end
  end
end
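The nesting in dispatch is the classic "onion" pattern that also shows up in the call stack printed earlier: each wrapper does its setup, yields inward, then does its teardown. A minimal pure-Ruby sketch (the names are illustrative, not Sidekiq's):

```ruby
# Each layer records entering and leaving; the innermost block is the job.
events = []

wrap = lambda do |name, &inner|
  events << "#{name}:before"
  inner.call
  events << "#{name}:after"
end

# Nesting mirrors dispatch: logger -> retrier -> stats -> perform
wrap.call("logger") do
  wrap.call("retrier") do
    wrap.call("stats") do
      events << "perform"
    end
  end
end

p events
# => ["logger:before", "retrier:before", "stats:before", "perform",
#     "stats:after", "retrier:after", "logger:after"]
```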

Retries and scheduled jobs

Since retries also happen after a delay, the logic resembles that of scheduled jobs, so both live in the Sidekiq::Scheduled module.

Sidekiq::Scheduled::Poller#start creates a thread that repeatedly runs Sidekiq::Scheduled::Enq#enqueue_jobs:

# sidekiq-6.5.7/lib/sidekiq/scheduled.rb
module Sidekiq
  module Scheduled
    class Poller
      include Sidekiq::Component

      def initialize(options)
        @enq = (options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new
        # ...
      end

      def start
        @thread ||= safe_thread("scheduler") {
          # ...
          until @done
            enqueue
            wait
          end
        }
      end

      def enqueue
        @enq.enqueue_jobs
        # ...
      end
    end
  end
end

Sidekiq::Scheduled::Enq uses a Lua script to pop, from the retry and schedule sorted sets, the jobs whose scheduled time is earlier than now, and pushes them onto the corresponding queue:xxx list:

# sidekiq-6.5.7/lib/sidekiq/scheduled.rb
module Sidekiq
  module Scheduled
    SETS = %w[retry schedule]

    class Enq
      LUA_ZPOPBYSCORE = <<~LUA
        local key, now = KEYS[1], ARGV[1]
        local jobs = redis.call("zrangebyscore", key, "-inf", now, "limit", 0, 1)
        if jobs[1] then
          redis.call("zrem", key, jobs[1])
          return jobs[1]
        end
      LUA

      def initialize
        @done = false
        @lua_zpopbyscore_sha = nil
      end

      def enqueue_jobs(sorted_sets = SETS)
        Sidekiq.redis do |conn|
          sorted_sets.each do |sorted_set|
            while !@done && (job = zpopbyscore(conn, keys: [sorted_set], argv: [Time.now.to_f.to_s]))
              Sidekiq::Client.push(Sidekiq.load_json(job))
              Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
            end
          end
        end
      end

      private

      def zpopbyscore(conn, keys: nil, argv: nil)
        if @lua_zpopbyscore_sha.nil?
          raw_conn = conn.respond_to?(:redis) ? conn.redis : conn
          @lua_zpopbyscore_sha = raw_conn.script(:load, LUA_ZPOPBYSCORE)
        end

        conn.evalsha(@lua_zpopbyscore_sha, keys, argv)
      rescue RedisConnection.adapter::CommandError => e
        # ...
        retry
      end
    end
  end
end
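The Lua script's semantics (atomically pop the oldest member whose score is not later than now) can be modeled in plain Ruby with a {member => score} hash; this sketch only illustrates the contract, and the job names are made up:

```ruby
# Model of the zpopbyscore loop: a sorted set as {payload => score}.
# Like Enq#enqueue_jobs, keep popping due members until none are left.
def zpopbyscore(zset, now)
  member, _score = zset.select { |_m, s| s <= now }.min_by { |_m, s| s }
  zset.delete(member) if member
  member
end

schedule = {
  "job-a" => 100.0, # due
  "job-b" => 200.0, # due
  "job-c" => 900.0  # still in the future
}

enqueued = []
while (job = zpopbyscore(schedule, 500.0))
  enqueued << job # Enq would Sidekiq::Client.push the decoded payload here
end

p enqueued       # => ["job-a", "job-b"]
p schedule.keys  # => ["job-c"]
```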

heartbeat

The heartbeat thread writes the process's running state to Redis every 5 seconds:

module Sidekiq
  class Launcher
    include Sidekiq::Component

    STATS_TTL = 5 * 365 * 24 * 60 * 60 # 5 years

    def run
      @thread = safe_thread("heartbeat", &method(:start_heartbeat))
      # ...
    end

    BEAT_PAUSE = 5

    def start_heartbeat
      loop do
        heartbeat
        sleep BEAT_PAUSE
      end
      logger.info("Heartbeat stopping...")
    end

    def heartbeat
      # ...
      ❤
    end

    def ❤
      key = identity
      fails = procd = 0

      begin
        fails = Processor::FAILURE.reset
        procd = Processor::PROCESSED.reset
        curstate = Processor::WORK_STATE.dup

        nowdate = Time.now.utc.strftime("%Y-%m-%d")

        redis do |conn|
          # record total processed, per-day processed, total failed, per-day failed
          # (per-day counters are kept for five years)
          conn.multi do |transaction|
            transaction.incrby("stat:processed", procd)
            transaction.incrby("stat:processed:#{nowdate}", procd)
            transaction.expire("stat:processed:#{nowdate}", STATS_TTL)

            transaction.incrby("stat:failed", fails)
            transaction.incrby("stat:failed:#{nowdate}", fails)
            transaction.expire("stat:failed:#{nowdate}", STATS_TTL)
          end

          # in the "#{host+pid+rand}:work" hash, record this process's
          # running threads (keys) and the jobs they are executing (values),
          # with a 60-second expiry
          work_key = "#{key}:work"
          conn.pipelined do |transaction|
            transaction.unlink(work_key)
            curstate.each_pair do |tid, hash|
              transaction.hset(work_key, tid, Sidekiq.dump_json(hash))
            end
            transaction.expire(work_key, 60)
          end
        end

        rtt = check_rtt

        fails = procd = 0
        kb = memory_usage(::Process.pid)

        # add host+pid+rand to the "processes" set, and in the host+pid+rand hash
        # record the number of in-flight jobs (busy), heartbeat time (beat),
        # redis round-trip time (rtt_us), whether quieting (quiet), and the RSS
        # memory usage read from /proc (rss)
        _, exists, _, _, msg = redis { |conn|
          conn.multi { |transaction|
            transaction.sadd("processes", [key])
            transaction.exists?(key)
            transaction.hmset(key, "info", to_json,
              "busy", curstate.size,
              "beat", Time.now.to_f,
              "rtt_us", rtt,
              "quiet", @done.to_s,
              "rss", kb)
            transaction.expire(key, 60)
            transaction.rpop("#{key}-signals")
          }
        }

        # first heartbeat or recovering from an outage and need to reestablish our heartbeat
        fire_event(:heartbeat) unless exists
        fire_event(:beat, oneshot: false)

        return unless msg

        ::Process.kill(msg, ::Process.pid)
      rescue => e
        # ignore all redis/network issues
        logger.error("heartbeat: #{e}")
        # don't lose the counts if there was a network issue
        Processor::PROCESSED.incr(procd)
        Processor::FAILURE.incr(fails)
      end
    end
  end
end
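The procd/fails bookkeeping depends on counters that can be atomically read-and-zeroed, with the values added back if the Redis write fails. A minimal mutex-guarded counter in that spirit (a sketch; the real Processor::PROCESSED and FAILURE counters live in processor.rb):

```ruby
# reset atomically returns the current value and zeroes the counter;
# on a redis/network error the caller incr's the value back, so the
# counts survive the outage (see the rescue branch above).
class Counter
  def initialize
    @value = 0
    @lock = Mutex.new
  end

  def incr(amount = 1)
    @lock.synchronize { @value += amount }
  end

  def reset
    @lock.synchronize do
      current = @value
      @value = 0
      current
    end
  end
end

processed = Counter.new
3.times { processed.incr }

procd = processed.reset   # drains the counter: procd == 3, counter back to 0
processed.incr(procd)     # simulate the rescue path restoring the count
```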

Example values:

Overall process state:

> hgetall z-VirtualBox:3952:c200d7ce2ae0
> 1) "rtt_us"
> 2) "266"
> 3) "quiet"
> 4) "false"
> 5) "info"
> 6) "{\"hostname\":\"z-VirtualBox\",\"started_at\":1666511019.355878,\"pid\":3952,\"tag\":\"\",\"concurrency\":5,\"queues\":[\"misc\",\"big\"],\"labels\":[],\"identity\":\"z-VirtualBox:3952:c200d7ce2ae0\"}"
> 7) "beat"
> 8) "1666513260.3950865"
> 9) "busy"
> 10) "3"
> 11) "rss"
> 12) "42720"

Jobs currently being executed by the process:

> hgetall z-VirtualBox:3952:c200d7ce2ae0:work
> 1) "200"
> 2) "{\"queue\":\"big\",\"payload\":\"{\\\"retry\\\":true,\\\"queue\\\":\\\"big\\\",\\\"args\\\":[],\\\"class\\\":\\\"Jobs::Big0\\\",\\\"jid\\\":\\\"63a64a7a42fe7f44c9196f11\\\",\\\"created_at\\\":1666513524.5409067,\\\"enqueued_at\\\":1666513524.5409853}\",\"run_at\":1666513524}"
> 3) "218"
> 4) "{\"queue\":\"big\",\"payload\":\"{\\\"retry\\\":true,\\\"queue\\\":\\\"big\\\",\\\"args\\\":[],\\\"class\\\":\\\"Jobs::Big1\\\",\\\"jid\\\":\\\"9a60f9bf3d97fd3f79ee4cd6\\\",\\\"created_at\\\":1666513521.5368056,\\\"enqueued_at\\\":1666513521.5368824}\",\"run_at\":1666513521}"
> 5) "228"
> 6) "{\"queue\":\"misc\",\"payload\":\"{\\\"retry\\\":true,\\\"queue\\\":\\\"misc\\\",\\\"args\\\":[],\\\"class\\\":\\\"Jobs::Misc0\\\",\\\"jid\\\":\\\"3802c6c4369a0ce00553d2cc\\\",\\\"created_at\\\":1666513525.5429494,\\\"enqueued_at\\\":1666513525.5430243}\",\"run_at\":1666513525}"
> 7) "21w"
> 8) "{\"queue\":\"big\",\"payload\":\"{\\\"retry\\\":true,\\\"queue\\\":\\\"big\\\",\\\"args\\\":[],\\\"class\\\":\\\"Jobs::Big0\\\",\\\"jid\\\":\\\"c4bfb708f1063e32f920942b\\\",\\\"created_at\\\":1666513522.5381534,\\\"enqueued_at\\\":1666513522.5382252}\",\"run_at\":1666513522}"