concurrent-ruby Async
TL;DR
- 文档:Module: Concurrent::Async — Concurrent Ruby
- 目的:使实例方法调用变成异步非阻塞,且线程安全
- 在类中
include Concurrent::Async
,然后调用实例方法前,先调用async
(返回pending状态的IVar
)或await
(返回complete状态的IVar
) async
会将方法调用加入该对象自身的队列,然后由Concurrent.global_io_executor
逐个出队来执行- 这就使得并发调用一个对象的各种方法,都会变成线性的,线程安全的
await
也是调用async
,但等待Ivar
被设值才返回- 如果你有某些调用是不加
async
或await
的,那么还是需要注意线程安全问题
include Concurrent::Async
在类中
include Concurrent::Async
,例如class A
include Concurrent::Async
def a
sleep 1
Time.now
end
end
这会导致类的
new
方法被重新定义,使其创建创建出的对象内含AsyncDelegator.new(self)
和AwaitDelegator.new(@__async_delegator__)
module Concurrent
module Async
def self.included(base)
base.singleton_class.send(:alias_method, :original_new, :new)
base.extend(ClassMethods)
super(base)
end
# @!visibility private
module ClassMethods
def new(*args, &block)
obj = original_new(*args, &block)
obj.send(:init_synchronization)
obj
end
ruby2_keywords :new if respond_to?(:ruby2_keywords, true)
end
def init_synchronization
return self if defined?(@__async_initialized__) && @__async_initialized__
@__async_initialized__ = true
@__async_delegator__ = AsyncDelegator.new(self)
@__await_delegator__ = AwaitDelegator.new(@__async_delegator__)
self
end
end
end
async方法
一般使用就是调用实例方法前加上
async
,使其返回一个IVar
。这里跟踪一下binding.trace_tree(htmp: 'include_async') do
A.new.async.a
end
得
- Object#block in __pry__ (pry):6
├─ - A.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/async.rb:270
│ ├─ + ##new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/async.rb:271
│ └─ + A#init_synchronization $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/async.rb:441
├─ A#async $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/async.rb:412
└─ - Concurrent::Async::AsyncDelegator#method_missing $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/async.rb:305
├─ Kernel#respond_to? $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/async.rb:306
├─ + Concurrent::Async.validate_argc $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/async.rb:250
├─ + Concurrent::IVar.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
└─ + Concurrent::Async::AsyncDelegator#synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:43
如上所述,实例化时会创建
AsyncDelegator
和AwaitDelegator
然后调用
async
会返回这个AsyncDelegator
。接着调用原对象的任何方法,都会被其method_missing
拦截,暂存到@queue
中,并返回IVar
。暂存的方法调用会由
Concurrent.global_io_executor
异步“逐个地”、“线性地”执行,并将结果设置到每个IVar
中因为同一对象的多个方法调用是逐个执行的,所以可以认为是线程安全的
# concurrent-ruby-1.2.2/lib/concurrent-ruby/concurrent/async.rb
module Concurrent
module Async
class AsyncDelegator < Synchronization::LockableObject
safe_initialization!
def initialize(delegate)
super()
@delegate = delegate
@queue = []
@executor = Concurrent.global_io_executor
@ruby_pid = $$
end
def method_missing(method, *args, &block)
super unless @delegate.respond_to?(method)
Async::validate_argc(@delegate, method, *args)
ivar = Concurrent::IVar.new
synchronize do
reset_if_forked
@queue.push [ivar, method, args, block] # 将结果、方法、参数塞进队列
@executor.post { perform } if @queue.length == 1 # 发起perform以尽量清空队列
end
ivar
end
def perform
# 循环以尽量清空队列
loop do
ivar, method, args, block = synchronize { @queue.first }
break unless ivar # queue is empty
begin
ivar.set(@delegate.send(method, *args, &block)) # 调用方法并赋值结果
rescue => error
ivar.fail(error)
end
synchronize do
@queue.shift
return if @queue.empty?
end
end
end
end
end
end
await方法
await
方法会将方法调用委派给AsyncDelegator
,所以实际上也是在Concurrent.global_io_executor
异步执行,但会对其返回的IVar
等待感觉不如直接调用原实例方法?其实不然,因为这样相当于让一个对象的所有实例方法的调用都是线性的,保证线程安全
注意不要在
async
方法调用中,又await
一个方法调用,因为所有方法调用都是线性执行的,async
没返回,则await
也不会开始,这就死锁了module Concurrent
module Async
class AwaitDelegator
def initialize(delegate)
@delegate = delegate
end
def method_missing(method, *args, &block)
ivar = @delegate.send(method, *args, &block)
ivar.wait
ivar
end
end
end
end