resque的运作
为检查Resque是如何执行队列里的job的,加入调试代码如下
# Minimal Resque job used to trace how a worker ends up invoking a job.
class TryResque
  # Queue this job class is pushed to.
  @queue = :just_try_resque

  class << self
    # Entry point for the job: prints the received arguments, then the
    # call stack that led here.
    def perform(*args)
      puts args
      puts caller
    end

    # Convenience wrapper that enqueues this class with the given arguments.
    def enqueue(*args)
      Resque.enqueue(self, *args)
    end
  end
end
然后启动worker
$ QUEUE=just_try_resque bundle exec rake resque:work
入队
[1] pry(main)> TryResque.enqueue 1,2,3
可见终端输出
1
2
3
/home/z/.rbenv/versions/2.1.5/lib/ruby/gems/2.1.0/gems/resque-1.25.2/lib/resque/job.rb:227:in `perform'
/home/z/.rbenv/versions/2.1.5/lib/ruby/gems/2.1.0/gems/resque-1.25.2/lib/resque/worker.rb:250:in `perform'
/home/z/.rbenv/versions/2.1.5/lib/ruby/gems/2.1.0/gems/resque-1.25.2/lib/resque/worker.rb:189:in `block in work'
/home/z/.rbenv/versions/2.1.5/lib/ruby/gems/2.1.0/gems/resque-1.25.2/lib/resque/worker.rb:166:in `loop'
/home/z/.rbenv/versions/2.1.5/lib/ruby/gems/2.1.0/gems/resque-1.25.2/lib/resque/worker.rb:166:in `work'
/home/z/.rbenv/versions/2.1.5/lib/ruby/gems/2.1.0/gems/resque-1.25.2/lib/resque/tasks.rb:41:in `block (2 levels) in <top (required)>'
...
启动worker的rake任务如下
# resque-1.25.2/lib/resque/tasks.rb
# Rake task :work — builds a Resque::Worker from environment variables and
# runs its work loop. The :preload and :setup tasks run first.
desc "Start a Resque worker"
task :work => [ :preload, :setup ] do
require 'resque'
# QUEUES takes precedence over QUEUE; the value is a comma-separated list.
# With neither variable set this yields an empty array.
queues = (ENV['QUEUES'] || ENV['QUEUE']).to_s.split(',')
begin
worker = Resque::Worker.new(*queues)
# LOGGING / VERBOSE / VVERBOSE switch on the worker's verbosity flags.
if ENV['LOGGING'] || ENV['VERBOSE']
worker.verbose = ENV['LOGGING'] || ENV['VERBOSE']
end
if ENV['VVERBOSE']
worker.very_verbose = ENV['VVERBOSE']
end
# Values read from ENV are Strings; only the fallback 4.0 is a Float.
worker.term_timeout = ENV['RESQUE_TERM_TIMEOUT'] || 4.0
worker.term_child = ENV['TERM_CHILD']
worker.run_at_exit_hooks = ENV['RUN_AT_EXIT_HOOKS']
rescue Resque::NoQueueError
# Raised somewhere in the begin-block above (presumably by Worker.new when
# no queue is given — confirm in worker.rb); abort with a usage hint.
abort "set QUEUE env var, e.g. $ QUEUE=critical,high rake resque:work"
end
# BACKGROUND: detach from the terminal before doing any work.
if ENV['BACKGROUND']
unless Process.respond_to?('daemon')
abort "env var BACKGROUND is set, which requires ruby >= 1.9"
end
# Arguments are (nochdir, noclose): keep the current directory and leave
# stdin/stdout/stderr untouched.
Process.daemon(true, true)
end
# PIDFILE: write worker.pid to the given path (done after the optional
# daemonization above).
if ENV['PIDFILE']
File.open(ENV['PIDFILE'], 'w') { |f| f << worker.pid }
end
worker.log "Starting worker #{worker}"
# INTERVAL is passed through as-is (a String when taken from ENV, else 5).
worker.work(ENV['INTERVAL'] || 5) # interval, will block
end
方法work如下(resque-1.25.2/lib/resque/worker.rb),就是一个循环从队列取job,然后执行的流程
# Main entry point of a worker: runs the processing loop until shutdown.
#
# The following events occur during a worker's life cycle:
#
# 1. Startup: Signals are registered, dead workers are pruned,
# and this worker is registered.
# 2. Work loop: Jobs are pulled from a queue and processed.
# 3. Teardown: This worker is unregistered.
#
# interval - seconds to sleep when no job was reserved; coerced with Float().
#            An interval of zero makes the loop exit as soon as no job is
#            available instead of sleeping.
# block    - optional block forwarded to perform for each job.
#
# Any exception that escapes the loop is rescued at the bottom of the method
# and is not re-raised.
def work(interval = 5.0, &block)
interval = Float(interval)
# Set the process title.
$0 = "resque: Starting"
startup
loop do
break if shutdown?
# `reserve` is only called when the worker is not paused; a nil/false
# result sends control to the sleep branch below.
if not paused? and job = reserve
log "got: #{job.inspect}"
job.worker = self
working_on job
procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class_name}]"
# NOTE(review): fork(job) is the worker's own method (it takes the job),
# not Kernel#fork. Presumably it returns the child pid in the parent and a
# falsy value in the child or when forking is disabled — confirm in
# worker.rb.
if @child = fork(job)
# Parent side: wait for the child to finish.
srand # Reseeding
procline "Forked #{@child} at #{Time.now.to_i}"
begin
Process.waitpid(@child)
rescue SystemCallError
nil
end
# A child terminated by a signal is recorded as a failed job.
job.fail(DirtyExit.new($?.to_s)) if $?.signaled?
else
# Child side, or the non-forking case: run the job in this process.
unregister_signal_handlers if will_fork? && term_child
begin
reconnect
perform(job, &block)
rescue Exception => exception
report_failed_job(job,exception)
end
# A forked child terminates here; `exit` runs at_exit handlers,
# `exit!` skips them.
if will_fork?
run_at_exit_hooks ? exit : exit!
end
end
done_working
@child = nil
else
# Nothing to do: leave the loop for a zero interval, otherwise sleep.
break if interval.zero?
log! "Sleeping for #{interval} seconds"
procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
sleep interval
end
end
# Normal teardown once the loop has been left.
unregister_worker
rescue Exception => exception
# Skip the failure handling only for a SystemExit raised while @child is
# unset and run_at_exit_hooks is enabled; everything else is logged and the
# worker is unregistered with the exception attached.
unless exception.class == SystemExit && !@child && run_at_exit_hooks
log "Failed to start worker : #{exception.inspect}"
unregister_worker(exception)
end
end
关于shutdown,查找代码,可见在work方法中调用startup时,就注册了信号
# resque-1.25.2/lib/resque/worker.rb
# One-time initialisation run at the beginning of `work`: registers signal
# handlers, prunes dead workers, runs the :before_first_fork hook and
# registers this worker, in that order.
def startup
# Deprecation warning, emitted unless term_child is set or $TESTING is truthy.
Kernel.warn "WARNING: This way of doing signal handling is now deprecated. Please see http://hone.heroku.com/resque/2012/08/21/resque-signals.html for more info." unless term_child or $TESTING
enable_gc_optimizations
register_signal_handlers
prune_dead_workers
run_hook :before_first_fork
register_worker
# Fix buffering so we can `rake resque:work > resque.log` and
# get output from the child in there.
$stdout.sync = true
end
# Install the worker's signal handlers. In this excerpt both TERM and INT
# invoke shutdown!.
def register_signal_handlers
  trap('TERM') { shutdown! }
  trap('INT') { shutdown! }
  # ... (further handlers are elided in this excerpt)
