用puma的stats方法检查进程与线程状态
Puma::Laucher#stats
Puma::Laucher
本身有提供stats
方法可以获取到服务器线程池的状态module Puma
class Launcher
def initialize(conf, launcher_args={})
# ...
if clustered?
@options[:logger] = @events
@runner = Cluster.new(self, @events)
else
@runner = Single.new(self, @events)
end
Puma.stats_object = @runner
# ...
end
def stats
@runner.stats
end
end
end
如果是单进程
module Puma
class Single < Runner
def stats
{
started_at: @started_at.utc.iso8601
}.merge(@server.stats)
end
end
end
如果是多进程
module Puma
class Cluster < Runner
def stats
old_worker_count = @workers.count { |w| w.phase != @phase }
worker_status = @workers.map do |w|
{
started_at: w.started_at.utc.iso8601,
pid: w.pid,
index: w.index,
phase: w.phase,
booted: w.booted?,
last_checkin: w.last_checkin.utc.iso8601,
last_status: w.last_status,
}
end
{
started_at: @started_at.utc.iso8601,
workers: @workers.size,
phase: @phase,
booted_workers: worker_status.count { |w| w[:booted] },
old_workers: old_worker_count,
worker_status: worker_status,
}
end
end
end
Puma::Server#stats
基本上是检查线程池的状态
这里
process_client
会作为线程池的调用函数,它对@requests_count
的增加似乎不是原子性的module Puma
class Server
attr_reader :max_threads
def initialize(app, events=Events.stdio, options={})
@max_threads = options.fetch :max_threads , (Puma.mri? ? 5 : 16)
@requests_count = 0
# ...
end
def process_client(client, buffer)
# ...
begin
while true
@requests_count += 1
case handle_request(client, buffer, requests + 1)
# ...
end
true
end
end
def backlog
@thread_pool and @thread_pool.backlog
end
def running
@thread_pool and @thread_pool.spawned
end
def pool_capacity
@thread_pool and @thread_pool.pool_capacity
end
STAT_METHODS = [:backlog, :running, :pool_capacity, :max_threads, :requests_count].freeze
def stats
STAT_METHODS.map {|name| [name, send(name) || 0]}.to_h
end
end
end
其中
backlog
、spawned
、pool_capacity
定义如下module Puma
class ThreadPool
# 已接收到,但未处理的连接
def backlog
with_mutex { @todo.size }
end
# 等待任务的线程数 + (最大线程数 - 已创建的线程数)
# = 可用的线程数,(包括空闲的线程和可创建的线程)
def pool_capacity
waiting + (@max - spawned)
end
# 当前线程数(太多的话还会自动减少)
attr_reader :spawned
def spawn_thread
@spawned += 1
# 创建一个不断从todo捞任务的线程
th = Thread.new(@spawned) do |spawned|
# ...
while true
work = nil
# 加锁,从todo取work,并更新线程池状态
mutex.synchronize do
while todo.empty?
if @trim_requested > 0
@spawned -= 1
# ...
Thread.exit
end
# 未接到任务,空闲线程数+1
@waiting += 1
# ...
not_full.signal
begin
not_empty.wait mutex
ensure
# 接到任务,空闲线程数-1
@waiting -= 1
end
end
work = todo.shift
end
# ...
end
end
@workers << th
th
end
end
end