concurrent-ruby TimerTask
TL;DR
- 用于每隔一段时间就重复地执行任务
task = Concurrent::TimerTask.new{ ... }
创建任务,这个task
会内含一个Concurrent::SafeTaskExecutor
task.execute
启动任务,会用ScheduledTask.execute
([[concurrent-ruby ScheduledTask]]),异步地执行Concurrent::SafeTaskExecutor
- 执行完后又再
ScheduledTask.execute
……不会在时间上重叠
创建
跟踪一下
task = binding.trace_tree(htmp: 'timer_task_trace_new'){ Concurrent::TimerTask.new{ puts 'Boom!' } }
得
- Object#block in __pry__ (pry):3
└─ - Concurrent::TimerTask.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
├─ - Class#new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:59
│ └─ - Concurrent::TimerTask#initialize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:180
│ ├─ Kernel#block_given? $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:181
│ ├─ - Concurrent::TimerTask#initialize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/executor/ruby_executor_service.rb:11
│ │ ├─ - Concurrent::TimerTask#initialize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb:23
│ │ │ ├─ + Concurrent::TimerTask#initialize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:29
│ │ │ └─ - Concurrent::TimerTask#synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:43
│ │ │ ├─ Thread::Mutex#owned? $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:44
│ │ │ └─ - Thread::Mutex#synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:47
│ │ │ └─ - Concurrent::TimerTask#block in synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:47
│ │ │ └─ - Concurrent::TimerTask#block in initialize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb:25
│ │ │ ├─ Hash#fetch $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb:26
│ │ │ ├─ Hash#key? $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb:27
│ │ │ └─ - Concurrent::TimerTask#ns_initialize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:263
│ │ │ ├─ + Concurrent::TimerTask#set_deref_options $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/concern/dereferenceable.rb:48
│ │ │ ├─ + Concurrent::TimerTask#execution_interval= $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:237
│ │ │ ├─ + Concurrent::SafeTaskExecutor.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
│ │ │ ├─ + Concurrent::AtomicBoolean.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
│ │ │ └─ + Concurrent::Collection::CopyOnNotifyObserverSet.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
│ │ ├─ + Concurrent::Event.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
│ │ └─ + Concurrent::Event.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
│ └─ + Concurrent::TimerTask#set_deref_options $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/concern/dereferenceable.rb:48
└─ Concurrent::TimerTask#full_memory_barrier $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mri_object.rb:28
其中
Concurrent::TimerTask#ns_initialize
代码如下# concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb
module Concurrent
class TimerTask < RubyExecutorService
def ns_initialize(opts, &task)
set_deref_options(opts)
self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
if opts[:timeout] || opts[:timeout_interval]
warn 'TimeTask timeouts are now ignored as these were not able to be implemented correctly'
end
@run_now = opts[:now] || opts[:run_now]
@executor = Concurrent::SafeTaskExecutor.new(task)
@running = Concurrent::AtomicBoolean.new(false)
@value = nil
self.observers = Collection::CopyOnNotifyObserverSet.new
end
end
end
执行
跟踪一下
binding.trace_tree(htmp: 'timer_task_trace_execute'){ task.execute }
得
- Object#block in __pry__ (pry):4
└─ - Concurrent::TimerTask#execute $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:206
└─ - Concurrent::TimerTask#synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:43
├─ Thread::Mutex#owned? $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:44
└─ - Thread::Mutex#synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:47
└─ - Concurrent::TimerTask#block in synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:47
└─ - Concurrent::TimerTask#block in execute $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:207
├─ + Concurrent::AtomicBoolean#false? $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/atomic/mutex_atomic_boolean.rb:32
├─ + Concurrent::AtomicBoolean#make_true $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/atomic/mutex_atomic_boolean.rb:37
└─ - Concurrent::TimerTask#schedule_next_task $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:291
├─ + Concurrent::Event.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
├─ Kernel#method $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:292
├─ Method#to_proc $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:292
└─ + Concurrent::ScheduledTask.execute $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/scheduled_task.rb:290
其中
Concurrent::TimerTask#schedule_next_task
如下# concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb
module Concurrent
class TimerTask < RubyExecutorService
def schedule_next_task(interval = execution_interval)
ScheduledTask.execute(interval, args: [Concurrent::Event.new], &method(:execute_task))
nil
end
# @!visibility private
def execute_task(completion)
return nil unless @running.true?
_success, value, reason = @executor.execute(self)
if completion.try?
self.value = value
schedule_next_task
time = Time.now
observers.notify_observers do
[time, self.value, reason]
end
end
nil
end
end
end