end
# Request a shutdown: logs a message and sets the @shutdown flag that
# `shutdown?` reports.
def shutdown
log 'Exiting...'
@shutdown = true
end
# Kill the child and shutdown immediately.
# If not forking, abort this process.
# Only the initial call to `shutdown` (which sets the shutdown flag) is
# shown; the rest of the body is elided in this excerpt.
def shutdown!
shutdown
# ...
end
# Should this worker shutdown as soon as current job is finished?
# Returns the raw @shutdown flag, so the result is nil (falsy) until
# `shutdown` has set it to true.
def shutdown?
@shutdown
end
而队列在Redis中的key带有名为queue的namespace前缀(即 queue:<队列名>)。(当然,在设置Resque.redis=时,可以自己再包多一层namespace)
# resque-1.25.2/lib/resque.rb
# Append an item to the tail of a queue.
#
# queue - queue name; the Redis list key is "queue:<name>".
# item  - payload, passed through `encode` before being stored.
#
# watch_queue and the RPUSH are issued inside a single redis.pipelined block.
def push(queue, item)
redis.pipelined do
watch_queue(queue)
redis.rpush "queue:#{queue}", encode(item)
end
end
# Remove the item at the head of the "queue:<name>" list (LPOP) and return
# it after passing it through `decode`. Combined with the RPUSH in `push`,
# items come out in the order they were pushed.
def pop(queue)
decode redis.lpop("queue:#{queue}")
end