TL;DR

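  • It is implemented as a Puma plugin, so Puma initializes it at startup.
  • The plugin reads Puma's statistics from the Puma control server (this requires activate_control_app in the Puma config file).
  • To expose the data to Prometheus, you also need the yabeda-prometheus gem. (Because Yabeda can feed monitoring systems other than Prometheus, the plugin does not start a metrics server on its own.)

Setting up the environment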

See [[puma环境搭建]]

Install the gems

source 'https://rubygems.org/'

gem 'puma', '~> 5.6.5'
gem 'yabeda-puma-plugin', '~> 0.7.0'
gem 'yabeda-prometheus', '~> 0.8.0'

config.ru

require 'rack'
require 'yabeda'
require 'puma/plugin/yabeda'
require 'yabeda/prometheus'

app = lambda do |_|
  sleep rand(5)
  [200, { 'Content-Type' => 'text/html' }, ['OK']]
end

use Yabeda::Prometheus::Exporter, path: "/metrics"
run app

# Remember to add this line; it applies the Yabeda configuration
Yabeda.configure!

puma.rb

threads 5, 10

activate_control_app
plugin :yabeda

concurrent_get.rb

require 'uri'
require 'net/http'

threads = 6.times.map do
  Thread.new do
    loop do
      url = URI("http://192.168.31.200:9292/")
      http = Net::HTTP.new(url.host, url.port)
      request = Net::HTTP::Get.new(url)
      response = http.request(request)
      puts response.read_body
      sleep rand(3)
    end
  end
end

threads.each(&:join)

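Run the following in two separate sessions

# Start the server
bundle exec puma -C puma.rb

# Generate concurrent requests
ruby concurrent_get.rb

Requesting /metrics returns statistics similar to the following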

# TYPE puma_backlog gauge
# HELP puma_backlog Number of established but unaccepted connections in the backlog
puma_backlog{index="0"} 0.0
# TYPE puma_running gauge
# HELP puma_running Number of running worker threads
puma_running{index="0"} 7.0
# TYPE puma_pool_capacity gauge
# HELP puma_pool_capacity Number of allocatable worker threads
puma_pool_capacity{index="0"} 6.0
# TYPE puma_max_threads gauge
# HELP puma_max_threads Maximum number of worker threads
puma_max_threads{index="0"} 10.0
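
To consume this output from a script, the text can be parsed line by line. The following is a minimal sketch and not part of yabeda-prometheus: parse_metrics is a hypothetical helper name, and it only handles simple sample lines like the ones shown above (no escaped characters inside label values, no timestamps).

```ruby
# Hypothetical helper: turns Prometheus text output into
# { metric_name => { labels_hash => float_value } }.
def parse_metrics(text)
  text.each_line.with_object({}) do |line, result|
    line = line.strip
    # Skip blank lines and the "# TYPE" / "# HELP" comment lines.
    next if line.empty? || line.start_with?('#')

    match = line.match(/\A(\w+)(?:\{(.*)\})?\s+(\S+)\z/)
    next unless match

    name, raw_labels, value = match.captures
    # raw_labels is nil for metrics without labels, e.g. "puma_workers 2.0".
    labels = raw_labels.to_s.scan(/(\w+)="([^"]*)"/).to_h
    (result[name] ||= {})[labels] = Float(value)
  end
end

sample = <<~'TEXT'
  # TYPE puma_running gauge
  # HELP puma_running Number of running worker threads
  puma_running{index="0"} 7.0
  # TYPE puma_max_threads gauge
  # HELP puma_max_threads Maximum number of worker threads
  puma_max_threads{index="0"} 10.0
TEXT

metrics = parse_metrics(sample)
puts metrics['puma_running'][{ 'index' => '0' }] # prints 7.0
```

The label set is used as the inner hash key, so per-worker samples in clustered mode (index="0", index="1", and so on) stay separate.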

Source code

It works as a Puma plugin ([[puma的plugin机制]]). The flow can be summarized as follows:

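  • When Puma starts, the plugin obtains the address of the control server ([[puma的activatecontrolapp]])
  • It then defines the metrics
  • When a monitoring system such as yabeda-prometheus pulls data, the plugin reads the statistics from the control server and returns them
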
# yabeda-puma-plugin-0.7.0/lib/puma/plugin/yabeda.rb
Puma::Plugin.create do
  def start(launcher)
    clustered = (launcher.options[:workers] || 0) > 0

    # Read the URL and auth token of Puma's control app
    # and store them for later use
    control_url = launcher.options[:control_url]
    raise StandardError, "Puma control app is not activated" if control_url == nil

    Yabeda::Puma::Plugin.tap do |puma|
      puma.control_url = control_url
      puma.control_auth_token = launcher.options[:control_auth_token]
    end

    # Define the metrics
    Yabeda.configure do
      # Prefix the metrics below with the group name, e.g. puma_backlog, puma_running
      group :puma

      gauge :backlog, tags: %i[index], comment: 'Number of established but unaccepted connections in the backlog', aggregation: :most_recent
      gauge :running, tags: %i[index], comment: 'Number of running worker threads', aggregation: :most_recent
      gauge :pool_capacity, tags: %i[index], comment: 'Number of allocatable worker threads', aggregation: :most_recent
      gauge :max_threads, tags: %i[index], comment: 'Maximum number of worker threads', aggregation: :most_recent

      # In clustered (multi-process) mode, these statistics are read as well
      if clustered
        gauge :workers, comment: 'Number of configured workers', aggregation: :most_recent
        gauge :booted_workers, comment: 'Number of booted workers', aggregation: :most_recent
        gauge :old_workers, comment: 'Number of old workers', aggregation: :most_recent
      end

      # This block runs whenever a monitoring system such as yabeda-prometheus pulls data
      collect do
        require 'yabeda/puma/plugin/statistics/fetcher'
        stats = Yabeda::Puma::Plugin::Statistics::Fetcher.call
        require 'yabeda/puma/plugin/statistics/parser'
        Yabeda::Puma::Plugin::Statistics::Parser.new(clustered: clustered, data: stats).call.each do |item|
          send("puma_#{item[:name]}").set(item[:labels], item[:value])
        end
      end
    end
  end
end
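
The pull model in the collect block, together with the dynamic send("puma_#{item[:name]}") lookup, can be illustrated without any gems. The sketch below is only an illustration: MiniRegistry and MiniGauge are hypothetical stand-ins written for this note and are not how Yabeda is actually implemented.

```ruby
# Hypothetical gauge: remembers the most recent value per label set.
class MiniGauge
  attr_reader :values

  def initialize
    @values = {}
  end

  def set(labels, value)
    @values[labels] = value
  end
end

# Hypothetical registry: metrics are declared up front, values are only
# gathered when a scrape happens (pull model).
class MiniRegistry
  def initialize
    @gauges = {}
    @collectors = []
  end

  # Declares a gauge and exposes it as a method named "<group>_<name>".
  def gauge(group, name)
    full_name = "#{group}_#{name}"
    metric = MiniGauge.new
    @gauges[full_name] = metric
    define_singleton_method(full_name) { metric }
    metric
  end

  # Registers a block to run on every scrape.
  def collect(&block)
    @collectors << block
  end

  # Runs all collect blocks, then returns the current values.
  def scrape
    @collectors.each(&:call)
    @gauges.transform_values(&:values)
  end
end

registry = MiniRegistry.new
registry.gauge(:puma, :backlog)
registry.gauge(:puma, :running)

# Stand-in for the parsed control server statistics.
fake_items = [
  { name: 'backlog', value: 0, labels: { index: 0 } },
  { name: 'running', value: 6, labels: { index: 0 } }
]

registry.collect do
  fake_items.each do |item|
    registry.send("puma_#{item[:name]}").set(item[:labels], item[:value])
  end
end

snapshot = registry.scrape
puts snapshot['puma_running'][{ index: 0 }] # prints 6
```

Nothing is measured until scrape is called, which mirrors why the plugin only contacts the control server when the monitoring system asks for data.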

Fetcher is the class that reads from the control server

# yabeda-puma-plugin-0.7.0/lib/yabeda/puma/plugin/statistics/fetcher.rb
module Yabeda
  module Puma
    module Plugin
      module Statistics
        class Fetcher
          def self.call
            control_url = Yabeda::Puma::Plugin.control_url

            body = if control_url.start_with? "unix://"
              path = control_url.gsub("unix://", '')
              Socket.unix(path, &socket_block)
            elsif control_url.start_with? "tcp://"
              host, port = control_url.match(/^tcp:\/\/([a-z0-9\-._~%]+):([0-9]+)/).captures
              Socket.tcp(host, port, &socket_block)
            else
              raise ArgumentError.new("Unknown puma control url type #{control_url}")
            end

            JSON.parse(body.split("\n").last)
          end

          private

          def self.socket_block
            Proc.new do |s|
              s << "GET /stats?token=#{Yabeda::Puma::Plugin.control_auth_token} HTTP/1.0\r\n\r\n"
              s.read
            end
          end
        end
      end
    end
  end
end
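
The two pure steps inside Fetcher, classifying the control URL and pulling the JSON out of the raw HTTP response, can be tried without a running Puma. This is a minimal sketch under assumptions: parse_control_url and extract_stats are hypothetical helper names, and the raw response is a hand-written sample rather than captured output.

```ruby
require 'json'

# Hypothetical helper: classifies a control URL the same way Fetcher does.
def parse_control_url(control_url)
  if control_url.start_with?('unix://')
    { type: :unix, path: control_url.delete_prefix('unix://') }
  elsif (match = control_url.match(%r{\Atcp://([a-z0-9\-._~%]+):([0-9]+)}))
    { type: :tcp, host: match[1], port: Integer(match[2]) }
  else
    raise ArgumentError, "Unknown puma control url type #{control_url}"
  end
end

# Hypothetical helper: the JSON body is assumed to be the last line of the
# raw response, after the status line, the headers, and the blank line.
def extract_stats(raw_response)
  JSON.parse(raw_response.split("\n").last)
end

# Hand-written sample response, not captured from a real server.
raw = "HTTP/1.0 200 OK\r\nContent-Type: application/json\r\n\r\n" \
      '{"backlog":0,"running":6,"pool_capacity":5,"max_threads":10}'

puts parse_control_url('tcp://127.0.0.1:9293')[:port] # prints 9293
puts extract_stats(raw)['running']                    # prints 6
```

Taking the last line only works because the JSON body arrives on a single line; a pretty-printed body would need a real HTTP parser.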

The body.split("\n").last call above yields statistics like the following

Single process

{
    "started_at": "2022-10-15T06:10:45Z",
    "backlog": 0,
    "running": 6,
    "pool_capacity": 5,
    "max_threads": 10,
    "requests_count": 16
}

Multiple processes

{
    "started_at": "2022-10-15T06:01:16Z",
    "workers": 2,
    "phase": 0,
    "booted_workers": 2,
    "old_workers": 0,
    "worker_status": [
        {
            "started_at": "2022-10-15T06:01:16Z",
            "pid": 6185,
            "index": 0,
            "phase": 0,
            "booted": true,
            "last_checkin": "2022-10-15T06:01:21Z",
            "last_status": {
                "backlog": 0,
                "running": 5,
                "pool_capacity": 6,
                "max_threads": 10,
                "requests_count": 10
            }
        },
        {
            "started_at": "2022-10-15T06:01:16Z",
            "pid": 6187,
            "index": 1,
            "phase": 0,
            "booted": true,
            "last_checkin": "2022-10-15T06:01:21Z",
            "last_status": {
                "backlog": 0,
                "running": 5,
                "pool_capacity": 10,
                "max_threads": 10,
                "requests_count": 3
            }
        }
    ]
}
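
Both JSON shapes have to be flattened into name/value/labels items before the collect block can set the gauges. The following is a rough sketch of that step and not the gem's actual Parser: flatten_stats, worker_items, and the two key lists are assumptions, based on the gauges defined in the plugin and on the index="0" label seen in the single-process output.

```ruby
# Per-worker keys that have a gauge defined in the plugin shown earlier.
WORKER_KEYS = %w[backlog running pool_capacity max_threads].freeze
# Cluster-wide keys, only present in multi-process mode.
CLUSTER_KEYS = %w[workers booted_workers old_workers].freeze

# Builds one item per known key, labelled with the worker index.
def worker_items(status, index)
  WORKER_KEYS.map do |key|
    { name: key, value: status[key], labels: { index: index } }
  end
end

# Hypothetical flattening step: returns a list of
# { name:, value:, labels: } hashes for either JSON shape.
def flatten_stats(stats, clustered:)
  # Single process: the counters sit at the top level, reported as worker 0.
  return worker_items(stats, 0) unless clustered

  cluster_items = CLUSTER_KEYS.map do |key|
    { name: key, value: stats[key], labels: {} }
  end
  per_worker = stats['worker_status'].flat_map do |worker|
    worker_items(worker['last_status'], worker['index'])
  end
  cluster_items + per_worker
end

single = { 'backlog' => 0, 'running' => 6, 'pool_capacity' => 5,
           'max_threads' => 10, 'requests_count' => 16 }
flatten_stats(single, clustered: false).each do |item|
  puts "puma_#{item[:name]} #{item[:labels]} #{item[:value]}"
end
```

Keys without a matching gauge, such as requests_count, are dropped here, since calling send on an undefined metric method would raise.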

These are finally converted into the Prometheus format

Single process

# TYPE puma_backlog gauge
# HELP puma_backlog Number of established but unaccepted connections in the backlog
puma_backlog{index="0"} 0.0
# TYPE puma_running gauge
# HELP puma_running Number of running worker threads
puma_running{index="0"} 6.0
# TYPE puma_pool_capacity gauge
# HELP puma_pool_capacity Number of allocatable worker threads
puma_pool_capacity{index="0"} 5.0
# TYPE puma_max_threads gauge
# HELP puma_max_threads Maximum number of worker threads
puma_max_threads{index="0"} 10.0

Multiple processes

# TYPE puma_backlog gauge
# HELP puma_backlog Number of established but unaccepted connections in the backlog
puma_backlog{index="0"} 0.0
puma_backlog{index="1"} 0.0
# TYPE puma_running gauge
# HELP puma_running Number of running worker threads
puma_running{index="0"} 5.0
puma_running{index="1"} 5.0
# TYPE puma_pool_capacity gauge
# HELP puma_pool_capacity Number of allocatable worker threads
puma_pool_capacity{index="0"} 5.0
puma_pool_capacity{index="1"} 9.0
# TYPE puma_max_threads gauge
# HELP puma_max_threads Maximum number of worker threads
puma_max_threads{index="0"} 10.0
puma_max_threads{index="1"} 10.0
# TYPE puma_workers gauge
# HELP puma_workers Number of configured workers
puma_workers 2.0
# TYPE puma_booted_workers gauge
# HELP puma_booted_workers Number of booted workers
puma_booted_workers 2.0
# TYPE puma_old_workers gauge
# HELP puma_old_workers Number of old workers
puma_old_workers 0.0