A Brief Look at yabeda-sidekiq
TL;DR
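- Some metrics are sampled from the state snapshot that Sidekiq keeps in Redis; others are recorded by middleware that intercepts jobs
Official site
Environment setup
Starting from the setup in [[sidekiq环境]], add yabeda-sidekiq and yabeda-prometheus: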
require "sidekiq"
require "yabeda/sidekiq"
require "yabeda/prometheus"

Yabeda.configure!

if ENV['METRICS_SERVER']
  Sidekiq.configure_server do |_config|
    Yabeda::Prometheus::Exporter.start_metrics_server!
  end
end

class PlainOldRuby
  include Sidekiq::Job

  def perform(how_hard = "super hard", how_long = 1)
    sleep how_long
    puts "Workin' #{how_hard}"
  end
end
Starting the service
METRICS_SERVER=1 sidekiq -r ./por.rb
The log shows that both Sidekiq and the metrics server have started:
2022-10-11T15:18:55.421Z pid=2676 tid=34k INFO: Running in ruby 2.7.5p203 (2021-11-24 revision f69aeb8314) [x86_64-linux]
2022-10-11T15:18:55.421Z pid=2676 tid=34k INFO: See LICENSE and the LGPL-3.0 for licensing details.
2022-10-11T15:18:55.421Z pid=2676 tid=34k INFO: Upgrade to Sidekiq Pro for more features and support: https://sidekiq.org
2022-10-11T15:18:55.421Z pid=2676 tid=34k INFO: Booting Sidekiq 6.5.7 with Sidekiq::RedisConnection::RedisAdapter options {}
2022-10-11T15:18:55.425Z pid=2676 tid=34k INFO: Starting processing, hit Ctrl-C to stop
[2022-10-11 23:18:55] INFO WEBrick 1.6.1
[2022-10-11 23:18:55] INFO ruby 2.7.5 (2021-11-24) [x86_64-linux]
[2022-10-11 23:18:55] INFO WEBrick::HTTPServer#start: pid=2676 port=9394
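To confirm that the exporter is actually serving metrics, one option is to fetch the endpoint and keep only the Sidekiq samples. This is a minimal sketch under assumptions: the exporter is taken to answer on localhost at the port WEBrick reported in the log above (9394) and to serve the Prometheus text format under /metrics. The helper names `metrics_uri`, `sidekiq_lines` and `fetch_sidekiq_metrics` are hypothetical and not part of any gem.

```ruby
require "net/http"
require "uri"

# Hypothetical helper: builds the scrape URL. Host and path are assumptions;
# the port is the one WEBrick printed in the startup log.
def metrics_uri(host: "localhost", port: 9394, path: "/metrics")
  URI::HTTP.build(host: host, port: port, path: path)
end

# Hypothetical helper: keeps only sample lines whose metric name starts with
# "sidekiq_", which also drops the "# HELP" / "# TYPE" comment lines.
def sidekiq_lines(text)
  text.each_line.map(&:chomp).select { |line| line.start_with?("sidekiq_") }
end

# Hypothetical helper: performs the HTTP request. It only works while the
# service from this section is running.
def fetch_sidekiq_metrics
  sidekiq_lines(Net::HTTP.get(metrics_uri))
end
```

With the service running, `puts fetch_sidekiq_metrics` in irb should list the current Sidekiq samples, if the assumptions above hold.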
Enqueuing a job
irb -r ./por.rb
irb(main):001:0> PlainOldRuby.perform_async "like a dog", 3
=> "320a8b57ca6c7dfd232f49a8"
irb(main):002:0>
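Requiring yabeda/sidekiq
When require 'yabeda/sidekiq' runs, the gem defines its metrics with Yabeda's DSL (check that Yabeda.configure! is actually called at runtime) and inserts middleware into Sidekiq (see [[sidekiq中间件]]) so that it can intercept Sidekiq's job processing and set the metric values.
# yabeda-sidekiq-0.9.0/lib/yabeda/sidekiq.rb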
module Yabeda
  module Sidekiq
    # Define the metrics
    Yabeda.configure do
      config = ::Yabeda::Sidekiq.config

      group :sidekiq

      counter :jobs_enqueued_total, tags: %i[queue worker], comment: "A counter of the total number of jobs sidekiq enqueued."
      # ...

      collect do
        # ...
      end
    end

    # Insert the middleware
    ::Sidekiq.configure_server do |config|
      # ...
    end

    ::Sidekiq.configure_client do |config|
      # ...
    end
  end
end
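The reminder about Yabeda.configure! is easier to see with a toy model. The sketch below assumes that configuration blocks are only collected at require time and are applied once configure! runs. `ToyMetrics` and its methods are hypothetical names for illustration, not Yabeda's implementation.

```ruby
# Toy model of a deferred-configuration registry. This is NOT Yabeda's code;
# ToyMetrics and its methods are hypothetical names used for illustration.
module ToyMetrics
  @blocks = []        # configuration blocks collected at require time
  @counters = {}      # metric name => current value, filled by configure!
  @configured = false

  class << self
    attr_reader :counters

    # Libraries call this when they are required. Nothing is defined yet
    # unless configure! has already run.
    def configure(&block)
      @blocks << block
      instance_exec(&block) if @configured
    end

    # DSL method available inside configure blocks.
    def counter(name)
      @counters[name] = 0
    end

    # The application calls this once; only now are the metrics defined.
    def configure!
      @blocks.each { |block| instance_exec(&block) }
      @configured = true
    end
  end
end

ToyMetrics.configure { counter :jobs_enqueued_total }
before = ToyMetrics.counters.keys # no metric exists yet
ToyMetrics.configure!
after = ToyMetrics.counters.keys  # the counter is now registered
```

In this model, forgetting the final configure! call leaves the registry empty even though the library was required, which is the situation the note warns about.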
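Middleware: intercepting jobs and setting metric values
Client middleware. Clients usually do not expose a scrape endpoint, though, so collecting on the server side is probably enough.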
# yabeda-sidekiq-0.9.0/lib/yabeda/sidekiq/client_middleware.rb
module Yabeda
  module Sidekiq
    class ClientMiddleware
      def call(worker, job, queue, _redis_pool)
        labels = Yabeda::Sidekiq.labelize(worker, job, queue)
        # Increment the enqueued count for this kind of job
        Yabeda.sidekiq_jobs_enqueued_total.increment(labels)
        yield
      end
    end
  end
end
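The way such a middleware sits around the actual push can be illustrated with a toy chain. This is a simplified sketch, not Sidekiq's chain implementation: `ToyChain`, `CountingMiddleware` and `ENQUEUED` are hypothetical names, and a plain Hash stands in for the Yabeda counter.

```ruby
# Toy middleware chain; ToyChain, CountingMiddleware and ENQUEUED are
# hypothetical names, not Sidekiq or Yabeda classes.
ENQUEUED = Hash.new(0) # { queue:, worker: } labels => number of enqueued jobs

class CountingMiddleware
  # Same shape as the client middleware above: count, then pass control on.
  def call(worker, _job, queue, _redis_pool)
    ENQUEUED[{ queue: queue, worker: worker.to_s }] += 1
    yield
  end
end

class ToyChain
  def initialize(*middlewares)
    @middlewares = middlewares
  end

  # Wraps the final block in every middleware, first middleware outermost,
  # then runs the whole stack.
  def invoke(worker, job, queue, redis_pool, &final)
    chain = @middlewares.reverse.reduce(final) do |inner, middleware|
      -> { middleware.call(worker, job, queue, redis_pool, &inner) }
    end
    chain.call
  end
end

pushed = []
chain = ToyChain.new(CountingMiddleware.new)
chain.invoke("PlainOldRuby", { "args" => ["like a dog", 3] }, "default", nil) do
  pushed << :job # stands in for the real push to Redis
end
```

After the call, `ENQUEUED` holds a count of 1 for the `default` queue and `PlainOldRuby` worker, and the final block has run exactly once because the middleware yielded.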
Server middleware
# yabeda-sidekiq-0.9.0/lib/yabeda/sidekiq/server_middleware.rb
module Yabeda
  module Sidekiq
    class ServerMiddleware
      # See https://github.com/mperham/sidekiq/discussions/4971
      JOB_RECORD_CLASS = defined?(::Sidekiq::JobRecord) ? ::Sidekiq::JobRecord : ::Sidekiq::Job

      def call(worker, job, queue)
        custom_tags = Yabeda::Sidekiq.custom_tags(worker, job).to_h
        labels = Yabeda::Sidekiq.labelize(worker, job, queue).merge(custom_tags)
        start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        begin
          job_instance = JOB_RECORD_CLASS.new(job)
          # Observe the dequeue latency distribution for this kind of job
          # (histogram: current time minus enqueue time)
          Yabeda.sidekiq_job_latency.measure(labels, job_instance.latency)
          # Record when this job started so that running_job_runtime can be computed
          # (gauge: the longest duration among the jobs currently running)
          Yabeda::Sidekiq.jobs_started_at[labels][job["jid"]] = start
          Yabeda.with_tags(**custom_tags) do
            yield
          end
          # Increment the success count for this kind of job (counter)
          Yabeda.sidekiq_jobs_success_total.increment(labels)
        rescue Exception
          # Increment the failure count for this kind of job (counter)
          Yabeda.sidekiq_jobs_failed_total.increment(labels)
          raise
        ensure
          # Observe the runtime distribution for this kind of job
          Yabeda.sidekiq_job_runtime.measure(labels, elapsed(start))
          # Increment the executed count for this kind of job (counter)
          Yabeda.sidekiq_jobs_executed_total.increment(labels)
          # Remove this job's start-time record
          Yabeda::Sidekiq.jobs_started_at[labels].delete(job["jid"])
        end
      end

      private

      def elapsed(start)
        (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start).round(3)
      end
    end
  end
end
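The begin/rescue/ensure layout above determines which counters move on each path. The following sketch reproduces only that accounting pattern with plain Ruby objects. `COUNTERS`, `RUNTIMES` and `instrument` are hypothetical names, and a Hash and an Array stand in for the Yabeda counters and histogram.

```ruby
# Toy version of the accounting pattern; COUNTERS, RUNTIMES and `instrument`
# are hypothetical names, not part of yabeda-sidekiq.
COUNTERS = Hash.new(0) # counter name => value
RUNTIMES = []          # observed runtimes in seconds

def instrument
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  begin
    result = yield
    COUNTERS[:success] += 1 # reached only when the job did not raise
    result
  rescue Exception          # mirrors the original: every failure is counted
    COUNTERS[:failed] += 1
    raise                   # re-raise so the caller still sees the error
  ensure
    # Runs on both paths, so executed always equals success + failed.
    RUNTIMES << (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start).round(3)
    COUNTERS[:executed] += 1
  end
end

instrument { :ok }
begin
  instrument { raise ArgumentError, "boom" }
rescue ArgumentError
  # expected: the error propagates after being counted
end
```

After one successful and one failing call, the success and failed counters are each 1, the executed counter is 2, and two runtimes have been recorded. The monotonic clock is used for the same reason as in the original: it is not affected by wall-clock adjustments.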
Metrics not driven by events
collect do
  # Maximum runtime of the currently running jobs, per kind of job
  Yabeda::Sidekiq.track_max_job_runtime if ::Sidekiq.server?

  next unless config.collect_cluster_metrics

  stats = ::Sidekiq::Stats.new

  # Length of each queue (gauge)
  stats.queues.each do |k, v|
    sidekiq_jobs_waiting_count.set({ queue: k }, v)
  end
  # Number of jobs currently executing (gauge)
  sidekiq_active_workers_count.set({}, stats.workers_size)
  # Number of scheduled jobs (gauge)
  sidekiq_jobs_scheduled_count.set({}, stats.scheduled_size)
  # Number of dead jobs (gauge)
  sidekiq_jobs_dead_count.set({}, stats.dead_size)
  # Number of processes (gauge)
  sidekiq_active_processes.set({}, stats.processes_size)
  # Number of jobs waiting to be retried (gauge)
  sidekiq_jobs_retry_count.set({}, stats.retry_size)

  # Latency of each queue (gauge)
  ::Sidekiq::Queue.all.each do |queue|
    sidekiq_queue_latency.set({ queue: queue.name }, queue.latency)
  end

  # Count of jobs waiting to be retried, grouped by queue
  # (requires reading every job in the retry set)
  #
  # The gem's author commented out the part below, out of concern that it would be too slow
  #
  # retries_by_queues =
  #   ::Sidekiq::RetrySet.new.each_with_object(Hash.new(0)) do |job, cntr|
  #     cntr[job["queue"]] += 1
  #   end
  # retries_by_queues.each do |queue, count|
  #   sidekiq_jobs_retry_count.set({ queue: queue }, count)
  # end
end
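A job that is still running produces no event, so its runtime has to be recomputed each time metrics are collected. The sketch below shows one way the maximum runtime could be derived from the start-time bookkeeping that the server middleware maintains. It is an assumption about the approach, not the gem's implementation: `max_running_runtime` is a hypothetical helper, and the labels, job ids and timestamps are made up for illustration.

```ruby
# Hypothetical helper, not the gem's implementation: given the
# labels => { jid => monotonic start time } bookkeeping kept by the server
# middleware, compute the runtime of the longest-running job per label set.
def max_running_runtime(jobs_started_at, now)
  jobs_started_at.each_with_object({}) do |(labels, jobs), result|
    oldest_start = jobs.values.min
    # No running job for these labels means a runtime of zero.
    result[labels] = oldest_start ? (now - oldest_start).round(3) : 0
  end
end

# Illustrative data only: two running jobs of one kind, none of another.
started = {
  { queue: "default", worker: "PlainOldRuby" } => { "jid-a" => 10.0, "jid-b" => 12.5 },
  { queue: "mailers", worker: "OtherJob" } => {},
}
gauges = max_running_runtime(started, 13.0)
```

Passing `now` as an argument keeps the helper deterministic. In a real collect block the value would come from `Process.clock_gettime(Process::CLOCK_MONOTONIC)`, the same clock the middleware uses for the start times.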
Exposing the metrics to Prometheus
See [[yabeda-prometheus浅析]]