TL;DR

  • 用于每隔一段时间就重复地执行任务
  • task = Concurrent::TimerTask.new{ ... }创建任务,这个task会内含一个Concurrent::SafeTaskExecutor
  • task.execute启动任务,会用ScheduledTask.execute([[concurrent-ruby ScheduledTask]]),异步地执行Concurrent::SafeTaskExecutor
  • 执行完后又再ScheduledTask.execute……不会在时间上重叠
创建

跟踪一下

task = binding.trace_tree(htmp: 'timer_task_trace_new'){ Concurrent::TimerTask.new{ puts 'Boom!' } }


 - Object#block in __pry__ (pry):3
 └─ - Concurrent::TimerTask.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
    ├─ - Class#new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:59
    │  └─ - Concurrent::TimerTask#initialize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:180
    │     ├─   Kernel#block_given? $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:181
    │     ├─ - Concurrent::TimerTask#initialize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/executor/ruby_executor_service.rb:11
    │     │  ├─ - Concurrent::TimerTask#initialize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb:23
    │     │  │  ├─ + Concurrent::TimerTask#initialize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:29
    │     │  │  └─ - Concurrent::TimerTask#synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:43
    │     │  │     ├─   Thread::Mutex#owned? $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:44
    │     │  │     └─ - Thread::Mutex#synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:47
    │     │  │        └─ - Concurrent::TimerTask#block in synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:47
    │     │  │           └─ - Concurrent::TimerTask#block in initialize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb:25
    │     │  │              ├─   Hash#fetch $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb:26
    │     │  │              ├─   Hash#key? $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb:27
    │     │  │              └─ - Concurrent::TimerTask#ns_initialize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:263
    │     │  │                 ├─ + Concurrent::TimerTask#set_deref_options $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/concern/dereferenceable.rb:48
    │     │  │                 ├─ + Concurrent::TimerTask#execution_interval= $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:237
    │     │  │                 ├─ + Concurrent::SafeTaskExecutor.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
    │     │  │                 ├─ + Concurrent::AtomicBoolean.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
    │     │  │                 └─ + Concurrent::Collection::CopyOnNotifyObserverSet.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
    │     │  ├─ + Concurrent::Event.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
    │     │  └─ + Concurrent::Event.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
    │     └─ + Concurrent::TimerTask#set_deref_options $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/concern/dereferenceable.rb:48
    └─   Concurrent::TimerTask#full_memory_barrier $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mri_object.rb:28

其中Concurrent::TimerTask#ns_initialize代码如下

# concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb
module Concurrent
  class TimerTask < RubyExecutorService
    def ns_initialize(opts, &task)
      set_deref_options(opts)

      self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
      if opts[:timeout] || opts[:timeout_interval]
        warn 'TimeTask timeouts are now ignored as these were not able to be implemented correctly'
      end
      @run_now = opts[:now] || opts[:run_now]
      @executor = Concurrent::SafeTaskExecutor.new(task)
      @running = Concurrent::AtomicBoolean.new(false)
      @value = nil

      self.observers = Collection::CopyOnNotifyObserverSet.new
    end
  end
end

执行

跟踪一下

binding.trace_tree(htmp: 'timer_task_trace_execute'){ task.execute }


 - Object#block in __pry__ (pry):4
 └─ - Concurrent::TimerTask#execute $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:206
    └─ - Concurrent::TimerTask#synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:43
       ├─   Thread::Mutex#owned? $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:44
       └─ - Thread::Mutex#synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:47
          └─ - Concurrent::TimerTask#block in synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:47
             └─ - Concurrent::TimerTask#block in execute $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:207
                ├─ + Concurrent::AtomicBoolean#false? $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/atomic/mutex_atomic_boolean.rb:32
                ├─ + Concurrent::AtomicBoolean#make_true $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/atomic/mutex_atomic_boolean.rb:37
                └─ - Concurrent::TimerTask#schedule_next_task $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:291
                   ├─ + Concurrent::Event.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
                   ├─   Kernel#method $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:292
                   ├─   Method#to_proc $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb:292
                   └─ + Concurrent::ScheduledTask.execute $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/scheduled_task.rb:290

其中Concurrent::TimerTask#schedule_next_task如下

# concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/timer_task.rb
module Concurrent
  class TimerTask < RubyExecutorService
    def schedule_next_task(interval = execution_interval)
      ScheduledTask.execute(interval, args: [Concurrent::Event.new], &method(:execute_task))
      nil
    end

    # @!visibility private
    def execute_task(completion)
      return nil unless @running.true?
      _success, value, reason = @executor.execute(self)
      if completion.try?
        self.value = value
        schedule_next_task
        time = Time.now
        observers.notify_observers do
          [time, self.value, reason]
        end
      end
      nil
    end
  end
end