redis-rb的pipeline实现
跟踪一下
[4] pry(main)> binding.trace_tree(htmp: 'redis/pipeline') do
[4] pry(main)* $redis.pipelined do
[4] pry(main)* $redis.set 'i', 1
[4] pry(main)* $redis.set 'j', 2
[4] pry(main)* $redis.set 'k', 3
[4] pry(main)* end
[4] pry(main)* end
=> ["OK", "OK", "OK"]
得调用栈如下
首先,pipelined方法会将内部的@client替换成Pipeline对象,然后再执行要批处理的操作
# Runs the given block with +@client+ temporarily replaced by a fresh
# +Pipeline+, then hands that pipeline to the original client's
# +call_pipeline+.
#
# @yield [self] the receiver, while +@client+ is the temporary pipeline
# @return whatever +call_pipeline+ on the original client returns
def pipelined
  synchronize do |_client|
    # Capture the real client before entering the protected region. If it
    # were bound inside +begin+ together with +Pipeline.new+, a failure while
    # building the pipeline would leave +original+ nil and the +ensure+
    # clause would overwrite +@client+ with nil.
    original = @client

    begin
      @client = Pipeline.new
      yield(self)
      original.call_pipeline(@client)
    ensure
      # Always put the real client back, even when the block raises.
      @client = original
    end
  end
end
此部分调用如下

Pipeline跟Client一样都响应call方法,所以像set这样的命令方法里调用的client.call,既可以由Client处理,也可以由Pipeline处理
# Builds a SET command for +key+ / +value+ and passes it to the client
# yielded by +synchronize+.
#
# @param key the key, sent as given
# @param value the value, sent as +value.to_s+
# @param options [Hash] optional modifiers:
#   :ex => appended as "EX", <value>
#   :px => appended as "PX", <value>
#   :nx => when truthy, appends "NX"
#   :xx => when truthy, appends "XX"
# @return whatever +client.call+ returns; with :nx or :xx the reply is
#   passed through +BoolifySet+
def set(key, value, options = {})
  ex, px, nx, xx = options.values_at(:ex, :px, :nx, :xx)

  modifiers = []
  modifiers.push("EX", ex) if ex
  modifiers.push("PX", px) if px
  modifiers << "NX" if nx
  modifiers << "XX" if xx

  synchronize do |client|
    command = [:set, key, value.to_s, *modifiers]

    # Only the conditional forms (NX / XX) get the BoolifySet post-processing.
    next client.call(command) unless nx || xx

    client.call(command, &BoolifySet)
  end
end
不同的是,Client是直接“处理”,而Pipeline是将每个命令暂存到一个个Future对象中
# lib/redis/client.rb
# Sends one command and returns its reply.
#
# The command is wrapped in a one-element array for +process+, whose block
# performs a single +read+. A reply that is a +CommandError+ is raised
# instead of returned. When the caller supplies a block, the reply is yielded
# to it and the block's value is returned.
#
# @param command the command to send
# @return the reply, or the value of the caller's block
# @raise [CommandError] when the reply itself is a CommandError
def call(command)
  reply = process([command]) { read }
  raise reply if reply.is_a?(CommandError)
  return reply unless block_given?

  yield reply
end
# lib/redis/pipeline.rb
# Queues +command+ instead of sending it: the command and the optional block
# are wrapped in a +Future+, which is appended to +@futures+ and returned.
#
# @param command [Array] the command; its first element is the command name
# @param block optional block stored in the Future alongside the command
# @return [Future] the future created for this command
def call(command, &block)
  # A pipeline that contains a shutdown should not raise ECONNRESET when
  # the connection is gone.
  @shutdown = true if command.first == :shutdown

  Future.new(command, block).tap { |future| @futures << future }
end
接着,block执行完之后,pipelined里的original.call_pipeline(@client)再通过pipeline.finish(call_pipelined(pipeline.commands))实际完成命令的发送和结果的读取
# Executes a queued pipeline: sends +pipeline.commands+ through
# +call_pipelined+ and passes the replies to +pipeline.finish+.
#
# If the pipeline reports a db (+pipeline.db+ is truthy), it is assigned to
# +self.db+ after the pipeline has finished.
#
# @param pipeline the pipeline object to execute
# @return the value of +pipeline.finish+, or nil when a ConnectionError
#   occurs and the pipeline contained a shutdown
# @raise [ConnectionError] when the connection fails and the pipeline did
#   not contain a shutdown
def call_pipeline(pipeline)
  with_reconnect(pipeline.with_reconnect?) do
    begin
      replies = call_pipelined(pipeline.commands)
      outcome = pipeline.finish(replies)
      self.db = pipeline.db if pipeline.db
      outcome
    rescue ConnectionError => error
      raise error unless pipeline.shutdown?

      # Assume the pipeline was sent in one piece, but execution of
      # SHUTDOWN caused none of the replies for commands that were executed
      # prior to it from coming back around.
      return nil
    end
  end
