active_job.rb
config配置
在
config/application.rb
或config/environments/{RAILS_ENV}.rb
设置config.active_job.xxx = yyy
,会延迟到active_job加载后设置到ActiveJob::Base
上# activejob-7.0.8.7/lib/active_job/railtie.rb
initializer "active_job.set_configs" do |app|
options = app.config.active_job
# ...
ActiveSupport.on_load(:active_job) do
# ...
options.each do |k, v|
k = "#{k}="
send(k, v) if respond_to? k
end
end
# ...
end
adapter适配器
ActiveJob::Base.queue_adapter
是class_attribute
,可以config.active_job.queue_adapter = xxx
全局设置,也可以在子类上self.queue_adapter = :yyy
单独设置module ActiveJob
# activejob-7.0.8.7/lib/active_job/base.rb
class Base
# ...
include QueueAdapter
# ...
end
# activejob-7.0.8.7/lib/active_job/queue_adapter.rb
module QueueAdapter
extend ActiveSupport::Concern
included do
class_attribute :_queue_adapter, instance_accessor: false, instance_predicate: false
delegate :queue_adapter, to: :class
self.queue_adapter = :async
end
module ClassMethods
def queue_adapter
_queue_adapter
end
def queue_adapter=(name_or_adapter)
case name_or_adapter
when Symbol, String
queue_adapter = ActiveJob::QueueAdapters.lookup(name_or_adapter).new
assign_adapter(name_or_adapter.to_s, queue_adapter)
else
if queue_adapter?(name_or_adapter)
adapter_name = "#{name_or_adapter.class.name.demodulize.remove('Adapter').underscore}"
assign_adapter(adapter_name, name_or_adapter)
else
raise ArgumentError
end
end
end
end
end
end
perform_later延迟执行
ActiveJob::Base.perform_later
会创建一个ActiveJob::Base
实例,并调用adapter的enqueue
或enqueue_at
实例方法# activejob-7.0.8.7/lib/active_job/enqueuing.rb
module ActiveJob
module Enqueuing
extend ActiveSupport::Concern
module ClassMethods
def perform_later(...)
job = job_or_instantiate(...)
enqueue_result = job.enqueue
yield job if block_given?
enqueue_result
end
private
def job_or_instantiate(*args)
args.first.is_a?(self) ? args.first : new(*args)
end
end
def enqueue(options = {})
set(options)
self.successfully_enqueued = false
run_callbacks :enqueue do
if scheduled_at
queue_adapter.enqueue_at self, scheduled_at
else
queue_adapter.enqueue self
end
self.successfully_enqueued = true
rescue EnqueueError => e
self.enqueue_error = e
end
if successfully_enqueued?
self
else
false
end
end
end
end
set参数
ActiveJob::Base.set
会创建一个 ActiveJob::ConfiguredJob
收纳wait
、wait_until
、queue
、priority
,等待后续调用perform_later
、perform_now
module ActiveJob
# activejob-7.0.8.7/lib/active_job/core.rb
module Core
extend ActiveSupport::Concern
module ClassMethods
def set(options = {})
ConfiguredJob.new(self, options)
end
end
def set(options = {})
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
self.priority = options[:priority].to_i if options[:priority]
self
end
end
# activejob-7.0.8.7/lib/active_job/configured_job.rb
class ConfiguredJob
def initialize(job_class, options = {})
@options = options
@job_class = job_class
end
def perform_now(...)
@job_class.new(...).set(@options).perform_now
end
def perform_later(...)
@job_class.new(...).enqueue @options
end
end
end
execute执行序列化的任务
将序列化的
{job_id: '...', job_class: '...', arguments: '...' ...}
恢复为具体ActiveJob::Base
子类,并调用perform_now
(其他adapter入队时可以保存序列化参数,然后出队时使用这个
ActiveJob::Base.execute
去执行)
module ActiveJob
# activejob-7.0.8.7/lib/active_job/execution.rb
module Execution
extend ActiveSupport::Concern
module ClassMethods
def execute(job_data)
ActiveJob::Callbacks.run_callbacks(:execute) do
job = deserialize(job_data)
job.perform_now
end
end
end
end
# activejob-7.0.8.7/lib/active_job/core.rb
module Core
extend ActiveSupport::Concern
module ClassMethods
def deserialize(job_data)
job = job_data["job_class"].constantize.new
job.deserialize(job_data)
job
end
end
end
end