试试good_job
背景
最近做的一个项目里,需要一个异步任务功能。因为运维人手紧张,我不想另外部署一个sidekiq,只想随项目启动一个异步线程去执行任务,于是找到了这个[[good_job.rb]]……
简介
- 跟sidekiq一样,都可作为ActiveJob底层任务队列,不同的是,sidekiq将任务存于redis,而good_job存于postgresql,而且执行完可以不删
- 可以另起进程运行,也可与rails同一进程内运行,每启动一个进程会占用一个数据库连接
- 依靠postgresql的LISTEN/NOTIFY功能及时接收任务,同时也会轮询数据库检查待执行的任务,使用advisory lock避免重复取任务
源码
如果是使用bundle exec good_job start另起进程的方式启动,则会由GoodJob::Cli#start处理
# good_job-2.7.3/lib/good_job/cli.rb
def start
set_up_application!
configuration = GoodJob::Configuration.new(options)
Daemon.new(pidfile: configuration.pidfile).daemonize if configuration.daemonize?
notifier = GoodJob::Notifier.new
poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
scheduler = GoodJob::Scheduler.from_configuration(configuration, warm_cache_on_initialize: true)
notifier.recipients << [scheduler, :create_thread]
poller.recipients << [scheduler, :create_thread]
cron_manager = GoodJob::CronManager.new(configuration.cron_entries, start_on_initialize: true) if configuration.enable_cron?
if configuration.probe_port
probe_server = GoodJob::ProbeServer.new(port: configuration.probe_port)
probe_server.start
end
@stop_good_job_executable = false
%w[INT TERM].each do |signal|
trap(signal) { @stop_good_job_executable = true }
end
Kernel.loop do
sleep 0.1
break if @stop_good_job_executable || scheduler.shutdown? || notifier.shutdown?
end
executors = [notifier, poller, cron_manager, scheduler].compact
GoodJob._shutdown_all(executors, timeout: configuration.shutdown_timeout)
probe_server&.stop
end
如果是使用GOODJOBEXECUTIONMODE=async rails server与rails同一进程的方式启动,(或者在initializer文件指定:async),则会由GoodJob::Adapter#startasync处理
# good_job-2.7.3/lib/good_job/adapter.rb
def start_async
return unless execute_async?
@notifier = GoodJob::Notifier.new
@poller = GoodJob::Poller.new(poll_interval: @configuration.poll_interval)
@scheduler = GoodJob::Scheduler.from_configuration(@configuration, warm_cache_on_initialize: true)
@notifier.recipients << [@scheduler, :create_thread]
@poller.recipients << [@scheduler, :create_thread]
@cron_manager = GoodJob::CronManager.new(@configuration.cron_entries, start_on_initialize: true) if @configuration.enable_cron?
@_async_started = true
end
从上面代码可发现,不管怎样启动,它都会创建出Notifier、Poller、Scheduler,下面来逐一分析它们
每个goodjob进程都会创建一个Notifier,其本质上是一个线程。该线程会占用一个数据库连接,异步调用postgresql的LISTEN命令,订阅一个名为goodjob的channel。然后每当收到消息时,传递给@recipients,也就是Scheduler#create_thread
# good_job-2.7.3/lib/good_job/notifier.rb
def listen(delay: 0)
future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening|
with_listen_connection do |conn|
ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
conn.async_exec("LISTEN #{CHANNEL}").clear
end
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
thr_listening.make_true
while thr_executor.running?
conn.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload|
next unless channel == CHANNEL
ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
parsed_payload = JSON.parse(payload, symbolize_names: true)
thr_recipients.each do |recipient|
target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
target.send(method_name, parsed_payload)
end
end
end
end
ensure
thr_listening.make_false
ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
conn.async_exec("UNLISTEN *").clear
end
end
end
future.add_observer(self, :listen_observer)
future.execute
end
每个good_job进程启动时,都可以为每个队列单独指定线程数,即是每个队列对应一个Scheduler,各有一个线程池,然后包装成一个MultiScheduler。
如无特别指定,则所有队列均由一个Scheduler调度,共享一个线程池
每个Scheduler接收到任务后,会先检查任务是否自己所绑定的队列,如果是,才包装成Concurrent::ScheduledTask丢进线程池里
线程池是一个[[concurrent-ruby Thread Pools]],对于延时任务,会放进一个Concurrent::TimerSet最小堆里排队
# good_job-2.7.3/lib/good_job/scheduler.rb
def self.from_configuration(configuration, warm_cache_on_initialize: false)
schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads|
queue_string, max_threads = queue_string_and_max_threads.split(':')
max_threads = (max_threads || configuration.max_threads).to_i
job_performer = GoodJob::JobPerformer.new(queue_string)
GoodJob::Scheduler.new(
job_performer,
max_threads: max_threads,
max_cache: configuration.max_cache,
warm_cache_on_initialize: warm_cache_on_initialize
)
end
if schedulers.size > 1
GoodJob::MultiScheduler.new(schedulers)
else
schedulers.first
end
end
def create_thread(state = nil)
return nil unless executor.running?
if state
return false unless performer.next?(state)
if state[:scheduled_at]
scheduled_at = if state[:scheduled_at].is_a? String
Time.zone.parse state[:scheduled_at]
else
state[:scheduled_at]
end
delay = [(scheduled_at - Time.current).to_f, 0].max
end
end
delay ||= 0
run_now = delay <= 0.01
if run_now
return nil unless executor.ready_worker_count.positive?
elsif @max_cache.positive?
return nil unless remaining_cache_count.positive?
end
create_task(delay)
run_now ? true : nil
end
def create_task(delay = 0)
future = Concurrent::ScheduledTask.new(delay, args: [performer], executor: executor, timer_set: timer_set) do |thr_performer|
Thread.current.name = Thread.current.name.sub("-worker-", "-thread-") if Thread.current.name
Rails.application.reloader.wrap do
thr_performer.next
end
end
future.add_observer(self, :task_observer)
future.execute
end
当任务到达预定执行时间后,就会触发Scheduler对应的那个JobPerformer#next
# good_job-2.7.3/lib/good_job/job_performer.rb
def next
job_query.perform_with_advisory_lock
end
def next?(state = {})
return true unless state[:queue_name]
if parsed_queues[:exclude]
parsed_queues[:exclude].exclude?(state[:queue_name])
elsif parsed_queues[:include]
parsed_queues[:include].include?(state[:queue_name])
else
true
end
end
def job_query
@_job_query ||= GoodJob::Execution.queue_string(queue_string)
end
而JobPerformer#next会运行两条SQL,查询GoodJob::Execution表中保存的任务,尝试对最应该现在执行的任务加咨询锁,并检查有否加锁成功。若成功,则将其反序列化去执行
(postgresql咨询锁与其他FOR UPDATE加锁方式的区别在于,FOR UPDATE只能锁定表中存在的记录,并且会有阻塞的可能,而咨询锁是把锁信息记录于系统表中,并且允许加锁不成则立即返回)
# good_job-2.7.3/lib/good_job/execution.rb
def self.perform_with_advisory_lock
unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock(unlock_session: true) do |executions|
execution = executions.first
break if execution.blank?
break :unlocked unless execution&.executable?
execution.perform
end
end
剩下就是Poller了。每个good_job进程都会有这样一个周期任务,它使用Concurrent::TimerTask实现,任务内容与Notifier一样,从数据库中取最应该现在执行的任务来运行
因为Notifier接到任务通知后,内存中不一定能存放所有延时任务,而且进程重启后也需要捞出之前未执行的过期任务,又或者任务失败后需要重试,这些情况下LISTEN/NOTIFY都帮不上什么忙,所以我们需要这样一个周期性的动作
# good_job-2.7.3/lib/good_job/poller.rb
def create_timer
return if @timer_options[:execution_interval] <= 0
@timer = Concurrent::TimerTask.new(@timer_options) do
recipients.each do |recipient|
target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
target.send(method_name)
end
end
@timer.add_observer(self, :timer_observer)
@timer.execute
end