TL;DR

  • 文档:Module: Concurrent::Async — Concurrent Ruby
  • 目的:使实例方法调用变成异步非阻塞,且线程安全
  • 在类中include Concurrent::Async,然后调用实例方法前,先调用async(返回pending状态的IVar)或await(返回complete状态的IVar
  • async会将方法调用加入该对象自身的队列,然后由Concurrent.global_io_executor逐个出队来执行
  • 这就使得并发调用一个对象的各种方法,都会变成线性的,线程安全的
  • await也是调用async,但等待Ivar被设值才返回
  • 如果你有某些调用是不加asyncawait的,那么还是需要注意线程安全问题
include Concurrent::Async

在类中include Concurrent::Async,例如

class A
  include Concurrent::Async

  def a
    sleep 1
    Time.now
  end
end

这会导致类的new方法被重新定义,使其创建创建出的对象内含AsyncDelegator.new(self)AwaitDelegator.new(@__async_delegator__)

module Concurrent
  module Async
    def self.included(base)
      base.singleton_class.send(:alias_method, :original_new, :new)
      base.extend(ClassMethods)
      super(base)
    end

    # @!visibility private
    module ClassMethods
      def new(*args, &block)
        obj = original_new(*args, &block)
        obj.send(:init_synchronization)
        obj
      end
      ruby2_keywords :new if respond_to?(:ruby2_keywords, true)
    end

    def init_synchronization
      return self if defined?(@__async_initialized__) && @__async_initialized__
      @__async_initialized__ = true
      @__async_delegator__ = AsyncDelegator.new(self)
      @__await_delegator__ = AwaitDelegator.new(@__async_delegator__)
      self
    end
  end
end

async方法

一般使用就是调用实例方法前加上async,使其返回一个IVar。这里跟踪一下

binding.trace_tree(htmp: 'include_async') do
  A.new.async.a
end


 - Object#block in __pry__ (pry):6
 ├─ - A.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/async.rb:270
 │  ├─ + ##new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/async.rb:271
 │  └─ + A#init_synchronization $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/async.rb:441
 ├─   A#async $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/async.rb:412
 └─ - Concurrent::Async::AsyncDelegator#method_missing $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/async.rb:305
    ├─   Kernel#respond_to? $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/async.rb:306
    ├─ + Concurrent::Async.validate_argc $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/async.rb:250
    ├─ + Concurrent::IVar.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
    └─ + Concurrent::Async::AsyncDelegator#synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:43

如上所述,实例化时会创建AsyncDelegatorAwaitDelegator

然后调用async会返回这个AsyncDelegator。接着调用原对象的任何方法,都会被其method_missing拦截,暂存到@queue中,并返回IVar

暂存的方法调用会由Concurrent.global_io_executor异步“逐个地”、“线性地”执行,并将结果设置到每个IVar

因为同一对象的多个方法调用是逐个执行的,所以可以认为是线程安全的

# concurrent-ruby-1.2.2/lib/concurrent-ruby/concurrent/async.rb
module Concurrent
  module Async
    class AsyncDelegator < Synchronization::LockableObject
      safe_initialization!

      def initialize(delegate)
        super()
        @delegate = delegate
        @queue = []
        @executor = Concurrent.global_io_executor
        @ruby_pid = $$
      end

      def method_missing(method, *args, &block)
        super unless @delegate.respond_to?(method)
        Async::validate_argc(@delegate, method, *args)

        ivar = Concurrent::IVar.new
        synchronize do
          reset_if_forked
          @queue.push [ivar, method, args, block] # 将结果、方法、参数塞进队列
          @executor.post { perform } if @queue.length == 1 # 发起perform以尽量清空队列
        end

        ivar
      end

      def perform
        # 循环以尽量清空队列
        loop do
          ivar, method, args, block = synchronize { @queue.first }
          break unless ivar # queue is empty

          begin
            ivar.set(@delegate.send(method, *args, &block)) # 调用方法并赋值结果
          rescue => error
            ivar.fail(error)
          end

          synchronize do
            @queue.shift
            return if @queue.empty?
          end
        end
      end
    end
  end
end

await方法

await方法会将方法调用委派给AsyncDelegator,所以实际上也是在Concurrent.global_io_executor异步执行,但会对其返回的IVar等待

感觉不如直接调用原实例方法?其实不然,因为这样相当于让一个对象的所有实例方法的调用都是线性的,保证线程安全

注意不要在async方法调用中,又await一个方法调用,因为所有方法调用都是线性执行的,async没返回,则await也不会开始,这就死锁了

module Concurrent
  module Async
    class AwaitDelegator
      def initialize(delegate)
        @delegate = delegate
      end

      def method_missing(method, *args, &block)
        ivar = @delegate.send(method, *args, &block)
        ivar.wait
        ivar
      end
    end
  end
end