sidekiq的api
在lib/sidekiq/api.rb中,使用时可能需要先
require 'sidekiq/api'
Stats:所有即时、延时、重试队列的长度和执行统计
例如
# 所有即时队列的名称和长度
Sidekiq::Stats.new.queues
# 其他
Sidekiq::Stats.new.instance_variable_get(:@stats)
源码
# Point-in-time Sidekiq statistics read from Redis: lifetime processed/failed
# counters, sizes of the "schedule"/"retry"/"dead" sorted sets, the size of the
# "processes" set and the latency of the default queue.
class Stats
  # Loads the cheap ("fast") statistics as soon as the object is created.
  def initialize
    fetch_stats_fast!
  end

  # Current length of every queue.
  #
  # @return [Hash] queue name => length (see Queues#lengths)
  def queues
    Sidekiq::Stats::Queues.new.lengths
  end

  # Reads all counters and set sizes in one pipelined round trip and stores
  # them in @stats.
  #
  # @return [Hash] the freshly built @stats hash
  def fetch_stats_fast!
    pipe1_res = Sidekiq.redis { |conn|
      conn.pipelined do |pipeline|
        # total number of processed jobs
        pipeline.get("stat:processed")
        # total number of failed jobs
        pipeline.get("stat:failed")
        # size of the "schedule" sorted set
        pipeline.zcard("schedule")
        # size of the "retry" sorted set
        pipeline.zcard("retry")
        # size of the "dead" sorted set
        pipeline.zcard("dead")
        # number of entries in the "processes" set
        pipeline.scard("processes")
        # last element of the default queue, used to derive its latency
        pipeline.lrange("queue:default", -1, -1)
      end
    }

    default_queue_latency = if (entry = pipe1_res[6].first)
      job = begin
        Sidekiq.load_json(entry)
      rescue
        # An unparsable payload is treated as a job without a timestamp,
        # which yields a latency of zero below.
        {}
      end
      now = Time.now.to_f
      thence = job["enqueued_at"] || now
      now - thence
    else
      0
    end

    @stats = {
      processed: pipe1_res[0].to_i,
      failed: pipe1_res[1].to_i,
      scheduled_size: pipe1_res[2],
      retry_size: pipe1_res[3],
      dead_size: pipe1_res[4],
      processes_size: pipe1_res[5],
      default_queue_latency: default_queue_latency
    }
  end

  # O(number of processes + number of queues) redis calls
  #
  # Adds :workers_size (sum of every process's "busy" field) and :enqueued
  # (sum of all queue lengths) to @stats.
  #
  # @api private
  # @return [Hash] the updated @stats hash
  def fetch_stats_slow!
    processes = Sidekiq.redis { |conn|
      conn.sscan_each("processes").to_a
    }

    queues = Sidekiq.redis { |conn|
      conn.sscan_each("queues").to_a
    }

    pipe2_res = Sidekiq.redis { |conn|
      conn.pipelined do |pipeline|
        # number of busy jobs reported by each process
        processes.each { |key| pipeline.hget(key, "busy") }
        # length of each queue
        queues.each { |queue| pipeline.llen("queue:#{queue}") }
      end
    }

    # The first `s` replies belong to the processes, the rest to the queues.
    s = processes.size
    workers_size = pipe2_res[0...s].sum(&:to_i)
    enqueued = pipe2_res[s..-1].sum(&:to_i)

    @stats[:workers_size] = workers_size
    @stats[:enqueued] = enqueued
    @stats
  end

  # Resets the "stat:processed" and/or "stat:failed" counters to zero.
  #
  # @param stats [Array<String, Symbol>] counters to reset ("failed",
  #   "processed"); both are reset when nothing is passed. Unknown names are
  #   ignored.
  # @return [Object, nil] the result of the MSET call, or nil when none of the
  #   given names is a known counter
  def reset(*stats)
    all = %w[failed processed]
    stats = stats.empty? ? all : all & stats.flatten.compact.map(&:to_s)
    # Only unknown names were given: skip the call, because MSET without any
    # key/value pair is rejected by Redis.
    return if stats.empty?

    mset_args = stats.flat_map { |stat| ["stat:#{stat}", 0] }
    Sidekiq.redis do |conn|
      conn.mset(*mset_args)
    end
  end

  # Reads the length of every queue listed in the "queues" set.
  class Queues
    # @return [Hash] queue name => length, ordered from longest to shortest
    def lengths
      Sidekiq.redis do |conn|
        queues = conn.sscan_each("queues").to_a

        lengths = conn.pipelined { |pipeline|
          queues.each do |queue|
            pipeline.llen("queue:#{queue}")
          end
        }

        array_of_arrays = queues.zip(lengths).sort_by { |_, size| -size }
        array_of_arrays.to_h
      end
    end
  end

  # Per-day processed and failed counts.
  class History
    # @param days_previous [Integer] number of days to report, counting back
    #   from and including start_date (1..5*365)
    # @param start_date [Date, nil] most recent day of the report; defaults to
    #   today's UTC date
    # @raise [ArgumentError] when days_previous is outside 1..5*365
    def initialize(days_previous, start_date = nil)
      # we only store five years of data in Redis
      if days_previous < 1 || days_previous > (5 * 365)
        raise ArgumentError, "days_previous must be between 1 and #{5 * 365}"
      end

      @days_previous = days_previous
      @start_date = start_date || Time.now.utc.to_date
    end

    # @return [Hash] "YYYY-MM-DD" => processed count (memoized)
    def processed
      @processed ||= date_stat_hash("processed")
    end

    # @return [Hash] "YYYY-MM-DD" => failed count (memoized)
    def failed
      @failed ||= date_stat_hash("failed")
    end

    private

    # Fetches all per-day counters with a single MGET. The keys look like
    # "stat:processed:2022-10-03" or "stat:failed:2022-10-03"; a missing key
    # counts as 0.
    def date_stat_hash(stat)
      stat_hash = {}
      dates = @start_date.downto(@start_date - @days_previous + 1).map { |date|
        date.strftime("%Y-%m-%d")
      }

      keys = dates.map { |datestr| "stat:#{stat}:#{datestr}" }

      begin
        Sidekiq.redis do |conn|
          conn.mget(keys).each_with_index do |value, idx|
            stat_hash[dates[idx]] = value ? value.to_i : 0
          end
        end
      rescue RedisConnection.adapter::CommandError
        # mget will trigger a CROSSSLOT error when run against a Cluster
        # TODO Someone want to add Cluster support?
      end

      stat_hash
    end
  end
