为检查Resque是如何执行队列里的job的,加入调试代码如下

class TryResque
  @queue = :just_try_resque

  def self.perform *args
    puts args
    puts caller
  end

  def self.enqueue *args
    Resque.enqueue self, *args
  end
end


然后起job

$ QUEUE=just_try_resque bundle exec rake resque:work


入队

[1] pry(main)> TryResque.enqueue 1,2,3


可见终端输出

1
2
3
/home/z/.rbenv/versions/2.1.5/lib/ruby/gems/2.1.0/gems/resque-1.25.2/lib/resque/job.rb:227:in `perform'
/home/z/.rbenv/versions/2.1.5/lib/ruby/gems/2.1.0/gems/resque-1.25.2/lib/resque/worker.rb:250:in `perform'
/home/z/.rbenv/versions/2.1.5/lib/ruby/gems/2.1.0/gems/resque-1.25.2/lib/resque/worker.rb:189:in `block in work'
/home/z/.rbenv/versions/2.1.5/lib/ruby/gems/2.1.0/gems/resque-1.25.2/lib/resque/worker.rb:166:in `loop'
/home/z/.rbenv/versions/2.1.5/lib/ruby/gems/2.1.0/gems/resque-1.25.2/lib/resque/worker.rb:166:in `work'
/home/z/.rbenv/versions/2.1.5/lib/ruby/gems/2.1.0/gems/resque-1.25.2/lib/resque/tasks.rb:41:in `block (2 levels) in <top (required)="">'
...</top>


起job任务如下

# resque-1.25.2/lib/resque/lib/resque/tasks.rb
desc "Start a Resque worker"
task :work => [ :preload, :setup ] do
  require 'resque'

  queues = (ENV['QUEUES'] || ENV['QUEUE']).to_s.split(',')

  begin
    worker = Resque::Worker.new(*queues)
    if ENV['LOGGING'] || ENV['VERBOSE']
      worker.verbose = ENV['LOGGING'] || ENV['VERBOSE']
    end
    if ENV['VVERBOSE']
      worker.very_verbose = ENV['VVERBOSE']
    end
    worker.term_timeout = ENV['RESQUE_TERM_TIMEOUT'] || 4.0
    worker.term_child = ENV['TERM_CHILD']
    worker.run_at_exit_hooks = ENV['RUN_AT_EXIT_HOOKS']
  rescue Resque::NoQueueError
    abort "set QUEUE env var, e.g. $ QUEUE=critical,high rake resque:work"
  end

  if ENV['BACKGROUND']
    unless Process.respond_to?('daemon')
        abort "env var BACKGROUND is set, which requires ruby >= 1.9"
    end
    Process.daemon(true, true)
  end

  if ENV['PIDFILE']
    File.open(ENV['PIDFILE'], 'w') { |f| f << worker.pid }
  end

  worker.log "Starting worker #{worker}"

  worker.work(ENV['INTERVAL'] || 5) # interval, will block
end


方法work如下(resque-1.25.2/lib/resque/worker.rb),就是一个循环从队列取job,然后执行的流程

# The following events occur during a worker's life cycle:
#
# 1. Startup:   Signals are registered, dead workers are pruned,
#               and this worker is registered.
# 2. Work loop: Jobs are pulled from a queue and processed.
# 3. Teardown:  This worker is unregistered.
def work(interval = 5.0, &block)
  interval = Float(interval)
  $0 = "resque: Starting"
  startup

  loop do
    break if shutdown?

    if not paused? and job = reserve
      log "got: #{job.inspect}"
      job.worker = self
      working_on job

      procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class_name}]"
      if @child = fork(job)
        srand # Reseeding
        procline "Forked #{@child} at #{Time.now.to_i}"
        begin
          Process.waitpid(@child)
        rescue SystemCallError
          nil
        end
        job.fail(DirtyExit.new($?.to_s)) if $?.signaled?
      else
        unregister_signal_handlers if will_fork? && term_child
        begin

          reconnect
          perform(job, &block)

        rescue Exception => exception
          report_failed_job(job,exception)
        end

        if will_fork?
          run_at_exit_hooks ? exit : exit!
        end
      end

      done_working
      @child = nil
    else
      break if interval.zero?
      log! "Sleeping for #{interval} seconds"
      procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
      sleep interval
    end
  end

  unregister_worker
rescue Exception => exception
  unless exception.class == SystemExit && !@child && run_at_exit_hooks
    log "Failed to start worker : #{exception.inspect}"

    unregister_worker(exception)
  end
end


关于shutdown,查找代码,可见在work方法中调用startup时,就注册了信号

# resque-1.25.2/lib/resque/worker.rb
def startup
  Kernel.warn "WARNING: This way of doing signal handling is now deprecated. Please see http://hone.heroku.com/resque/2012/08/21/resque-signals.html for more info." unless term_child or $TESTING
  enable_gc_optimizations
  register_signal_handlers
  prune_dead_workers
  run_hook :before_first_fork
  register_worker

  # Fix buffering so we can `rake resque:work > resque.log` and
  # get output from the child in there.
  $stdout.sync = true
end

def register_signal_handlers
  trap('TERM') { shutdown!  }
  trap('INT')  { shutdown!  }371
  #...
end

def shutdown
  log 'Exiting...'
  @shutdown = true
end

# Kill the child and shutdown immediately.
# If not forking, abort this process.
def shutdown!
  shutdown
  # ...
end

# Should this worker shutdown as soon as current job is finished?
def shutdown?
  @shutdown
end


而队列是有namespace名为queue的。(当然,在设置Resque.redis=时,可以自己再包多一层namespace)

# resque-1.25.2/lib/resque.rb
def push(queue, item)
  redis.pipelined do
    watch_queue(queue)
    redis.rpush "queue:#{queue}", encode(item)
  end
end

def pop(queue)
  decode redis.lpop("queue:#{queue}")
end