Background

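A project I worked on recently needed background jobs. Our ops team is short-handed, so I did not want to deploy a separate sidekiq; I only wanted the application to start a background thread alongside itself to run the jobs. That is how I found [[good_job.rb]]...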

Overview

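  • Like sidekiq, it can serve as the backend job queue for ActiveJob. The difference is that sidekiq stores jobs in redis, while good_job stores them in postgresql, and finished jobs can optionally be kept instead of deleted
  • It can run as a separate process or inside the same process as rails; every process that is started occupies one database connection
  • It relies on postgresql's LISTEN/NOTIFY to receive jobs promptly, and it also polls the database for jobs that are due; advisory locks keep the same job from being picked up twice

Source code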

If good_job is started as a separate process with bundle exec good_job start, startup is handled by GoodJob::Cli#start

# good_job-2.7.3/lib/good_job/cli.rb
def start
  set_up_application!
  configuration = GoodJob::Configuration.new(options)

  Daemon.new(pidfile: configuration.pidfile).daemonize if configuration.daemonize?

  notifier = GoodJob::Notifier.new
  poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
  scheduler = GoodJob::Scheduler.from_configuration(configuration, warm_cache_on_initialize: true)
  notifier.recipients << [scheduler, :create_thread]
  poller.recipients << [scheduler, :create_thread]

  cron_manager = GoodJob::CronManager.new(configuration.cron_entries, start_on_initialize: true) if configuration.enable_cron?

  if configuration.probe_port
    probe_server = GoodJob::ProbeServer.new(port: configuration.probe_port)
    probe_server.start
  end

  @stop_good_job_executable = false
  %w[INT TERM].each do |signal|
    trap(signal) { @stop_good_job_executable = true }
  end

  Kernel.loop do
    sleep 0.1
    break if @stop_good_job_executable || scheduler.shutdown? || notifier.shutdown?
  end

  executors = [notifier, poller, cron_manager, scheduler].compact
  GoodJob._shutdown_all(executors, timeout: configuration.shutdown_timeout)
  probe_server&.stop
end

If good_job is started inside the rails process with GOOD_JOB_EXECUTION_MODE=async rails server (or with :async set in an initializer file), startup is handled by GoodJob::Adapter#start_async

# good_job-2.7.3/lib/good_job/adapter.rb
def start_async
  return unless execute_async?

  @notifier = GoodJob::Notifier.new
  @poller = GoodJob::Poller.new(poll_interval: @configuration.poll_interval)
  @scheduler = GoodJob::Scheduler.from_configuration(@configuration, warm_cache_on_initialize: true)
  @notifier.recipients << [@scheduler, :create_thread]
  @poller.recipients << [@scheduler, :create_thread]

  @cron_manager = GoodJob::CronManager.new(@configuration.cron_entries, start_on_initialize: true) if @configuration.enable_cron?

  @_async_started = true
end

As the code above shows, whichever way it is started, good_job creates a Notifier, a Poller and a Scheduler. Let's go through them one by one

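Every good_job process creates one Notifier, which is essentially a thread. That thread occupies one database connection, issues postgresql's LISTEN command through async_exec, and subscribes to a channel named good_job. Whenever a message arrives, it is passed on to @recipients, that is, to Scheduler#create_thread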

# good_job-2.7.3/lib/good_job/notifier.rb
def listen(delay: 0)
  future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening|
    with_listen_connection do |conn|
      ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
        conn.async_exec("LISTEN #{CHANNEL}").clear
      end

      ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
        thr_listening.make_true
        while thr_executor.running?
          conn.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload|
            next unless channel == CHANNEL

            ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
            parsed_payload = JSON.parse(payload, symbolize_names: true)
            thr_recipients.each do |recipient|
              target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
              target.send(method_name, parsed_payload)
            end
          end
        end
      end
    ensure
      thr_listening.make_false
      ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
        conn.async_exec("UNLISTEN *").clear
      end
    end
  end

  future.add_observer(self, :listen_observer)
  future.execute
end
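
The recipient convention inside that loop can be tried without a database. The following is a minimal sketch using only the standard library: RecordingScheduler and dispatch are hypothetical names made up for illustration, and the payload keys are the two that the Scheduler code later reads (queue_name and scheduled_at). A recipient is either a [target, method_name] pair or anything that responds to call.

```ruby
require "json"

# Hypothetical stand-in for a Scheduler; it only records what it receives.
class RecordingScheduler
  attr_reader :received

  def initialize
    @received = []
  end

  def create_thread(state = nil)
    @received << state
  end
end

# Deliver one NOTIFY payload to every recipient, using the same
# [target, method_name] convention as the Notifier loop above.
def dispatch(recipients, payload)
  parsed_payload = JSON.parse(payload, symbolize_names: true)
  recipients.each do |recipient|
    target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
    target.send(method_name, parsed_payload)
  end
end

scheduler = RecordingScheduler.new
seen_by_lambda = []
recipients = [[scheduler, :create_thread], ->(state) { seen_by_lambda << state }]

dispatch(recipients, '{"queue_name":"mailers","scheduled_at":null}')
p scheduler.received # => [{:queue_name=>"mailers", :scheduled_at=>nil}] (hash formatting varies by Ruby version)
```

Registering [scheduler, :create_thread] rather than a block is what lets the Notifier and the Poller share one Scheduler without knowing anything about it beyond a method name.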

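When a good_job process starts, a thread count can be specified for each queue separately. In that case each queue gets its own Scheduler with its own thread pool, and the Schedulers are wrapped into a MultiScheduler.

If nothing is specified, all queues are handled by a single Scheduler and share one thread pool

When a Scheduler receives a job notification, it first checks whether the job belongs to a queue it is bound to. Only if it does is the work wrapped in a Concurrent::ScheduledTask and handed to the thread pool

The thread pool is a [[concurrent-ruby Thread Pools]]; delayed jobs are queued in the min-heap of a Concurrent::TimerSet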

# good_job-2.7.3/lib/good_job/scheduler.rb
def self.from_configuration(configuration, warm_cache_on_initialize: false)
  schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads|
    queue_string, max_threads = queue_string_and_max_threads.split(':')
    max_threads = (max_threads || configuration.max_threads).to_i

    job_performer = GoodJob::JobPerformer.new(queue_string)
    GoodJob::Scheduler.new(
      job_performer,
      max_threads: max_threads,
      max_cache: configuration.max_cache,
      warm_cache_on_initialize: warm_cache_on_initialize
    )
  end

  if schedulers.size > 1
    GoodJob::MultiScheduler.new(schedulers)
  else
    schedulers.first
  end
end

def create_thread(state = nil)
  return nil unless executor.running?

  if state
    return false unless performer.next?(state)

    if state[:scheduled_at]
      scheduled_at = if state[:scheduled_at].is_a? String
                       Time.zone.parse state[:scheduled_at]
                     else
                       state[:scheduled_at]
                     end
      delay = [(scheduled_at - Time.current).to_f, 0].max
    end
  end

  delay ||= 0
  run_now = delay <= 0.01
  if run_now
    return nil unless executor.ready_worker_count.positive?
  elsif @max_cache.positive?
    return nil unless remaining_cache_count.positive?
  end

  create_task(delay)

  run_now ? true : nil
end

def create_task(delay = 0)
  future = Concurrent::ScheduledTask.new(delay, args: [performer], executor: executor, timer_set: timer_set) do |thr_performer|
    Thread.current.name = Thread.current.name.sub("-worker-", "-thread-") if Thread.current.name
    Rails.application.reloader.wrap do
      thr_performer.next
    end
  end
  future.add_observer(self, :task_observer)
  future.execute
end
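
Two small pieces of the code above are easy to check in isolation: how the queue configuration is split into per-queue thread counts, and how the delay for a scheduled job is computed. The sketch below re-implements just those pieces in plain Ruby; split_queue_config, delay_until and run_now? are hypothetical helper names, and the sample string "mailers:2;default" is only an assumed example of the "queue:threads;queue" shape that from_configuration splits on.

```ruby
# Split a queue configuration such as "mailers:2;default" into
# [queue_string, max_threads] pairs, falling back to a default thread count.
def split_queue_config(config_string, default_max_threads)
  config_string.split(";").map do |entry|
    queue_string, max_threads = entry.split(":")
    [queue_string, (max_threads || default_max_threads).to_i]
  end
end

# Seconds to wait before running a job; never negative.
def delay_until(scheduled_at, now)
  return 0.0 if scheduled_at.nil?

  [(scheduled_at - now).to_f, 0].max
end

# create_thread treats anything due within 10 ms as "run now".
def run_now?(delay)
  delay <= 0.01
end

now = Time.at(1_000)
p split_queue_config("mailers:2;default", 5) # => [["mailers", 2], ["default", 5]]
p delay_until(Time.at(1_030), now)           # => 30.0
p run_now?(delay_until(Time.at(990), now))   # => true
```

A job whose scheduled time has already passed gets a delay of zero, so it competes for a free worker thread immediately instead of waiting in the timer set.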

Once a task reaches its scheduled time, it triggers JobPerformer#next on the JobPerformer that belongs to that Scheduler

# good_job-2.7.3/lib/good_job/job_performer.rb
def next
  job_query.perform_with_advisory_lock
end

def next?(state = {})
  return true unless state[:queue_name]

  if parsed_queues[:exclude]
    parsed_queues[:exclude].exclude?(state[:queue_name])
  elsif parsed_queues[:include]
    parsed_queues[:include].include?(state[:queue_name])
  else
    true
  end
end

def job_query
  @_job_query ||= GoodJob::Execution.queue_string(queue_string)
end
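
The include/exclude decision in next? can be exercised on its own. In the sketch below, parse_queues is a hypothetical parser written for illustration; it assumes that a leading "-" means "every queue except these" and that "*" or an empty string means "all queues", which may not match good_job's real queue syntax exactly. accepts? mirrors the branch structure of next?, with plain Ruby's include? standing in for ActiveSupport's exclude?.

```ruby
# Hypothetical parser: "a,b" includes only a and b, "-a,b" excludes them,
# and "*" (or an empty string) matches every queue.
def parse_queues(queue_string)
  string = queue_string.to_s.strip
  string = "*" if string.empty?
  exclude = string.start_with?("-")
  string = string[1..-1] if exclude
  names = string.split(",").map(&:strip)

  if names.include?("*")
    { all: true }
  elsif exclude
    { exclude: names }
  else
    { include: names }
  end
end

# Same decision shape as JobPerformer#next? above.
def accepts?(parsed_queues, state = {})
  return true unless state[:queue_name]

  if parsed_queues[:exclude]
    !parsed_queues[:exclude].include?(state[:queue_name])
  elsif parsed_queues[:include]
    parsed_queues[:include].include?(state[:queue_name])
  else
    true
  end
end

p accepts?(parse_queues("mailers,default"), { queue_name: "reports" }) # => false
p accepts?(parse_queues("-mailers"), { queue_name: "reports" })        # => true
```

A notification without a queue_name is always accepted, which is why the Poller can call create_thread with no state at all.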

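JobPerformer#next runs two SQL statements. It queries the jobs stored in the table behind GoodJob::Execution, tries to take an advisory lock on the job that most deserves to run right now, and checks whether the lock was actually acquired. If it was, the job is deserialized and executed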

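(A postgresql advisory lock differs from FOR UPDATE style locking in two ways: FOR UPDATE can only lock rows that exist in a table and may block, whereas an advisory lock is recorded by the server itself, where it is visible through the pg_locks system view, and an attempt to take it can return immediately when the lock is unavailable)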

# good_job-2.7.3/lib/good_job/execution.rb
def self.perform_with_advisory_lock
  unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock(unlock_session: true) do |executions|
    execution = executions.first
    break if execution.blank?
    break :unlocked unless execution&.executable?

    execution.perform
  end
end
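
The "try to lock, return at once if someone else has it" behaviour is the key property here. The sketch below is an in-memory analogy only, not PostgreSQL and not good_job's query: TryLockTable, Job and perform_next are hypothetical names, and the ordering (higher priority first, then lower id) is an arbitrary assumption. In postgresql the non-blocking attempt corresponds to pg_try_advisory_lock.

```ruby
require "set"

# In-memory stand-in for a non-blocking advisory lock table.
# Not PostgreSQL: it only mimics "try to lock, return at once".
class TryLockTable
  def initialize
    @held = Set.new
    @guard = Mutex.new
  end

  # Returns true if the key was free and is now held, false otherwise.
  def try_lock(key)
    @guard.synchronize { @held.add?(key) ? true : false }
  end

  def unlock(key)
    @guard.synchronize { @held.delete(key) }
  end
end

Job = Struct.new(:id, :priority, :finished)

# Claim the best unfinished job whose lock is free, run the block, then release.
# Returns the id of the job that ran, or nil if nothing could be claimed.
def perform_next(jobs, locks)
  candidates = jobs.reject(&:finished).sort_by { |job| [-job.priority, job.id] }
  job = candidates.find { |candidate| locks.try_lock(candidate.id) }
  return nil unless job

  begin
    yield job
    job.finished = true
  ensure
    locks.unlock(job.id)
  end
  job.id
end

locks = TryLockTable.new
jobs = [Job.new(1, 0, false), Job.new(2, 10, false)]
locks.try_lock(2) # pretend another worker already holds job 2
perform_next(jobs, locks) { |job| puts "running job #{job.id}" } # prints "running job 1"
```

Because the lock attempt never waits, a worker that loses the race simply moves on to other work instead of queueing up behind the winner.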

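That leaves the Poller. Every good_job process has one such periodic task, implemented with Concurrent::TimerTask. Its work is the same as the Notifier's: it prompts the Scheduler to fetch the job that most deserves to run right now from the database and execute it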

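This periodic pass is needed for several reasons. After the Notifier receives job notifications, memory cannot necessarily hold every delayed job; after a process restart, overdue jobs that were never run have to be picked up again; and jobs that failed need to be retried. LISTEN/NOTIFY is of little help in any of these cases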

# good_job-2.7.3/lib/good_job/poller.rb
def create_timer
  return if @timer_options[:execution_interval] <= 0

  @timer = Concurrent::TimerTask.new(@timer_options) do
    recipients.each do |recipient|
      target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
      target.send(method_name)
    end
  end
  @timer.add_observer(self, :timer_observer)
  @timer.execute
end
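
For readers who want to see the polling idea without concurrent-ruby, here is a minimal sketch built on a plain Thread. SimplePoller is a hypothetical class, not part of good_job; it keeps the same recipient convention and, like create_timer above, does nothing when the interval is zero or negative. A ConditionVariable is used for the wait so that stop returns promptly instead of sleeping out the rest of the interval.

```ruby
# Minimal stdlib poller: every `interval` seconds it calls each recipient
# with no arguments, the way the Poller above calls Scheduler#create_thread.
class SimplePoller
  attr_reader :recipients

  def initialize(interval:)
    @interval = interval
    @recipients = []
    @mutex = Mutex.new
    @wakeup = ConditionVariable.new
    @stopping = false
    @thread = nil
  end

  # One polling pass over all recipients.
  def tick
    recipients.each do |recipient|
      target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
      target.send(method_name)
    end
  end

  def start
    return if @interval <= 0 || @thread

    @stopping = false
    @thread = Thread.new do
      loop do
        # Wait for the interval to elapse, or for stop to signal us.
        @mutex.synchronize { @wakeup.wait(@mutex, @interval) unless @stopping }
        break if @stopping

        tick
      end
    end
  end

  def stop
    @mutex.synchronize do
      @stopping = true
      @wakeup.signal
    end
    @thread&.join
    @thread = nil
  end
end

poller = SimplePoller.new(interval: 5)
poller.recipients << -> { puts "checking the database for due jobs" }
poller.tick  # run one pass by hand
poller.start # from now on, one pass every 5 seconds
poller.stop
```

The recipient receives no payload, so the real Scheduler cannot filter by queue name at this point and has to ask the database which job, if any, is due.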