nio4r的常用套路
在actioncable的lib/action_cable/connection/stream_event_loop.rb中,蕴含着对于nio4r的select、register、deregister、wakeup的使用套路,该套路简化如下:
class NioWrapper
# 初始化一个NIO::Selector实例、一个队列、一个线程
# 线程会做两件事:
# 1、处理队列里的事件(通常是注册io或解除注册)
# 2、等待io事件发生并处理之
def initialize
@nio = NIO::Selector.new
@todo = Queue.new
Thread.new do
loop do
@todo.pop.call until @todo.empty?
@nio.select do |monitor|
monitor.value.call
# detach(monitor.io) unless monitor.value.call
end
end
end
end
# 此方法用于注册io事件
# 将注册任务推入队列后,记得wakeup
def attach(io, interest, &block)
@todo << lambda do
monitor = @nio.register(io, interest)
monitor.value = block
end
@nio.wakeup
end
# 此方法用于注册io事件
# 将注册任务推入队列后,记得wakeup
# def detach(io)
# @todo << lambda do
# @nio.deregister(io)
# io.close
# end
# @nio.wakeup
# end
end
使用方法
nio = Wrapper.new
loop
nio.attach(server.accept, :r) do |socket|
socket.read_nonblock #...
end
end
为什么要wakeup?
因为系统调用select会将当前线程挂起,直至其监听的任一文件可读或可写。此过程中无法注册其它文件或解除注册正在监听的文件。
所以当你有心文件需要监听时,你需要先让该线程重新运行,然后让它再次发起系统调用监听拥有那个新文件的文件集合
而为了让线程重新运行,我们可以使用一个管道来作通讯:在nio对象新建时,顺便新建一个管道,并且让nio每次select时都监听该管道,这样一来,只要往管道发送数据,便可让系统唤醒暂停于select的线程
(除了wakeup还有什么方法可让线程提前返回?有,设置timeout)




小结:
1、Kernel.select是对于系统调用select、poll、epoll的封装
2、nio4r是对于Kernel.select的封装,并提供了唤醒的接口
3、进一步对nio4r进行封装,可使调用者无需总是记着注册后唤醒selector,也可对唤醒请求进行节流
延伸,puma的多路复用:
puma没有使用nio4r(说的是puma 3。在puma 5是用nio4r的),而是自己封装了Kernel.select,并同样使用pipe来作唤醒。此外,puma是线程池内每个线程都有一个reactor,来达到少量线程监听大量链接的效果。