end
Queue 遍历或清空某个即时队列
例如
# 获取某个队列
Sidekiq::Queue.new('user_price_audit')
# 遍历,返回JobRecord
Sidekiq::Queue.new('user_price_audit').each{ ... }
源码
# One ready queue, stored in Redis as the list "queue:<name>". Enumerating it
# yields a JobRecord per entry.
class Queue
  include Enumerable

  attr_reader :name

  # Reads every queue name from the "queues" set via SSCAN.
  #
  # @return [Array<Sidekiq::Queue>] one Queue per name, sorted by name
  def self.all
    names = Sidekiq.redis { |c| c.sscan_each("queues").to_a }
    names.sort.map { |queue_name| Sidekiq::Queue.new(queue_name) }
  end

  # @param name [#to_s] queue name; the backing Redis key is "queue:<name>"
  def initialize(name = "default")
    @name = name.to_s
    @rname = "queue:#{name}"
  end

  # @return [Integer] length of the backing list (LLEN)
  def size
    Sidekiq.redis do |con|
      con.llen(@rname)
    end
  end

  # Age of the entry at the tail of the list (index -1): the current time
  # minus its "enqueued_at".
  #
  # @return [Numeric] 0 when the queue is empty; a zero difference when the
  #   entry has no "enqueued_at"
  def latency
    oldest = Sidekiq.redis { |conn| conn.lrange(@rname, -1, -1) }.first
    return 0 unless oldest

    enqueued_at = Sidekiq.load_json(oldest)["enqueued_at"]
    now = Time.now.to_f
    now - (enqueued_at || now)
  end

  # Walks the list from index 0 in pages of 50 entries, yielding a JobRecord
  # for each one. After every page the window is shifted left by however much
  # the list has shrunk since iteration began.
  def each
    per_page = 50
    size_at_start = size
    shrinkage = 0
    page_number = 0

    loop do
      first = page_number * per_page - shrinkage
      last = first + per_page - 1
      batch = Sidekiq.redis { |conn| conn.lrange(@rname, first, last) }
      break if batch.empty?

      page_number += 1
      batch.each { |raw| yield JobRecord.new(raw, @name) }
      shrinkage = size_at_start - size
    end
  end

  # Linear search through the whole queue, so this is slow on long queues.
  #
  # @param jid [String] job id to look for
  # @return [JobRecord, nil] the first record whose jid matches
  def find_job(jid)
    find { |record| record.jid == jid }
  end

  # Removes the backing list with UNLINK and drops this queue's name from the
  # "queues" set, both inside one MULTI transaction.
  #
  # @return [true]
  def clear
    Sidekiq.redis do |conn|
      conn.multi do |transaction|
        transaction.unlink(@rname)
        transaction.srem("queues", [name])
      end
    end

    true
  end
