resque-scheduler的运作
跟踪enqueue_in方法到底做了什么:
binding.trace_tree(htmp: 'resque_enqueue_in'){ Resque.enqueue_in 2.seconds, TryResque, 4,5,6 }
完整调用栈如下:
可见,最终job会由delayed_push添加到一些用于调度的集合中

方法delayed_push的源码如下。它会先把任务参数encode,加入键名为时间点("delayed:#{timestamp.to_i}")的列表中,使用列表是以防同时间的任务被覆盖。然后,再加入键名为job参数("timestamps:#{encode(item)}")的set中,以记录同一个job会在哪些时间被执行(因为可以执行多次)。最后,将时间点作为键加入到有序集:delayed_queue_schedule之中(这里时间相同倒是无所谓,值与键相同也无所谓)。至此,任务注册完成。
resque-scheduler-4.0.0/lib/resque/scheduler/delaying_extensions.rb
# Used internally to stuff the item into the schedule sorted list.
# +timestamp+ can be either in seconds or a datetime object Insertion
# if O(log(n)). Returns true if it's the first job to be scheduled at
# that time, else false
def delayed_push(timestamp, item)
# First add this item to the list for this timestamp
redis.rpush("delayed:#{timestamp.to_i}", encode(item))
# Store the timestamps at with this item occurs
redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}")
# Now, add this timestamp to the zsets. The score and the value are
# the same since we'll be querying by timestamp, and we don't have
# anything else to store.
redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
end
接着,便是任务提取了。跟resque一样,这就在rake中直接发起。它基本上就是循环地获取有序集:delayed_queue_schedule中当前时刻之前的时间最小的那个键
resque-scheduler-4.0.0/lib/resque/scheduler.rb
class << self
def run
procline 'Starting'
# trap signals
register_signal_handlers
#...
begin
@th = Thread.current
# Now start the scheduling part of the loop.
loop do
if master?
begin
handle_delayed_items
update_schedule if dynamic
rescue Errno::EAGAIN, Errno::ECONNRESET => e
log! e.message
end
end
poll_sleep
end
rescue Interrupt
log 'Exiting'
end
end
def handle_delayed_items(at_time = nil)
timestamp = Resque.next_delayed_timestamp(at_time)
if timestamp
procline 'Processing Delayed Items'
until timestamp.nil?
enqueue_delayed_items_for_timestamp(timestamp)
timestamp = Resque.next_delayed_timestamp(at_time)
end
end
end
resque-scheduler-4.0.0/lib/resque/scheduler/delaying_extensions.rb
def next_delayed_timestamp(at_time = nil)
search_first_delayed_timestamp_in_range(nil, at_time || Time.now)
end
def search_first_delayed_timestamp_in_range(start_at, stop_at)
start_at = start_at.nil? ? '-inf' : start_at.to_i
stop_at = stop_at.nil? ? '+inf' : stop_at.to_i
items = redis.zrangebyscore(
:delayed_queue_schedule, start_at, stop_at,
limit: [0, 1]
)
timestamp = items.nil? ? nil : Array(items).first
timestamp.to_i unless timestamp.nil?
end
然后以此时间去获取"delayed:#{timestamp.to_i}"中的任务参数,此方法会用同一个时间点去lpop列表中的任务参数,直到列表为空,才回到上级,去查另一个最小时间
resque-scheduler-4.0.0/lib/resque/scheduler.rb
def enqueue_delayed_items_for_timestamp(timestamp)
item = nil
loop do
handle_shutdown do
# Continually check that it is still the master
if master?
item = Resque.next_item_for_timestamp(timestamp)
if item
log "queuing #{item['class']} [delayed]"
handle_errors { enqueue_from_config(item) }
end
end
end
# continue processing until there are no more ready items in this
# timestamp
break if item.nil?
end
end
resque-scheduler-4.0.0/lib/resque/scheduler/delaying_extensions.rb
def next_item_for_timestamp(timestamp)
key = "delayed:#{timestamp.to_i}"
encoded_item = redis.lpop(key)
redis.srem("timestamps:#{encoded_item}", key)
item = decode(encoded_item)
# If the list is empty, remove it.
clean_up_timestamp(key, timestamp)
item
end
接着再enqueue_from_config即可让转到resque的namespace下,让其接手