Puma::Laucher#stats

Puma::Laucher本身有提供stats方法可以获取到服务器线程池的状态

module Puma
  class Launcher
    def initialize(conf, launcher_args={})
      # ...
      if clustered?
        @options[:logger] = @events

        @runner = Cluster.new(self, @events)
      else
        @runner = Single.new(self, @events)
      end
      Puma.stats_object = @runner
      # ...
    end

    def stats
      @runner.stats
    end
  end
end

如果是单进程

module Puma
  class Single < Runner
    def stats
      {
        started_at: @started_at.utc.iso8601
      }.merge(@server.stats)
    end
  end
end

如果是多进程

module Puma
  class Cluster < Runner
    def stats
      old_worker_count = @workers.count { |w| w.phase != @phase }
      worker_status = @workers.map do |w|
        {
          started_at: w.started_at.utc.iso8601,
          pid: w.pid,
          index: w.index,
          phase: w.phase,
          booted: w.booted?,
          last_checkin: w.last_checkin.utc.iso8601,
          last_status: w.last_status,
        }
      end

      {
        started_at: @started_at.utc.iso8601,
        workers: @workers.size,
        phase: @phase,
        booted_workers: worker_status.count { |w| w[:booted] },
        old_workers: old_worker_count,
        worker_status: worker_status,
      }
    end
  end
end

Puma::Server#stats

基本上是检查线程池的状态

这里process_client会作为线程池的调用函数,它对@requests_count的增加似乎不是原子性的

module Puma
  class Server
    attr_reader :max_threads

    def initialize(app, events=Events.stdio, options={})
      @max_threads         = options.fetch :max_threads , (Puma.mri? ? 5 : 16)
      @requests_count = 0
      # ...
    end

    def process_client(client, buffer)
      # ...
      begin
        while true
          @requests_count += 1
          case handle_request(client, buffer, requests + 1)
          # ...
        end
        true
      end
    end

    def backlog
      @thread_pool and @thread_pool.backlog
    end

    def running
      @thread_pool and @thread_pool.spawned
    end

    def pool_capacity
      @thread_pool and @thread_pool.pool_capacity
    end

    STAT_METHODS = [:backlog, :running, :pool_capacity, :max_threads, :requests_count].freeze

    def stats
      STAT_METHODS.map {|name| [name, send(name) || 0]}.to_h
    end
  end
end

其中backlogspawnedpool_capacity定义如下

module Puma
  class ThreadPool
    # 已接收到,但未处理的连接
    def backlog
      with_mutex { @todo.size }
    end

    # 等待任务的线程数 + (最大线程数 - 已创建的线程数) 
    # = 可用的线程数,(包括空闲的线程和可创建的线程)
    def pool_capacity
      waiting + (@max - spawned)
    end

    # 当前线程数(太多的话还会自动减少)
    attr_reader :spawned

    def spawn_thread
      @spawned += 1

      # 创建一个不断从todo捞任务的线程
      th = Thread.new(@spawned) do |spawned|
        # ...
        while true
          work = nil
          # 加锁,从todo取work,并更新线程池状态
          mutex.synchronize do
            while todo.empty?
              if @trim_requested > 0
                @spawned -= 1
                # ...
                Thread.exit
              end

              # 未接到任务,空闲线程数+1
              @waiting += 1
              # ...
              not_full.signal
              begin
                not_empty.wait mutex
              ensure
                # 接到任务,空闲线程数-1
                @waiting -= 1
              end
            end

            work = todo.shift
          end
          # ...
        end
      end

      @workers << th
      th
    end
  end
end