在actioncable的lib/action_cable/connection/stream_event_loop.rb中,蕴含着对于nio4r的select、register、deregister、wakeup的使用套路,该套路简化如下:

class NioWrapper

  # 初始化一个NIO::Selector实例、一个队列、一个线程
  # 线程会做两件事:
  # 1、处理队列里的事件(通常是注册io或解除注册)
  # 2、等待io事件发生并处理之
  def initialize
    @nio = NIO::Selector.new
    @todo = Queue.new

    Thread.new do
      loop do
        @todo.pop.call until @todo.empty?

        @nio.select do |monitor|
          monitor.value.call
          # detach(monitor.io) unless monitor.value.call
        end
      end
    end
  end

  # 此方法用于注册io事件
  # 将注册任务推入队列后,记得wakeup
  def attach(io, interest, &block)
    @todo << lambda do
      monitor = @nio.register(io, interest)
      monitor.value = block
    end
    @nio.wakeup
  end

  # 此方法用于注册io事件
  # 将注册任务推入队列后,记得wakeup
  # def detach(io)
  #   @todo << lambda do
  #     @nio.deregister(io)
  #     io.close
  #   end
  #   @nio.wakeup
  # end
end


使用方法

nio = Wrapper.new

loop
  nio.attach(server.accept, :r) do |socket|
    socket.read_nonblock #...
  end
end


为什么要wakeup?

因为系统调用select会将当前线程挂起,直至其监听的任一文件可读或可写。此过程中无法注册其它文件或解除注册正在监听的文件。

所以当你有心文件需要监听时,你需要先让该线程重新运行,然后让它再次发起系统调用监听拥有那个新文件的文件集合

而为了让线程重新运行,我们可以使用一个管道来作通讯:在nio对象新建时,顺便新建一个管道,并且让nio每次select时都监听该管道,这样一来,只要往管道发送数据,便可让系统唤醒暂停于select的线程

(除了wakeup还有什么方法可让线程提前返回?有,设置timeout)





小结:

1、Kernel.select是对于系统调用select、poll、epoll的封装
2、nio4r是对于Kernel.select的封装,并提供了唤醒的接口
3、进一步对nio4r进行封装,可使调用者无需总是记着注册后唤醒selector,也可对唤醒请求进行节流

延伸,puma的多路复用:

puma没有使用nio4r(说的是puma 3。在puma 5是用nio4r的),而是自己封装了Kernel.select,并同样使用pipe来作唤醒。此外,puma是线程池内每个线程都有一个reactor,来达到少量线程监听大量链接的效果。