跟踪一下

[4] pry(main)> binding.trace_tree(htmp: 'redis/pipeline') do
[4] pry(main)*   $redis.pipelined do  
[4] pry(main)*     $redis.set 'i', 1    
[4] pry(main)*     $redis.set 'j', 2    
[4] pry(main)*     $redis.set 'k', 3    
[4] pry(main)*   end    
[4] pry(main)* end  
=> ["OK", "OK", "OK"]


得调用栈如下

20171015_172152_460_pipeline.html

首先,pipelined方法会将内部的@client替换成Pipeline对象,然后再执行要批处理的操作

def pipelined
  synchronize do |client|
    begin
      original, @client = @client, Pipeline.new
      yield(self)
      original.call_pipeline(@client)
    ensure
      @client = original
    end
  end
end


此部分调用如下


Pipeline跟Client一样都响应call方法

def set(key, value, options = {})
  args = []

  ex = options[:ex]
  args.concat(["EX", ex]) if ex

  px = options[:px]
  args.concat(["PX", px]) if px

  nx = options[:nx]
  args.concat(["NX"]) if nx

  xx = options[:xx]
  args.concat(["XX"]) if xx

  synchronize do |client|
    if nx || xx
      client.call([:set, key, value.to_s] + args, &BoolifySet)
    else
      client.call([:set, key, value.to_s] + args)
    end
  end
end


不同的是,Client是直接“处理”,而Pipeline是将每个命令暂存到一个个Future对象中

# lib/redis/client.rb
def call(command)
  reply = process([command]) { read }
  raise reply if reply.is_a?(CommandError)

  if block_given?
    yield reply
  else
    reply
  end
end

# lib/redis/pipeline.rb
def call(command, &block)                                                                                                                                                                                                           
  # A pipeline that contains a shutdown should not raise ECONNRESET when
  # the connection is gone.
  @shutdown = true if command.first == :shutdown
  future = Future.new(command, block)
  @futures << future
  future
end


接着,再由pipeline.finish(call_pipelined(pipeline.commands))实际完成

def call_pipeline(pipeline)
  with_reconnect pipeline.with_reconnect? do
    begin
      pipeline.finish(call_pipelined(pipeline.commands)).tap do
        self.db = pipeline.db if pipeline.db
      end
    rescue ConnectionError => e
      return nil if pipeline.shutdown?
      # Assume the pipeline was sent in one piece, but execution of
      # SHUTDOWN caused none of the replies for commands that were executed
      # prior to it from coming back around.
      raise e
    end
  end
end


从源码来看,process循环将命令发送出去(write),然后call_pipelined会根据命令数量,循环读取结果(read),然后将收集的结果(result),对应填到每个Future中,因而在pipeline的block中可以每行命令赋值一个变量,以单独获取每条命令的结果

# lib/redis/client.rb
def call_pipelined(commands)
  return [] if commands.empty?

  # The method #ensure_connected (called from #process) reconnects once on
  # I/O errors. To make an effort in making sure that commands are not
  # executed more than once, only allow reconnection before the first reply
  # has been read. When an error occurs after the first reply has been
  # read, retrying would re-execute the entire pipeline, thus re-issuing
  # already successfully executed commands. To circumvent this, don't retry
  # after the first reply has been read successfully.

  result = Array.new(commands.size)
  reconnect = @reconnect

  begin
    exception = nil

    process(commands) do
      result[0] = read

      @reconnect = false

      (commands.size - 1).times do |i|
        reply = read
        result[i + 1] = reply
        exception = reply if exception.nil? && reply.is_a?(CommandError)
      end
    end

    raise exception if exception
  ensure
    @reconnect = reconnect
  end

  result
end

def process(commands)
  logging(commands) do
    ensure_connected do
      commands.each do |command|
        if command_map[command.first]
          command = command.dup
          command[0] = command_map[command.first]
        end

        write(command)
      end

      yield if block_given?
    end
  end
end

# lib/redis/pipeline.rb
def finish(replies, &blk)
  if blk
    futures.each_with_index.map do |future, i|
      future._set(blk.call(replies[i]))
    end
  else
    futures.each_with_index.map do |future, i|
      future._set(replies[i])
    end
  end
end


可以对比一下,与Pipeline不同,普通的Client因为只会一次write一条命令,所以也只会read一次

# lib/redis/client.rb
def call(command)
  reply = process([command]) { read }
  raise reply if reply.is_a?(CommandError)

  if block_given?
    yield reply
  else
    reply
  end
end


这样看来,多条命令,不管是pipeline还是不pipeline,都一样要write/read同样次数,那pipeline怎么会更加高效呢?

首先,从上面看到,write跟read不必是连续的,write只返回发送字数,不需等待redis处理。其次,一次read所产生的网络IO是有可能把多个命令的结果都返回的。如源码所示,一次gets,读取1024字节,如果没读完一个命令的结果(根据RESP,每个结果以\r\n结尾),则继续请求(_read_from_socket)。如果1024字节中已包含了多个结果,则下次read进入gets时,直接slice!取出下一段结果就可以了

# lib/redis/connection/ruby.rb
def gets
  crlf = nil

  while (crlf = @buffer.index(CRLF)) == nil
    @buffer << _read_from_socket(1024)
  end

  @buffer.slice!(0, crlf + CRLF.bytesize)
end

def _read_from_socket(nbytes)

  begin
    read_nonblock(nbytes)

  rescue #...
  end

rescue EOFError
  raise Errno::ECONNRESET
end


而例子中三个set,结果返回三个OK,是不足1024字节的。因此,调用栈有三次gets,但只有一次_read_from_socket。这样当然比非pipeline的write完马上read要高效