concurrent-ruby ScheduledTask
TL;DR
用于提交延时任务,会用到[[concurrent-ruby TimerSet]]对提交过的任务进行排序执行
一般用法如下
Concurrent::ScheduledTask.new(2){ 'What does the fox say?' }.execute
# 或者
Concurrent::ScheduledTask.execute(2){ 'What does the fox say?' }
分析
跟踪一下
task = binding.trace_tree(htmp: 'schedule-task-execute') do
Concurrent::ScheduledTask.execute(2){ 123 }
end
得调用栈如下
调用
ScheduledTask.execute
会创建new一个ScheduledTask
,并调用其ScheduledTask#execute
- Object#block in __pry__ (pry):3
└─ - Concurrent::ScheduledTask.execute $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/scheduled_task.rb:290
├─ - Concurrent::ScheduledTask.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
│ ├─ - Class#new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:59
│ │ └─ + Concurrent::ScheduledTask#initialize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/scheduled_task.rb:178
│ └─ Concurrent::ScheduledTask#full_memory_barrier $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mri_object.rb:28
└─ - Concurrent::ScheduledTask#execute $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/scheduled_task.rb:273
├─ + Concurrent::ScheduledTask#compare_and_set_state $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/concern/obligation.rb:174
└─ - Concurrent::ScheduledTask#synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:43
├─ Thread::Mutex#owned? $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:44
└─ - Thread::Mutex#synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:47
└─ - Concurrent::ScheduledTask#block in synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:47
└─ - Concurrent::ScheduledTask#block in execute $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/scheduled_task.rb:275
└─ + Concurrent::ScheduledTask#ns_schedule $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/scheduled_task.rb:312
源码如下
每个
ScheduledTask
初始化时,都会创建- 一个
TimerSet
,默认引用Concurrent.global_timer_set
,用于按时间排列任务并调取最近的任务来执行 - 一个线程池,默认引用
Concurrent.global_io_executor
,用于执行任务
然后
ScheduledTask#execute
会TimerSet#post_task(self)
把自己丢进TimerSet
等到时间了,
TimerSet
就会执行ScheduledTask#process_task
# concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/scheduled_task.rb
module Concurrent
class ScheduledTask < IVar
def initialize(delay, opts = {}, &task)
raise ArgumentError.new('no block given') unless block_given?
raise ArgumentError.new('seconds must be greater than zero') if delay.to_f < 0.0
super(NULL, opts, &nil)
synchronize do
ns_set_state(:unscheduled)
@parent = opts.fetch(:timer_set, Concurrent.global_timer_set)
@args = get_arguments_from(opts)
@delay = delay.to_f
@task = task
@time = nil
@executor = Options.executor_from_options(opts) || Concurrent.global_io_executor
self.observers = Collection::CopyOnNotifyObserverSet.new
end
end
def process_task
safe_execute(@task, @args)
end
def execute
if compare_and_set_state(:pending, :unscheduled)
synchronize{ ns_schedule(@delay) }
end
self
end
def ns_schedule(delay)
@delay = delay
@time = Concurrent.monotonic_time + @delay
@parent.send(:post_task, self)
end
end
end
如何传递参数据呢?从上面
initialize
可见,参数会用get_arguments_from
抽取def get_arguments_from(opts = {})
[*opts.fetch(:args, [])]
end
所以这样传参就可以了
Concurrent::ScheduledTask.execute(0, {args: [1,2,3]}) do |a, b, c|
pp [a, b]
end
# [1, 2]