源码:rails/activejob · rails/rails

config配置

在 config/application.rb 或 config/environments/{RAILS_ENV}.rb 中设置 config.active_job.xxx = yyy,这些配置会延迟到 active_job 加载之后,再设置到 ActiveJob::Base 上

# activejob-7.0.8.7/lib/active_job/railtie.rb
# Copies every `config.active_job.*` option onto the object that runs the
# :active_job load hook (ActiveJob::Base, according to the surrounding notes).
# Parts of the original initializer are elided in this excerpt (`# ...`).
initializer "active_job.set_configs" do |app|
  # The application's Active Job settings, captured by the hook block below.
  options = app.config.active_job
  # ...
  # Deferred: the block is registered for the :active_job load hook rather
  # than being executed immediately by this initializer.
  ActiveSupport.on_load(:active_job) do
    # ...
    options.each do  |k, v|
      # Turn the option name into its writer name, e.g. "queue_adapter=".
      k = "#{k}="
      # Options that have no matching writer are skipped silently.
      send(k, v) if respond_to? k
    end
  end
  # ...
end

adapter适配器

ActiveJob::Base.queue_adapter 基于 class_attribute 实现,可以通过 config.active_job.queue_adapter = xxx 全局设置,也可以在子类上通过 self.queue_adapter = :yyy 单独设置

module ActiveJob
  # activejob-7.0.8.7/lib/active_job/base.rb
  # Excerpt of ActiveJob::Base showing only the QueueAdapter mixin; the rest
  # of the class body is elided (`# ...`).
  class Base
    # ...
    # Mixes in the QueueAdapter module.
    include QueueAdapter
    # ...
  end

  # activejob-7.0.8.7/lib/active_job/queue_adapter.rb
  module QueueAdapter
    extend ActiveSupport::Concern

    # Set-up evaluated for each class that includes this concern.
    # Statement order matters: the attribute must exist before the default
    # adapter is assigned on the last line.
    included do
      # Class-level storage for the adapter object; neither an instance
      # accessor nor an instance predicate is generated.
      class_attribute :_queue_adapter, instance_accessor: false, instance_predicate: false
      # Instances answer #queue_adapter by asking their class.
      delegate :queue_adapter, to: :class
      # Default adapter; assigned through the queue_adapter= class writer.
      self.queue_adapter = :async
    end

    module ClassMethods
      # Returns the adapter object currently stored for this class.
      def queue_adapter
        _queue_adapter
      end

      # Sets the queue adapter for this class.
      #
      # name_or_adapter - either a Symbol/String adapter name, which is
      #                   resolved with ActiveJob::QueueAdapters.lookup and
      #                   instantiated, or an object accepted by the
      #                   queue_adapter? check.
      #
      # Raises ArgumentError (naming the rejected value) when the argument is
      # neither an adapter name nor an accepted adapter object.
      def queue_adapter=(name_or_adapter)
        case name_or_adapter
        when Symbol, String
          # Local is called `adapter` so it does not shadow the #queue_adapter reader.
          adapter = ActiveJob::QueueAdapters.lookup(name_or_adapter).new
          assign_adapter(name_or_adapter.to_s, adapter)
        else
          unless queue_adapter?(name_or_adapter)
            raise ArgumentError, "invalid queue adapter: #{name_or_adapter.inspect}"
          end

          # Derive the name from the object's class, e.g. Foo::BarAdapter => "bar".
          adapter_name = name_or_adapter.class.name.demodulize.remove('Adapter').underscore
          assign_adapter(adapter_name, name_or_adapter)
        end
      end
    end
  end
end

perform_later延迟执行

ActiveJob::Base.perform_later 会创建一个 ActiveJob::Base 实例,并调用 adapter 的 enqueue 或 enqueue_at 实例方法

