TL;DR

Used to submit delayed tasks. It relies on [[concurrent-ruby TimerSet]] to order the submitted tasks by time and run them when they are due.

Typical usage:

Concurrent::ScheduledTask.new(2){ 'What does the fox say?' }.execute

# or
Concurrent::ScheduledTask.execute(2){ 'What does the fox say?' }

Analysis

Trace a call:

task = binding.trace_tree(htmp: 'schedule-task-execute') do
  Concurrent::ScheduledTask.execute(2){ 123 }
end

This produces the call stack below.

Calling ScheduledTask.execute creates a new ScheduledTask and then calls ScheduledTask#execute on it.

 - Object#block in __pry__ (pry):3
 └─ - Concurrent::ScheduledTask.execute $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/scheduled_task.rb:290
    ├─ - Concurrent::ScheduledTask.new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:58
    │  ├─ - Class#new $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/object.rb:59
    │  │  └─ + Concurrent::ScheduledTask#initialize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/scheduled_task.rb:178
    │  └─   Concurrent::ScheduledTask#full_memory_barrier $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mri_object.rb:28
    └─ - Concurrent::ScheduledTask#execute $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/scheduled_task.rb:273
       ├─ + Concurrent::ScheduledTask#compare_and_set_state $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/concern/obligation.rb:174
       └─ - Concurrent::ScheduledTask#synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:43
          ├─   Thread::Mutex#owned? $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:44
          └─ - Thread::Mutex#synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:47
             └─ - Concurrent::ScheduledTask#block in synchronize $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb:47
                └─ - Concurrent::ScheduledTask#block in execute $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/scheduled_task.rb:275
                   └─ + Concurrent::ScheduledTask#ns_schedule $GemPath1/gems/concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/scheduled_task.rb:312
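
The shape of this call tree can be reproduced with a small plain-Ruby sketch. ToyTask below is a hypothetical stand-in, not the library class. It only mirrors the pattern visible in the trace: the class-level execute builds an instance and calls the instance-level execute, which flips the state once and schedules under a lock. The real code does the state change and the locking in separate steps; the sketch merges them for brevity.

```ruby
# Hypothetical stand-in for illustration; not concurrent-ruby's implementation.
class ToyTask
  attr_reader :state, :schedule_count

  # Class-level shortcut: build the task, then start it.
  def self.execute(delay, &task)
    new(delay, &task).execute
  end

  def initialize(delay, &task)
    raise ArgumentError, 'no block given' unless task

    @lock = Mutex.new
    @delay = delay.to_f
    @task = task
    @state = :unscheduled
    @schedule_count = 0
  end

  # Only the first call moves :unscheduled -> :pending and schedules.
  def execute
    @lock.synchronize do
      if @state == :unscheduled
        @state = :pending
        @schedule_count += 1 # stands in for handing the task to a timer set
      end
    end
    self
  end
end

toy = ToyTask.execute(2) { 123 }
toy.execute # a second call changes nothing
puts toy.state          # pending
puts toy.schedule_count # 1
```

Returning self from execute is what lets the class-level shortcut hand back the task object, as in the traced call.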

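The relevant source follows.

When a ScheduledTask is initialized, it stores references to:

  • a TimerSet, Concurrent.global_timer_set by default, which orders tasks by time and picks the nearest one to run
  • a thread pool (executor), Concurrent.global_io_executor by default, which runs the task

ScheduledTask#execute then calls TimerSet#post_task(self) (through ns_schedule) to put itself into the TimerSet.

Once the delay has elapsed, the TimerSet has ScheduledTask#process_task run on the task's executor.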

# concurrent-ruby-1.1.10/lib/concurrent-ruby/concurrent/scheduled_task.rb
module Concurrent
  class ScheduledTask < IVar
    def initialize(delay, opts = {}, &task)
      raise ArgumentError.new('no block given') unless block_given?
      raise ArgumentError.new('seconds must be greater than zero') if delay.to_f < 0.0

      super(NULL, opts, &nil)

      synchronize do
        ns_set_state(:unscheduled)
        @parent = opts.fetch(:timer_set, Concurrent.global_timer_set)
        @args = get_arguments_from(opts)
        @delay = delay.to_f
        @task = task
        @time = nil
        @executor = Options.executor_from_options(opts) || Concurrent.global_io_executor
        self.observers = Collection::CopyOnNotifyObserverSet.new
      end
    end

    def process_task
      safe_execute(@task, @args)
    end

    def execute
      if compare_and_set_state(:pending, :unscheduled)
        synchronize{ ns_schedule(@delay) }
      end
      self
    end

    def ns_schedule(delay)
      @delay = delay
      @time = Concurrent.monotonic_time + @delay
      @parent.send(:post_task, self)
    end
  end
end
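
To make the hand-off concrete, here is a deliberately simplified, single-threaded sketch of the idea described above: tasks are kept ordered by their due time, and whatever is due is handed to an executor. MiniTimerSet, MiniTask, and run_due are hypothetical names. The real TimerSet has its own queue and a background thread, which this sketch does not try to reproduce.

```ruby
# Hypothetical, single-threaded illustration; not concurrent-ruby's TimerSet.
MiniTask = Struct.new(:time, :block) do
  def process_task
    block.call
  end
end

class MiniTimerSet
  def initialize(executor)
    @executor = executor # anything that responds to call(task)
    @queue = []
  end

  # Keep the queue sorted so the earliest due time is always first.
  def post_task(task)
    index = @queue.bsearch_index { |queued| queued.time > task.time } || @queue.size
    @queue.insert(index, task)
    self
  end

  # Hand every task whose time has arrived to the executor.
  def run_due(now)
    @executor.call(@queue.shift) while @queue.first && @queue.first.time <= now
  end

  def pending
    @queue.size
  end
end

results = []
inline_executor = ->(task) { results << task.process_task }
timers = MiniTimerSet.new(inline_executor)

timers.post_task(MiniTask.new(3.0, -> { :late }))
timers.post_task(MiniTask.new(1.0, -> { :early }))
timers.post_task(MiniTask.new(2.0, -> { :middle }))

timers.run_due(2.5)
p results        # => [:early, :middle]
p timers.pending # => 1
```

Passing the executor in as a plain callable keeps the ordering logic separate from how the task body is run, which is the same split as @parent and @executor in the source above.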

How are arguments passed? As initialize above shows, they are extracted from opts by get_arguments_from:

def get_arguments_from(opts = {})
  [*opts.fetch(:args, [])]
end
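
The splat in get_arguments_from normalizes whatever is stored under :args into an array. The snippet below copies that one-line method into a standalone script (outside the library, purely for illustration) to show three cases: an array, a single value, and no :args key at all.

```ruby
# Standalone copy of the one-liner above, for illustration only.
def get_arguments_from(opts = {})
  [*opts.fetch(:args, [])]
end

p get_arguments_from(args: [1, 2, 3]) # => [1, 2, 3]
p get_arguments_from(args: 42)        # => [42]
p get_arguments_from({})              # => []
```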

So arguments can be passed like this:

Concurrent::ScheduledTask.execute(0, {args: [1,2,3]}) do |a, b, c|
  pp [a, b]
end
# [1, 2]