跟踪enqueue_in方法到底做了什么:

binding.trace_tree(htmp: 'resque_enqueue_in'){ Resque.enqueue_in 2.seconds, TryResque, 4,5,6 }

完整调用栈如下:

20180423_223027_764_resque_enqueue_in.html


可见,最终job会由delayed_push添加到一些用于调度的集合中


方法delayed_push的源码如下。它会先把任务参数encode,加入键名为时间点("delayed:#{timestamp.to_i}")的列表中,使用列表是以防同时间的任务被覆盖。然后,再加入键名为job参数("timestamps:#{encode(item)}")的set中,以记录同一个job会在哪些时间被执行(因为可以执行多次)。最后,将时间点作为键加入到有序集:delayed_queue_schedule之中(这里时间相同倒是无所谓,值与键相同也无所谓)。至此,任务注册完成。

resque-scheduler-4.0.0/lib/resque/scheduler/delaying_extensions.rb

# Used internally to stuff the item into the schedule sorted list.
# +timestamp+ can be either in seconds or a datetime object Insertion
# if O(log(n)).  Returns true if it's the first job to be scheduled at
# that time, else false
def delayed_push(timestamp, item)
  # First add this item to the list for this timestamp
  redis.rpush("delayed:#{timestamp.to_i}", encode(item))

  # Store the timestamps at with this item occurs
  redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}")

  # Now, add this timestamp to the zsets.  The score and the value are
  # the same since we'll be querying by timestamp, and we don't have
  # anything else to store.
  redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
end


接着,便是任务提取了。跟resque一样,这就在rake中直接发起。它基本上就是循环地获取有序集:delayed_queue_schedule中当前时刻之前的时间最小的那个键

resque-scheduler-4.0.0/lib/resque/scheduler.rb

class << self
  def run
    procline 'Starting'

    # trap signals
    register_signal_handlers

    #...

    begin
      @th = Thread.current

      # Now start the scheduling part of the loop.
      loop do
        if master?
          begin
            handle_delayed_items
            update_schedule if dynamic
          rescue Errno::EAGAIN, Errno::ECONNRESET => e
            log! e.message
          end
        end
        poll_sleep
      end

    rescue Interrupt
      log 'Exiting'
    end
  end

  def handle_delayed_items(at_time = nil)
    timestamp = Resque.next_delayed_timestamp(at_time)
    if timestamp
      procline 'Processing Delayed Items'
      until timestamp.nil?
        enqueue_delayed_items_for_timestamp(timestamp)
        timestamp = Resque.next_delayed_timestamp(at_time)
      end
    end
  end


resque-scheduler-4.0.0/lib/resque/scheduler/delaying_extensions.rb

def next_delayed_timestamp(at_time = nil)
  search_first_delayed_timestamp_in_range(nil, at_time || Time.now)
end

def search_first_delayed_timestamp_in_range(start_at, stop_at)
  start_at = start_at.nil? ? '-inf' : start_at.to_i
  stop_at = stop_at.nil? ? '+inf' : stop_at.to_i

  items = redis.zrangebyscore(
    :delayed_queue_schedule, start_at, stop_at,
    limit: [0, 1]
  )
  timestamp = items.nil? ? nil : Array(items).first
  timestamp.to_i unless timestamp.nil?
end


然后以此时间去获取"delayed:#{timestamp.to_i}"中的任务参数,此方法会用同一个时间点去lpop列表中的任务参数,直到列表为空,才回到上级,去查另一个最小时间

resque-scheduler-4.0.0/lib/resque/scheduler.rb

def enqueue_delayed_items_for_timestamp(timestamp)
  item = nil
  loop do
    handle_shutdown do
      # Continually check that it is still the master
      if master?
        item = Resque.next_item_for_timestamp(timestamp)
        if item
          log "queuing #{item['class']} [delayed]"
          handle_errors { enqueue_from_config(item) }
        end
      end
    end
    # continue processing until there are no more ready items in this
    # timestamp
    break if item.nil?
  end
end


resque-scheduler-4.0.0/lib/resque/scheduler/delaying_extensions.rb

def next_item_for_timestamp(timestamp)
  key = "delayed:#{timestamp.to_i}"

  encoded_item = redis.lpop(key)
  redis.srem("timestamps:#{encoded_item}", key)
  item = decode(encoded_item)

  # If the list is empty, remove it.
  clean_up_timestamp(key, timestamp)
  item
end


接着再enqueue_from_config即可让转到resque的namespace下,让其接手