TL;DR

  • 指标有采自sidekiq存于redis中的状态快照,也有通过middleware拦截记录
官网

yabeda-rb/yabeda-sidekiq: Yabeda plugin for complete monitoring of Sidekiq

环境搭建

在 [[sidekiq环境]] 的环境基础上,加上yabeda-sidekiq和yabeda-prometheus

require "sidekiq"
require "yabeda/sidekiq"
require "yabeda/prometheus"

Yabeda.configure!

if ENV['METRICS_SERVER']
  Sidekiq.configure_server do |_config|
    Yabeda::Prometheus::Exporter.start_metrics_server!
  end
end

class PlainOldRuby
  include Sidekiq::Job

  def perform(how_hard = "super hard", how_long = 1)
    sleep how_long
    puts "Workin' #{how_hard}"
  end
end

启动服务

METRICS_SERVER=1 sidekiq -r ./por.rb

可见sidekiq以及metrics-server都有启动

2022-10-11T15:18:55.421Z pid=2676 tid=34k INFO: Running in ruby 2.7.5p203 (2021-11-24 revision f69aeb8314) [x86_64-linux]
2022-10-11T15:18:55.421Z pid=2676 tid=34k INFO: See LICENSE and the LGPL-3.0 for licensing details.
2022-10-11T15:18:55.421Z pid=2676 tid=34k INFO: Upgrade to Sidekiq Pro for more features and support: https://sidekiq.org
2022-10-11T15:18:55.421Z pid=2676 tid=34k INFO: Booting Sidekiq 6.5.7 with Sidekiq::RedisConnection::RedisAdapter options {}
2022-10-11T15:18:55.425Z pid=2676 tid=34k INFO: Starting processing, hit Ctrl-C to stop
[2022-10-11 23:18:55] INFO  WEBrick 1.6.1
[2022-10-11 23:18:55] INFO  ruby 2.7.5 (2021-11-24) [x86_64-linux]
[2022-10-11 23:18:55] INFO  WEBrick::HTTPServer#start: pid=2676 port=9394

创建任务

irb -r ./por.rb
irb(main):001:0> PlainOldRuby.perform_async "like a dog", 3
=> "320a8b57ca6c7dfd232f49a8"
irb(main):002:0>

http://your-sidekiq-host:9394/metrics检查统计数据

引入yabeda/sidekiq

require 'yabeda/sidekiq'时,我们会利用yabeda的DSL定义指标(注意运行时有没调用Yabeda.configure!),并给sidekiq插入中间件([[sidekiq中间件]]),以便拦截sidekiq的运作,设置指标值

# yabeda-sidekiq-0.9.0/lib/yabeda/sidekiq.rb
module Yabeda
  module Sidekiq
    # 设置指标
    Yabeda.configure do
      config = ::Yabeda::Sidekiq.config

      group :sidekiq
      counter :jobs_enqueued_total, tags: %i[queue worker], comment: "A counter of the total number of jobs sidekiq enqueued."
      # ...

      collect do
        # ...
      end
    end

    # 插入中间件
    ::Sidekiq.configure_server do |config|
      # ...
    end
    ::Sidekiq.configure_client do |config|
      # ...
    end
  end
end

中间件:拦截任务并设置指标值

客户端中间件,但客户端一般不暴露采集接口吧,在服务端采集就好

# yabeda-sidekiq-0.9.0/lib/yabeda/sidekiq/client_middleware.rb
module Yabeda
  module Sidekiq
    class ClientMiddleware
      def call(worker, job, queue, _redis_pool)
        labels = Yabeda::Sidekiq.labelize(worker, job, queue)
        # 增加此类job的入队数
        Yabeda.sidekiq_jobs_enqueued_total.increment(labels)
        yield
      end
    end
  end
end

服务端中间件

# yabeda-sidekiq-0.9.0/lib/yabeda/sidekiq/server_middleware.rb
module Yabeda
  module Sidekiq
    class ServerMiddleware
      # See https://github.com/mperham/sidekiq/discussions/4971
      JOB_RECORD_CLASS = defined?(::Sidekiq::JobRecord) ? ::Sidekiq::JobRecord : ::Sidekiq::Job

      def call(worker, job, queue)
        custom_tags = Yabeda::Sidekiq.custom_tags(worker, job).to_h
        labels = Yabeda::Sidekiq.labelize(worker, job, queue).merge(custom_tags)
        start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        begin
          job_instance = JOB_RECORD_CLASS.new(job)
          # 统计此类job的出队延迟时间分布(histgram,当前时间与入队时间的差值)
          Yabeda.sidekiq_job_latency.measure(labels, job_instance.latency)
          # 记录此job的开始执行时间,以便统计running_job_runtime(gauge,当前运行中的job的最大持续时间)
          Yabeda::Sidekiq.jobs_started_at[labels][job["jid"]] = start
          Yabeda.with_tags(**custom_tags) do
            yield
          end
          # 增加此类job的成功计数(counter)
          Yabeda.sidekiq_jobs_success_total.increment(labels)
        rescue Exception
          # 增加此类job的失败计数(counter)
          Yabeda.sidekiq_jobs_failed_total.increment(labels)
          raise
        ensure
          # 统计此类job的运行时间分布
          Yabeda.sidekiq_job_runtime.measure(labels, elapsed(start))
          # 增加此类job的执行计数(counter)
          Yabeda.sidekiq_jobs_executed_total.increment(labels)
          # 删除此job的开始执行时间记录
          Yabeda::Sidekiq.jobs_started_at[labels].delete(job["jid"])
        end
      end

      private

      def elapsed(start)
        (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start).round(3)
      end
    end
  end
end

非事件产生的指标

collect do
  # 每类job当前运行中的
  Yabeda::Sidekiq.track_max_job_runtime if ::Sidekiq.server?

  next unless config.collect_cluster_metrics

  stats = ::Sidekiq::Stats.new

  # 每个队列的长度(gauge)
  stats.queues.each do |k, v|
    sidekiq_jobs_waiting_count.set({ queue: k }, v)
  end
  # 执行中的任务数(gauge)
  sidekiq_active_workers_count.set({}, stats.workers_size)
  # 定时任务数(gauge)
  sidekiq_jobs_scheduled_count.set({}, stats.scheduled_size)
  # 死任务数(gauge)
  sidekiq_jobs_dead_count.set({}, stats.dead_size)
  # 进程数(gauge)
  sidekiq_active_processes.set({}, stats.processes_size)
  # 等待重试的任务数(gauge)
  sidekiq_jobs_retry_count.set({}, stats.retry_size)

  # 每个队列的延迟(gauge)
  ::Sidekiq::Queue.all.each do |queue|
    sidekiq_queue_latency.set({ queue: queue.name }, queue.latency)
  end

  # 按队列分组统计待重试的任务数(需要读取重试队列里的每个任务)
  #
  # 作者注释了以下部分,说怕太慢
  #
  # retries_by_queues =
  #     ::Sidekiq::RetrySet.new.each_with_object(Hash.new(0)) do |job, cntr|
  #       cntr[job["queue"]] += 1
  #     end
  # retries_by_queues.each do |queue, count|
  #   sidekiq_jobs_retry_count.set({ queue: queue }, count)
  # end
end

提供prometheus

见 [[yabeda-prometheus浅析]]