# activejob-7.0.8.7/lib/active_job/enqueuing.rb
module ActiveJob
  module Enqueuing
    extend ActiveSupport::Concern

    module ClassMethods
      # Builds a job from the given arguments (or reuses a job instance passed
      # as the first argument) and enqueues it.
      #
      # Yields the job to the block, if one is given, after the enqueue attempt.
      #
      # Returns whatever the job's #enqueue returns.
      def perform_later(...)
        job = job_or_instantiate(...)
        enqueue_result = job.enqueue

        yield job if block_given?

        enqueue_result
      end

      private
        # Returns the first argument when it is already an instance of this
        # class; otherwise instantiates a new job with all the arguments.
        def job_or_instantiate(*args)
          args.first.is_a?(self) ? args.first : new(*args)
        end
        # Keep keyword arguments flagged as keywords while they travel through
        # *args; without this, Ruby 3 hands them to `new` as a positional Hash.
        ruby2_keywords(:job_or_instantiate)
    end

    # Applies +options+ to the job and hands it to the queue adapter inside
    # the :enqueue callbacks.
    #
    # options - forwarded to #set before anything is enqueued.
    #
    # An EnqueueError raised inside the callback block is stored in
    # enqueue_error instead of propagating.
    #
    # Returns self when the job was enqueued, false otherwise.
    def enqueue(options = {})
      set(options)
      self.successfully_enqueued = false

      run_callbacks(:enqueue) do
        # A scheduled job goes through enqueue_at; everything else is enqueued directly.
        scheduled_at ? queue_adapter.enqueue_at(self, scheduled_at) : queue_adapter.enqueue(self)

        self.successfully_enqueued = true
      rescue EnqueueError => error
        self.enqueue_error = error
      end

      return self if successfully_enqueued?

      false
    end
  end
end

set参数

ActiveJob::Base.set 会创建一个 ActiveJob::ConfiguredJob,用来收纳 wait、wait_until、queue、priority 等参数,等待后续调用 perform_later 或 perform_now

module ActiveJob
  # activejob-7.0.8.7/lib/active_job/core.rb
  module Core
    extend ActiveSupport::Concern

    module ClassMethods
      # Wraps the receiver and +options+ in a ConfiguredJob instead of
      # building a job right away.
      #
      # options - passed through unchanged to ConfiguredJob.new.
      #
      # Returns the new ConfiguredJob.
      def set(options = {})
        ConfiguredJob.new(self, options)
      end
    end

    # Applies scheduling and routing options to this job.
    #
    # options - Hash that may contain:
    #           :wait       - converted with `.seconds.from_now.to_f`
    #           :wait_until - converted with `.to_f`; assigned after :wait,
    #                         so it wins when both are given
    #           :queue      - resolved through the class's queue_name_from_part
    #           :priority   - converted with `.to_i`
    #
    # Keys that are missing or falsy leave the matching attribute untouched.
    #
    # Returns self.
    def set(options = {})
      delay  = options[:wait]
      run_at = options[:wait_until]
      queue  = options[:queue]
      level  = options[:priority]

      self.scheduled_at = delay.seconds.from_now.to_f if delay
      self.scheduled_at = run_at.to_f if run_at
      self.queue_name = self.class.queue_name_from_part(queue) if queue
      self.priority = level.to_i if level

      self
    end
  end

  # activejob-7.0.8.7/lib/active_job/configured_job.rb
  # Holds a job class together with a set of options so that the options can
  # be applied when the job is eventually run or enqueued.
  class ConfiguredJob
    # job_class - class that will be instantiated for each perform_* call.
    # options   - options applied to every job built from job_class.
    def initialize(job_class, options = {})
      @job_class = job_class
      @options = options
    end

    # Builds a job from the arguments, applies the stored options through
    # #set, and runs it immediately. Returns the result of perform_now.
    def perform_now(...)
      configured = @job_class.new(...).set(@options)
      configured.perform_now
    end

    # Builds a job from the arguments and enqueues it with the stored
    # options. Returns the result of the job's #enqueue.
    def perform_later(...)
      job = @job_class.new(...)
      job.enqueue(@options)
    end
  end
end


execute执行序列化的任务

将序列化的 {"job_id" => "...", "job_class" => "...", "arguments" => "...", ...} 恢复为具体 ActiveJob::Base 子类的实例,并调用 perform_now

(其他adapter入队时可以保存序列化参数,然后出队时使用这个ActiveJob::Base.execute去执行)


module ActiveJob
  # activejob-7.0.8.7/lib/active_job/execution.rb
  module Execution
    extend ActiveSupport::Concern

    module ClassMethods
      # Rebuilds a job from its serialized form and runs it immediately,
      # wrapped in the :execute callbacks of ActiveJob::Callbacks.
      #
      # job_data - serialized job payload, handed to deserialize.
      #
      # Returns whatever run_callbacks returns for the block.
      def execute(job_data)
        ActiveJob::Callbacks.run_callbacks(:execute) { deserialize(job_data).perform_now }
      end
    end
  end

  # activejob-7.0.8.7/lib/active_job/core.rb
  module Core
    extend ActiveSupport::Concern

    module ClassMethods
      # Instantiates the class named by job_data["job_class"] and lets the
      # new instance restore itself from the payload.
      #
      # job_data - payload read with a String "job_class" key.
      #
      # Returns the new job instance (not the result of its #deserialize).
      def deserialize(job_data)
        job_data["job_class"].constantize.new.tap { |job| job.deserialize(job_data) }
      end
    end
  end
end