在lib/sidekiq/api.rb中,使用时可能需要先require 'sidekiq/api'

可参考 All you need to know about Sidekiq | by Shashwat Srivastava | Medium

Stats 所有即时延时重试队列长度和执行统计

例如

# 所有即时队列的名称和长度
Sidekiq::Stats.new.queues

# 其他
Sidekiq::Stats.new.instance_variable_get(:@stats)

源码

class Stats
  def initialize
    fetch_stats_fast!
  end

  # 返回每个队列的当前长度,格式 {queue1: 213, queue2: 698 ...}
  def queues
    Sidekiq::Stats::Queues.new.lengths
  end

  def fetch_stats_fast!
    pipe1_res = Sidekiq.redis { |conn|
      conn.pipelined do |pipeline|
        # 总执行次数
        pipeline.get("stat:processed")
        # 总失败次数
        pipeline.get("stat:failed")
        # 未到时候的定时任务数量
        pipeline.zcard("schedule")
        # 等待重试的任务数量
        pipeline.zcard("retry")
        # dead集合大小
        pipeline.zcard("dead")
        # sidekiq进程数
        pipeline.scard("processes")
        # default队列的延迟
        pipeline.lrange("queue:default", -1, -1)
      end
    }

    default_queue_latency = if (entry = pipe1_res[6].first)
      job = begin
        Sidekiq.load_json(entry)
      rescue
        {}
      end
      now = Time.now.to_f
      thence = job["enqueued_at"] || now
      now - thence
    else
      0
    end

    @stats = {
      processed: pipe1_res[0].to_i,
      failed: pipe1_res[1].to_i,
      scheduled_size: pipe1_res[2],
      retry_size: pipe1_res[3],
      dead_size: pipe1_res[4],
      processes_size: pipe1_res[5],

      default_queue_latency: default_queue_latency
    }
  end

  # O(number of processes + number of queues) redis calls
  # @api private
  def fetch_stats_slow!
    processes = Sidekiq.redis { |conn|
      conn.sscan_each("processes").to_a
    }

    queues = Sidekiq.redis { |conn|
      conn.sscan_each("queues").to_a
    }

    pipe2_res = Sidekiq.redis { |conn|
      conn.pipelined do |pipeline|
        # 每个进程执行中的任务数
        processes.each { |key| pipeline.hget(key, "busy") }
        # 每个队列的长度
        queues.each { |queue| pipeline.llen("queue:#{queue}") }
      end
    }

    s = processes.size
    workers_size = pipe2_res[0...s].sum(&:to_i)
    enqueued = pipe2_res[s..-1].sum(&:to_i)

    # 所有进程里执行中的任务数
    @stats[:workers_size] = workers_size
    # 就绪任务总数
    @stats[:enqueued] = enqueued
    @stats
  end

  # 将stat:processed、stat:failed重置为零
  def reset(*stats)
    all = %w[failed processed]
    stats = stats.empty? ? all : all & stats.flatten.compact.map(&:to_s)

    mset_args = []
    stats.each do |stat|
      mset_args << "stat:#{stat}"
      mset_args << 0
    end
    Sidekiq.redis do |conn|
      conn.mset(*mset_args)
    end
  end

  class Queues
    # 返回每个队列的当前长度,格式 {queue1: 213, queue2: 698 ...}
    def lengths
      Sidekiq.redis do |conn|
        queues = conn.sscan_each("queues").to_a

        lengths = conn.pipelined { |pipeline|
          queues.each do |queue|
            pipeline.llen("queue:#{queue}")
          end
        }

        array_of_arrays = queues.zip(lengths).sort_by { |_, size| -size }
        array_of_arrays.to_h
      end
    end
  end

  # 获取每日执行数和失败数
  class History
    # 取start_date的days_previous前的记录
    def initialize(days_previous, start_date = nil)
      # we only store five years of data in Redis
      raise ArgumentError if days_previous < 1 || days_previous > (5 * 365)
      @days_previous = days_previous
      @start_date = start_date || Time.now.utc.to_date
    end

    def processed
      @processed ||= date_stat_hash("processed")
    end

    def failed
      @failed ||= date_stat_hash("failed")
    end

    private

    # 使用mget一次获取多个string对应的值
    # string是各个日期:stat:processed:2022-10-03、stat:processed:2022-10-02 ……
    # 或者stat:failed:2022-10-03
    def date_stat_hash(stat)
      stat_hash = {}
      dates = @start_date.downto(@start_date - @days_previous + 1).map { |date|
        date.strftime("%Y-%m-%d")
      }

      keys = dates.map { |datestr| "stat:#{stat}:#{datestr}" }

      begin
        Sidekiq.redis do |conn|
          conn.mget(keys).each_with_index do |value, idx|
            stat_hash[dates[idx]] = value ? value.to_i : 0
          end
        end
      rescue RedisConnection.adapter::CommandError
        # mget will trigger a CROSSSLOT error when run against a Cluster
        # TODO Someone want to add Cluster support?
      end

      stat_hash
    end
  end
