配置方法

sidekiq中间件写法如下:

  • 需要有实例方法call,接收Sidekiq::Job实例、原始值(json)、队列名
  • 并且里面视情况调用block去执行下一层的中间件
  • 如需获取redis连接,还得include Sidekiq::ServerMiddleware或提供config=方法
class MyServerHook
  include Sidekiq::ServerMiddleware

  def call(job_instance, msg, queue)
    logger.info "Before job"
    redis {|conn| conn.get("foo") } # do something in Redis
    yield
    logger.info "After job"
  end
end

可以通过以下方法配置中间件

Sidekiq.configure_client do |config|
  config.client_middleware do |chain|
    chain.add MyClientHook
  end
end

Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.add MyServerHook
    chain.remove ActiveRecord
  end
end

运作机制

上述的配置方法,会先获取sidekiq全局的Sidekiq::Middleware::Chain实例(有分@client_chain@server_chain

# sidekiq-6.5.7/lib/sidekiq.rb
module Sidekiq
  def self.configure_server
    yield self if server?
  end

  def self.configure_client
    yield self unless server?
  end

  def self.client_middleware
    @client_chain ||= Middleware::Chain.new(self)
    yield @client_chain if block_given?
    @client_chain
  end

  def self.server_middleware
    @server_chain ||= default_server_middleware
    yield @server_chain if block_given?
    @server_chain
  end
end

Sidekiq::Middleware::Chain本质上是一个Sidekiq::Middleware::Entry数组。当中间件被invoke时,它会执行第一个中间件的call方法,并传递第二个中间进去,如此类推。直到最后一个中间件,它接收到的才是sidekiq任务的perform方法

# sidekiq-6.5.7/lib/sidekiq/middleware/chain.rb
module Sidekiq
  module Middleware
    class Chain
      include Enumerable

      # Iterate through each middleware in the chain
      def each(&block)
        entries.each(&block)
      end

      # @api private
      def initialize(config = nil) # :nodoc:
        @config = config
        @entries = nil
        yield self if block_given?
      end

      def entries
        @entries ||= []
      end

      # Remove all middleware matching the given Class
      # @param klass [Class]
      def remove(klass)
        entries.delete_if { |entry| entry.klass == klass }
      end

      def add(klass, *args)
        remove(klass)
        entries << Entry.new(@config, klass, *args)
      end

      def retrieve
        map(&:make_new)
      end

      def invoke(*args)
        return yield if empty?

        chain = retrieve
        traverse_chain = proc do
          if chain.empty?
            yield
          else
            chain.shift.call(*args, &traverse_chain)
          end
        end
        traverse_chain.call
      end
    end
  end
end

每次中间件链被invoke,都会重新实例化每一个中间件,并且如果中间件有config=方法,还会把sidekiq配置(内含redis)注入给它

# sidekiq-6.5.7/lib/sidekiq/middleware/chain.rb
module Sidekiq
  module Middleware
    class Chain
      def retrieve
        map(&:make_new)
      end
    end

    class Entry
      attr_reader :klass

      def make_new
        x = @klass.new(*@args)
        x.config = @config if @config && x.respond_to?(:config=)
        x
      end
    end
  end
end

如果中间件想要取得sidekiq的redis连接,则它必须include Sidekiq::ServerMiddleware,或者提供config=方法,这样sidekiq将中间件实例化时,才会将sidekiq配置(内含redis)注入给它

# sidekiq-6.5.7/lib/sidekiq/middleware/modules.rb
module Sidekiq
  # Server-side middleware must import this Module in order
  # to get access to server resources during `call`.
  module ServerMiddleware
    attr_accessor :config
    def redis_pool
      config.redis_pool
    end

    def logger
      config.logger
    end

    def redis(&block)
      config.redis(&block)
    end
  end

  # no difference for now
  ClientMiddleware = ServerMiddleware
end