end
JobRecord 任务对象
# A single job as stored in Redis.
class JobRecord
  # the value handed to the constructor (the raw JSON string when read from Redis)
  attr_reader :value
  # the job as a Hash
  attr_reader :item
  # name of the queue this record belongs to
  attr_reader :queue

  # @param item [Hash, Object] an already-decoded job Hash, or a raw value that
  #   is passed to +parse+ (not defined in this excerpt; presumably provided by
  #   the full class)
  # @param queue_name [String, nil] queue name; falls back to item["queue"]
  def initialize(item, queue_name = nil)
    @args = nil
    @value = item
    @item =
      if item.is_a?(Hash)
        item
      else
        parse(item)
      end
    @queue = queue_name || @item["queue"]
  end

  # Difference between the current time and "enqueued_at", falling back to
  # "created_at", then to the current time itself.
  #
  # @return [Float]
  def latency
    now = Time.now.to_f
    reference = @item["enqueued_at"] || @item["created_at"] || now
    now - reference
  end

  # Removes one occurrence of this record's raw value from its queue list
  # (LREM with a count of 1).
  #
  # @return [Boolean] true when Redis reports a non-zero number of removals
  def delete
    removed = Sidekiq.redis do |conn|
      conn.lrem("queue:#{@queue}", 1, @value)
    end
    removed != 0
  end
end
SortedSet
就给 JobSet 继承而已
class SortedSet
include Enumerable
attr_reader :name
# Wraps the Redis sorted set stored under the key +name+.
#
# @param name [String] Redis key of the sorted set
def initialize(name)
@name = name
# Must run after @name is assigned: +size+ looks the key up through +name+.
# The result is cached once here, at construction time.
@_size = size
end
# Number of members in the sorted set (ZCARD), read from Redis on every call.
#
# @return [Integer]
def size
  Sidekiq.redis do |conn|
    conn.zcard(name)
  end
end
# Iterates the sorted set with ZSCAN, yielding a SortedEntry for every member
# that matches the pattern.
#
# @param match [String] glob pattern; when it contains no "*" it is wrapped as
#   "*<match>*"
# @param count [Integer] COUNT hint passed to ZSCAN
# @return [Enumerator] when called without a block
def scan(match, count = 100)
  return to_enum(:scan, match, count) unless block_given?

  pattern = match.include?("*") ? match : "*#{match}*"
  Sidekiq.redis do |conn|
    conn.zscan_each(name, match: pattern, count: count) do |member, score|
      yield SortedEntry.new(self, score, member)
    end
  end
end
# Drops the whole sorted set with UNLINK.
#
# @return [true]
def clear
  Sidekiq.redis { |conn| conn.unlink(name) }
  true
