sidekiq中间件
配置方法
sidekiq中间件写法如下:
- 需要有实例方法
call
,接收Sidekiq::Job
实例、原始值(json)、队列名 - 并且里面视情况调用block去执行下一层的中间件
- 如需获取redis连接,还得
include Sidekiq::ServerMiddleware
或提供config=
方法
class MyServerHook
include Sidekiq::ServerMiddleware
def call(job_instance, msg, queue)
logger.info "Before job"
redis {|conn| conn.get("foo") } # do something in Redis
yield
logger.info "After job"
end
end
可以通过以下方法配置中间件
Sidekiq.configure_client do |config|
config.client_middleware do |chain|
chain.add MyClientHook
end
end
Sidekiq.configure_server do |config|
config.server_middleware do |chain|
chain.add MyServerHook
chain.remove ActiveRecord
end
end
运作机制
上述的配置方法,会先获取sidekiq全局的
Sidekiq::Middleware::Chain
实例(有分@client_chain
和@server_chain
)# sidekiq-6.5.7/lib/sidekiq.rb
module Sidekiq
def self.configure_server
yield self if server?
end
def self.configure_client
yield self unless server?
end
def self.client_middleware
@client_chain ||= Middleware::Chain.new(self)
yield @client_chain if block_given?
@client_chain
end
def self.server_middleware
@server_chain ||= default_server_middleware
yield @server_chain if block_given?
@server_chain
end
end
而
Sidekiq::Middleware::Chain
本质上是一个Sidekiq::Middleware::Entry
数组。当中间件被invoke
时,它会执行第一个中间件的call
方法,并传递第二个中间进去,如此类推。直到最后一个中间件,它接收到的才是sidekiq任务的perform
方法# sidekiq-6.5.7/lib/sidekiq/middleware/chain.rb
module Sidekiq
module Middleware
class Chain
include Enumerable
# Iterate through each middleware in the chain
def each(&block)
entries.each(&block)
end
# @api private
def initialize(config = nil) # :nodoc:
@config = config
@entries = nil
yield self if block_given?
end
def entries
@entries ||= []
end
# Remove all middleware matching the given Class
# @param klass [Class]
def remove(klass)
entries.delete_if { |entry| entry.klass == klass }
end
def add(klass, *args)
remove(klass)
entries << Entry.new(@config, klass, *args)
end
def retrieve
map(&:make_new)
end
def invoke(*args)
return yield if empty?
chain = retrieve
traverse_chain = proc do
if chain.empty?
yield
else
chain.shift.call(*args, &traverse_chain)
end
end
traverse_chain.call
end
end
end
end
每次中间件链被
invoke
,都会重新实例化每一个中间件,并且如果中间件有config=
方法,还会把sidekiq配置(内含redis)注入给它# sidekiq-6.5.7/lib/sidekiq/middleware/chain.rb
module Sidekiq
module Middleware
class Chain
def retrieve
map(&:make_new)
end
end
class Entry
attr_reader :klass
def make_new
x = @klass.new(*@args)
x.config = @config if @config && x.respond_to?(:config=)
x
end
end
end
end
如果中间件想要取得sidekiq的redis连接,则它必须
include Sidekiq::ServerMiddleware
,或者提供config=
方法,这样sidekiq将中间件实例化时,才会将sidekiq配置(内含redis)注入给它# sidekiq-6.5.7/lib/sidekiq/middleware/modules.rb
module Sidekiq
# Server-side middleware must import this Module in order
# to get access to server resources during `call`.
module ServerMiddleware
attr_accessor :config
def redis_pool
config.redis_pool
end
def logger
config.logger
end
def redis(&block)
config.redis(&block)
end
end
# no difference for now
ClientMiddleware = ServerMiddleware
end