end

Queue 遍历或清空某个即时队列

例如

# 获取某个队列
Sidekiq::Queue.new('user_price_audit')

# 遍历,返回JobRecord
Sidekiq::Queue.new('user_price_audit').each{ ... }

源码

class Queue
  include Enumerable

  # 使用sscan迭代queues这个set
  # 并包装成Queue返回
  def self.all
    Sidekiq.redis { |c| c.sscan_each("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) }
  end

  attr_reader :name

  # 创建一个Queue对象,对应redis中是queue:xxx
  def initialize(name = "default")
    @name = name.to_s
    @rname = "queue:#{name}"
  end

  # 使用llen检查这个队列的长度
  def size
    Sidekiq.redis { |con| con.llen(@rname) }
  end

  # 使用lrange取队列第一个元素(因为入队是lpush,所以取最右)
  # 以Time.now减其enqueued_at得延迟
  def latency
    entry = Sidekiq.redis { |conn|
      conn.lrange(@rname, -1, -1)
    }.first
    return 0 unless entry
    job = Sidekiq.load_json(entry)
    now = Time.now.to_f
    thence = job["enqueued_at"] || now
    now - thence
  end

  # 以50个元素一页的方式,用lrange从左往右读取队列
  # 但因其它线程会有入队出队操作,所以不一定连续,它计算deleted_size可能也没用
  def each
    initial_size = size
    deleted_size = 0
    page = 0
    page_size = 50

    loop do
      range_start = page * page_size - deleted_size
      range_end = range_start + page_size - 1
      entries = Sidekiq.redis { |conn|
        conn.lrange @rname, range_start, range_end
      }
      break if entries.empty?
      page += 1
      entries.each do |entry|
        yield JobRecord.new(entry, @name)
      end
      deleted_size = initial_size - size
    end
  end

  # 使用Enumerable提供的detect->each一个个匹配jid,效率低
  def find_job(jid)
    detect { |j| j.jid == jid }
  end

  # 使用unlink非阻塞地删除本队列(而del是阻塞的)
  # 并且从queues这个set中提出其名字
  def clear
    Sidekiq.redis do |conn|
      conn.multi do |transaction|
        transaction.unlink(@rname)
        transaction.srem("queues", [name])
      end
    end
    true
  end
end

JobRecord 任务对象

class JobRecord
  attr_reader :value # redis中原始的json字符串
  attr_reader :item # json解析成hash
  attr_reader :queue

  def initialize(item, queue_name = nil)
    @args = nil
    @value = item
    @item = item.is_a?(Hash) ? item : parse(item)
    @queue = queue_name || @item["queue"]
  end

  # 当前时间与入队时间的差值
  def latency
    now = Time.now.to_f
    now - (@item["enqueued_at"] || @item["created_at"] || now)
  end

  # 从Queue取到一个JobRecord后,可以调用此方法方便地删除它
  def delete
    count = Sidekiq.redis { |conn|
      conn.lrem("queue:#{@queue}", 1, @value)
    }
    count != 0
  end
end

SortedSet

就给JobSet继承而已

class SortedSet
  include Enumerable

  attr_reader :name

  def initialize(name)
    @name = name
    @_size = size
  end

  # 返回sorted set的大小
  def size
    Sidekiq.redis { |c| c.zcard(name) }
  end

  # 遍历sorted set中的元素,并包装成SortedEntry返回
  def scan(match, count = 100)
    return to_enum(:scan, match, count) unless block_given?

    match = "*#{match}*" unless match.include?("*")
    Sidekiq.redis do |conn|
      conn.zscan_each(name, match: match, count: count) do |entry, score|
        yield SortedEntry.new(self, score, entry)
      end
    end
  end

  # 非阻塞地移除这个sorted set
  def clear
    Sidekiq.redis do |conn|
      conn.unlink(name)
    end
    true
  end
  alias_method :💣, :clear
end

JobSet < SortedSet

“存放任务的sorted set”的基类,用于scheduleretrydead

提供each方法以满足Enumerable

class JobSet < SortedSet
  def each
    initial_size = @_size
    offset_size = 0
    page = -1
    page_size = 50

    loop do
      range_start = page * page_size + offset_size
      range_end = range_start + page_size - 1
      elements = Sidekiq.redis { |conn|
        conn.zrange name, range_start, range_end, withscores: true
      }
      break if elements.empty?
      page -= 1
      elements.reverse_each do |element, score|
        yield SortedEntry.new(self, score, element)
      end
      offset_size = initial_size - @_size
    end
  end
end

ScheduledSet < JobSet < SortedSet 延时队列

例如

# 总数
Sidekiq::ScheduledSet.new.count

# 遍历,返回SortedEntry < JobRecord
Sidekiq::ScheduledSet.new.each{...}

# 取某个延时任务看看
Sidekiq::ScheduledSet.new.first #.item['args']

# 删除某种延时job
Sidekiq::ScheduledSet.new.each{|j| j.kill if j.item.dig('args', 0, 'job_class') == 'SuperPricesAuditor'}

RetrySet < JobSet < SortedSet 重试队列

例如

# 总数
Sidekiq::RetrySet.new.count

# 批量操作
Sidekiq::RetrySet.new.retry_all
Sidekiq::RetrySet.new.kill_all

# 遍历,返回SortedEntry < JobRecord
Sidekiq::RetrySet.new.each{...}

# 取某个重试任务看看
Sidekiq::RetrySet.new.first #.item['args']

# 删除某种重试job
Sidekiq::RetrySet.new.each{|j| j.kill if j.item.dig('args', 0, 'job_class') == 'SuperPricesAuditor'}

源码

class RetrySet < JobSet
  def initialize
    super "retry"
  end

  # Enqueues all jobs pending within the retry set.
  def retry_all
    each(&:retry) while size > 0
  end

  # Kills all jobs pending within the retry set.
  def kill_all
    each(&:kill) while size > 0
  end
end

WorkSet

查看运行中的任务

class WorkSet
  include Enumerable

  # 从processes这个set中得所有sidekiq实例id:host+pid+rand
  # 再从每个"#{host+pid+rand}:work"的hash中获取key(线程id)和value(任务)
  def each(&block)
    results = []
    Sidekiq.redis do |conn|
      procs = conn.sscan_each("processes").to_a
      procs.sort.each do |key|
        valid, workers = conn.pipelined { |pipeline|
          pipeline.exists?(key)
          pipeline.hgetall("#{key}:work")
        }
        next unless valid
        workers.each_pair do |tid, json|
          hsh = Sidekiq.load_json(json)
          p = hsh["payload"]
          # avoid breaking API, this is a side effect of the JSON optimization in #4316
          hsh["payload"] = Sidekiq.load_json(p) if p.is_a?(String)
          results << [key, tid, hsh]
        end
      end
    end

    results.sort_by { |(_, _, hsh)| hsh["run_at"] }.each(&block)
  end

  # 取每个"#{host+pid+rand}"的hash里的busy项,加总
  # "#{host+pid+rand}"是heartbeat定期写入的,不太实时,但内容少,比上面的快
  def size
    Sidekiq.redis do |conn|
      procs = conn.sscan_each("processes").to_a
      if procs.empty?
        0
      else
        conn.pipelined { |pipeline|
          procs.each do |key|
            pipeline.hget(key, "busy")
          end
        }.sum(&:to_i)
      end
    end
  end
end

Paginator

根据第一个参数key(可能是"queue:#{@name}""dead""retry""schedule")所指的value的类型(可能是zset、list),采用不同的redis指令来切分集合

module Sidekiq
  module Paginator
    def page(key, pageidx = 1, page_size = 25, opts = nil)
      current_page = pageidx.to_i < 1 ? 1 : pageidx.to_i
      pageidx = current_page - 1
      total_size = 0
      items = []
      starting = pageidx * page_size
      ending = starting + page_size - 1

      Sidekiq.redis do |conn|
        type = conn.type(key)
        rev = opts && opts[:reverse]

        case type
        when "zset"
          total_size, items = conn.multi { |transaction|
            transaction.zcard(key)
            if rev
              transaction.zrevrange(key, starting, ending, withscores: true)
            else
              transaction.zrange(key, starting, ending, withscores: true)
            end
          }
          [current_page, total_size, items]
        when "list"
          total_size, items = conn.multi { |transaction|
            transaction.llen(key)
            if rev
              transaction.lrange(key, -ending - 1, -starting - 1)
            else
              transaction.lrange(key, starting, ending)
            end
          }
          items.reverse! if rev
          [current_page, total_size, items]
        when "none"
          [1, 0, []]
        else
          raise "can't page a #{type}"
        end
      end
    end
  end
end

以下是一个取schedule任务的例子:

PAGINATOR = Class.new.tap{ |k| k.include ::Sidekiq::Paginator }.new

current_page, total_size, retries = PAGINATOR.page("schedule", 1, 1000)

retries.each do |msg, score|
  se = Sidekiq::SortedEntry.new(nil, score, msg)
  if se.item.dig('args', 0, 'job_class') == 'Price::DeleteSystemJob' &&
    se.item.dig('args', 0, 'arguments', 0, 'customer_firm_id') == 27735
    # ...
  end
end

ProcessSet

所有进程的概览:总进程数,总线程并发数,总内存占用。可迭代每个进程

class ProcessSet
  include Enumerable

  # 包装成Process对象,迭代
  def each
    result = Sidekiq.redis { |conn|
      procs = conn.sscan_each("processes").to_a.sort
      conn.pipelined do |pipeline|
        procs.each do |key|
          pipeline.hmget(key, "info", "busy", "beat", "quiet", "rss", "rtt_us")
        end
      end
    }

    result.each do |info, busy, at_s, quiet, rss, rtt|
      next if info.nil? 949
      hash = Sidekiq.load_json(info)
      yield Process.new(hash.merge("busy" => busy.to_i,
        "beat" => at_s.to_f,
        "quiet" => quiet,
        "rss" => rss.to_i,
        "rtt_us" => rtt.to_i))
    end
  end

  # 进程数
  def size
    Sidekiq.redis { |conn| conn.scard("processes") }
  end

  # 取每个sidekiq进程配置的线程数,加总
  def total_concurrency
    sum { |x| x["concurrency"].to_i }
  end

  # 心跳写入的进程内存占用量,加总
  def total_rss_in_kb
    sum { |x| x["rss"].to_i }
  end
  alias_method :total_rss, :total_rss_in_kb
end