end
alias_method :💣, :clear
end
JobSet < SortedSet
“存放任务的sorted set”的基类,用于 schedule、retry、dead;提供 each 方法以满足 Enumerable
# Base class for sorted sets that hold jobs; supplies the +each+ that
# Enumerable needs.
class JobSet < SortedSet
  # Pages through the set 50 members at a time, starting from the end
  # (negative ranks) and moving towards the front. Each page is yielded in
  # reverse, so members arrive from the highest rank to the lowest, each wrapped
  # in a SortedEntry.
  def each
    per_page = 50
    size_at_start = @_size
    shift = 0
    page_index = -1

    loop do
      first = page_index * per_page + shift
      last = first + per_page - 1
      batch = Sidekiq.redis { |conn|
        conn.zrange(name, first, last, withscores: true)
      }
      break if batch.empty?

      page_index -= 1
      batch.reverse_each { |member, score| yield SortedEntry.new(self, score, member) }
      # NOTE(review): this method never updates @_size, so unless it is changed
      # elsewhere during iteration the shift stays 0.
      shift = size_at_start - @_size
    end
  end
end
ScheduledSet < JobSet < SortedSet 延时队列
例如
# 总数
Sidekiq::ScheduledSet.new.count
# 遍历,返回SortedEntry < JobRecord
Sidekiq::ScheduledSet.new.each{...}
# 取某个延时任务看看
Sidekiq::ScheduledSet.new.first #.item['args']
# 将某种延时job移入dead集合(kill 不是直接删除;要直接删除请用 j.delete)
Sidekiq::ScheduledSet.new.each{|j| j.kill if j.item.dig('args', 0, 'job_class') == 'SuperPricesAuditor'}
RetrySet < JobSet < SortedSet 重试队列
例如
# 总数
Sidekiq::RetrySet.new.count
# 批量操作
Sidekiq::RetrySet.new.retry_all
Sidekiq::RetrySet.new.kill_all
# 遍历,返回SortedEntry < JobRecord
Sidekiq::RetrySet.new.each{...}
# 取某个重试任务看看
Sidekiq::RetrySet.new.first #.item['args']
# 将某种重试job移入dead集合(kill 不是直接删除;要直接删除请用 j.delete)
Sidekiq::RetrySet.new.each{|j| j.kill if j.item.dig('args', 0, 'job_class') == 'SuperPricesAuditor'}
源码
# The sorted set stored under the "retry" key.
class RetrySet < JobSet
  def initialize
    super("retry")
  end

  # Enqueues all jobs pending within the retry set. Keeps iterating until the
  # set reports a size of zero.
  def retry_all
    while size > 0
      each(&:retry)
    end
  end

  # Kills all jobs pending within the retry set. Keeps iterating until the set
  # reports a size of zero.
  def kill_all
    while size > 0
      each(&:kill)
    end
  end
end
WorkSet
查看运行中的任务
# View over the jobs that are currently running.
class WorkSet
  include Enumerable

  # Reads every key in the "processes" set and, for each one that still exists,
  # its "<key>:work" hash (field => JSON). Yields
  # [process_key, field, decoded_hash] triples ordered by the hash's "run_at".
  def each(&block)
    entries = []

    Sidekiq.redis do |conn|
      conn.sscan_each("processes").to_a.sort.each do |process_key|
        alive, work = conn.pipelined do |pipeline|
          pipeline.exists?(process_key)
          pipeline.hgetall("#{process_key}:work")
        end
        next unless alive

        work.each_pair do |thread_id, raw|
          details = Sidekiq.load_json(raw)
          payload = details["payload"]
          # avoid breaking API, this is a side effect of the JSON optimization in #4316
          details["payload"] = Sidekiq.load_json(payload) if payload.is_a?(String)
          entries << [process_key, thread_id, details]
        end
      end
    end

    entries.sort_by { |_, _, details| details["run_at"] }.each(&block)
  end

  # Sums the "busy" field of every process hash. This needs one small field
  # per process rather than the full work hashes read by +each+.
  #
  # @return [Integer] 0 when no process is registered
  def size
    Sidekiq.redis do |conn|
      process_keys = conn.sscan_each("processes").to_a
      next 0 if process_keys.empty?

      busy_counts = conn.pipelined do |pipeline|
        process_keys.each { |process_key| pipeline.hget(process_key, "busy") }
      end
      busy_counts.sum(&:to_i)
    end
  end