end
从源码来看,process循环将命令发送出去(write),然后call_pipelined会根据命令数量,循环读取结果(read),然后将收集的结果(result),对应填到每个Future中,因而在pipeline的block中可以每行命令赋值一个变量,以单独获取每条命令的结果
# lib/redis/client.rb
# Sends all +commands+ via +process+ and reads one reply per command.
#
# Every reply is stored at the index of its command. After all replies have
# been read, the first reply that is a +CommandError+ (at any position,
# including the first) is raised.
#
# @param commands [Array] the commands to send
# @return [Array] the replies, in command order; [] for no commands
# @raise [CommandError] the first error reply, once all replies are read
def call_pipelined(commands)
  return [] if commands.empty?

  # The method #ensure_connected (called from #process) reconnects once on
  # I/O errors. To make an effort in making sure that commands are not
  # executed more than once, only allow reconnection before the first reply
  # has been read. When an error occurs after the first reply has been
  # read, retrying would re-execute the entire pipeline, thus re-issuing
  # already successfully executed commands. To circumvent this, don't retry
  # after the first reply has been read successfully.
  result = Array.new(commands.size)
  reconnect = @reconnect

  begin
    exception = nil

    process(commands) do
      commands.size.times do |i|
        reply = read
        result[i] = reply

        # At least one reply has now been read, so reconnecting is no longer
        # allowed for the rest of this pipeline.
        @reconnect = false

        # Treat every position the same way; the first reply must not be
        # exempt from error detection.
        exception ||= reply if reply.is_a?(CommandError)
      end
    end

    raise exception if exception
  ensure
    # Restore the caller's reconnect setting whatever happened above.
    @reconnect = reconnect
  end

  result
end
# Writes every command in +commands+, then yields to the caller's block (if
# any) so it can read the replies. All of this runs inside +logging+ and
# +ensure_connected+.
#
# A command whose name has an entry in +command_map+ is written under the
# mapped name; the caller's command array is left unmodified because the
# rename is applied to a copy.
#
# @param commands [Array<Array>] commands; each one's first element is its name
# @yield after all commands have been written
def process(commands)
  logging(commands) do
    ensure_connected do
      commands.each do |command|
        renamed = command_map[command.first]
        outgoing = renamed ? command.dup.tap { |copy| copy[0] = renamed } : command
        write(outgoing)
      end

      yield if block_given?
    end
  end
end
# lib/redis/pipeline.rb
# Hands each reply to the future at the same index via +Future#_set+.
#
# When a block is given, each reply is first passed through the block and the
# block's result is what gets set on the future.
#
# @param replies [Array] replies, indexed like +futures+
# @return [Array] the return values of +_set+, one per future
def finish(replies, &blk)
  futures.each_with_index.map do |future, index|
    reply = replies[index]
    future._set(blk ? blk.call(reply) : reply)
  end
end
可以对比一下,与Pipeline不同,普通的Client因为只会一次write一条命令,所以也只会read一次
# lib/redis/client.rb
# Sends one command and returns its reply.
#
# The command is wrapped in a one-element array for +process+, whose block
# performs a single +read+. A reply that is a +CommandError+ is raised
# instead of returned. When the caller supplies a block, the reply is yielded
# to it and the block's value is returned.
#
# @param command the command to send
# @return the reply, or the value of the caller's block
# @raise [CommandError] when the reply itself is a CommandError
def call(command)
  reply = process([command]) { read }
  raise reply if reply.is_a?(CommandError)
  return reply unless block_given?

  yield reply
end
这样看来,多条命令,不管是pipeline还是不pipeline,都一样要write/read同样次数,那pipeline怎么会更加高效呢?
首先,从上面看到,write跟read不必是连续的,write只返回已发送的字节数,不需等待redis处理。其次,一次read所产生的网络IO是有可能把多个命令的结果都返回的。如源码所示,一次gets,如果@buffer里还没有完整的一行(根据RESP,每一行以\r\n结尾),就继续从socket读取(_read_from_socket),每次最多读取1024字节。如果读到的数据中已包含了多个结果,则下次read进入gets时,不必再访问socket,直接slice!取出下一段结果就可以了
# lib/redis/connection/ruby.rb
# Returns the next CRLF-terminated line from +@buffer+, terminator included,
# and removes it from the buffer.
#
# While the buffer holds no CRLF, more data is appended to it from
# +_read_from_socket(1024)+. Anything after the first CRLF stays buffered for
# later calls.
#
# @return [String] one line, ending in CRLF
def gets
  until (line_end = @buffer.index(CRLF))
    @buffer << _read_from_socket(1024)
  end

  @buffer.slice!(0, line_end + CRLF.bytesize)
end
# Calls +read_nonblock(nbytes)+ and returns its result.
#
# An EOFError reaching the outer handler is re-raised as Errno::ECONNRESET.
#
# NOTE(review): the handlers of the inner +rescue+ are elided ("#...") in
# this excerpt, so what happens when the non-blocking read fails cannot be
# told from here. As literally written, a bare +rescue+ would also catch
# EOFError before the outer handler sees it; confirm against the full source.
def _read_from_socket(nbytes)
begin
read_nonblock(nbytes)
rescue #...
end
rescue EOFError
# End-of-file on the socket is reported to callers as a connection reset.
raise Errno::ECONNRESET
end
而例子中三个set,结果返回三个OK(每个是+OK\r\n,一共15字节),是远不足1024字节的。因此,调用栈有三次gets,但只有一次_read_from_socket。这样当然比非pipeline的write完马上read要高效