end
Paginator
根据第一个参数key(可能是 "queue:#{@name}"、"dead"、"retry"、"schedule")所指的value的类型(可能是zset、list),采用不同的redis指令来切分集合
module Sidekiq
# Pages through a Redis list or sorted set, picking the commands that fit the
# key's type.
module Paginator
  # @param key [String] Redis key to page through
  # @param pageidx [#to_i] 1-based page number; anything below 1 is treated as 1
  # @param page_size [Integer] entries per page
  # @param opts [Hash, nil] pass reverse: true to page from the other end
  # @return [Array] [current_page, total_size, items]; [1, 0, []] when the key
  #   does not exist. Sorted-set items are read with scores.
  # @raise [RuntimeError] when the key holds any other type
  def page(key, pageidx = 1, page_size = 25, opts = nil)
    current_page = [pageidx.to_i, 1].max
    first = (current_page - 1) * page_size
    last = first + page_size - 1

    Sidekiq.redis do |conn|
      kind = conn.type(key)
      reversed = opts && opts[:reverse]

      case kind
      when "zset"
        total, rows = conn.multi do |transaction|
          transaction.zcard(key)
          if reversed
            transaction.zrevrange(key, first, last, withscores: true)
          else
            transaction.zrange(key, first, last, withscores: true)
          end
        end
        [current_page, total, rows]
      when "list"
        total, rows = conn.multi do |transaction|
          transaction.llen(key)
          if reversed
            # Same window, measured from the tail of the list.
            transaction.lrange(key, -last - 1, -first - 1)
          else
            transaction.lrange(key, first, last)
          end
        end
        rows.reverse! if reversed
        [current_page, total, rows]
      when "none"
        [1, 0, []]
      else
        raise "can't page a #{kind}"
      end
    end
  end
end
end
以下是一个取schedule任务的例子:
# Build a throwaway object that mixes in Sidekiq::Paginator so that #page can
# be called outside of the web UI code.
PAGINATOR = Class.new.tap{ |k| k.include ::Sidekiq::Paginator }.new
# Read page 1 of the "schedule" key with a page size of 1000. Despite its
# name, `retries` holds the entries of the "schedule" set here.
current_page, total_size, retries = PAGINATOR.page("schedule", 1, 1000)
retries.each do |msg, score|
# Wrap the raw member and its score so the decoded job is available via #item.
se = Sidekiq::SortedEntry.new(nil, score, msg)
# Select jobs by the "job_class" and a nested argument found under item["args"][0].
if se.item.dig('args', 0, 'job_class') == 'Price::DeleteSystemJob' &&
se.item.dig('args', 0, 'arguments', 0, 'customer_firm_id') == 27735
# ...
end
end
ProcessSet
所有进程的概览:总进程数,总线程并发数,总内存占用。可迭代每个进程
# Overview of all Sidekiq processes registered in the "processes" set:
# process count, total concurrency and total memory. Enumerable over the
# individual processes.
class ProcessSet
  include Enumerable

  # Fetches the heartbeat fields of every registered process in one pipelined
  # round trip and yields a Process for each one that has an "info" field.
  def each
    result = Sidekiq.redis { |conn|
      procs = conn.sscan_each("processes").to_a.sort

      conn.pipelined do |pipeline|
        procs.each do |key|
          pipeline.hmget(key, "info", "busy", "beat", "quiet", "rss", "rtt_us")
        end
      end
    }

    result.each do |info, busy, at_s, quiet, rss, rtt|
      # No "info" field means there is nothing to decode for this key
      # (presumably the process hash disappeared between the two Redis reads).
      next if info.nil?

      hash = Sidekiq.load_json(info)
      yield Process.new(hash.merge("busy" => busy.to_i,
        "beat" => at_s.to_f,
        "quiet" => quiet,
        "rss" => rss.to_i,
        "rtt_us" => rtt.to_i))
    end
  end

  # @return [Integer] number of entries in the "processes" set (SCARD)
  def size
    Sidekiq.redis { |conn| conn.scard("processes") }
  end

  # @return [Integer] sum of the "concurrency" value of every process
  def total_concurrency
    sum { |x| x["concurrency"].to_i }
  end

  # @return [Integer] sum of the "rss" value of every process
  def total_rss_in_kb
    sum { |x| x["rss"].to_i }
  end
  alias_method :total_rss, :total_rss_in_kb
end