Chewy's sync strategies
Let's trace how Chewy.strategy(:atomic){} works:
[6] pry(main)> binding.trace_tree(htmp: 'chewy_atomic', no_methods: /perform_request/){ Chewy.strategy(:atomic){ books.each{ |b| b.update(year: 1994) } } }
Chewy strategies stack: [2] <- atomic @ (pry):6
(36.1ms) begin transaction
SQL (57.9ms) UPDATE "books" SET "year" = ?, "updated_at" = ? WHERE "books"."id" = ? [["year", 1994], ["updated_at", "2019-12-29 13:43:05.099905"], ["id", 1]]
(43.2ms) commit transaction
(51.0ms) begin transaction
SQL (58.6ms) UPDATE "books" SET "year" = ?, "updated_at" = ? WHERE "books"."id" = ? [["year", 1994], ["updated_at", "2019-12-29 13:43:08.011823"], ["id", 2]]
(43.1ms) commit transaction
(48.5ms) SELECT COUNT(*) FROM "books" WHERE "books"."id" IN (1, 2)
Book Load (68.3ms) SELECT "books".* FROM "books" WHERE "books"."id" IN (1, 2)
Dude Load (75.2ms) SELECT "dudes".* FROM "dudes" WHERE "dudes"."id" IN (1, 2)
SQL (156.8ms) SELECT "taggings".*, "taggings"."id" AS t0_r0, "taggings"."tag_id" AS t0_r1, "taggings"."taggable_id" AS t0_r2, "taggings"."taggable_type" AS t0_r3, "taggings"."tagger_id" AS t0_r4, "taggings"."tagger_type" AS t0_r5, "taggings"."context" AS t0_r6, "taggings"."created_at" AS t0_r7, "tags"."id" AS t1_r0, "tags"."name" AS t1_r1, "tags"."taggings_count" AS t1_r2 FROM "taggings" LEFT OUTER JOIN "tags" ON "tags"."id" = "taggings"."tag_id" WHERE "taggings"."context" = ? AND "taggings"."taggable_type" = 'Book' AND "taggings"."taggable_id" IN (1, 2) [["context", "tags"]]
EntertainmentIndex::Book Import (15270.4ms) {:index=>2}
Chewy strategies stack: [2] -> atomic, now urgent @ (pry):6
[7] pry(main)>
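The trace yields the call stack (figure not included).
The log also shows that the ES import runs only after all the commits have finished (in practice these two updates should be wrapped in a single transaction as well). The import is triggered from Chewy::Strategy#pop.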

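What the push and pop inside wrap actually do: push resolves the strategy class from the given name, instantiates it and pushes the instance onto the stack; pop removes the top strategy object and calls its leave method.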
# chewy-5.1.0/lib/chewy/strategy.rb
module Chewy
  class Strategy
    def initialize
      @stack = [resolve(Chewy.root_strategy).new]
    end

    def current
      @stack.last
    end

    def push(name)
      result = @stack.push resolve(name).new
      debug "[#{@stack.size - 1}] <- #{current.name}" if @stack.size > 2
      result
    end

    def pop
      raise "Can't pop root strategy" if @stack.one?
      result = @stack.pop.tap(&:leave)
      debug "[#{@stack.size}] -> #{result.name}, now #{current.name}" if @stack.size > 1
      result
    end

    def wrap(name)
      stack = push(name)
      yield
    ensure
      pop if stack
    end
  end
end
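The strategy sitting beneath atomic in this console session is Urgent (the log above ends with "now urgent"). Its source: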
# chewy-5.1.0/lib/chewy/strategy/urgent.rb
module Chewy
  class Strategy
    class Urgent < Base
      def update(type, objects, _options = {})
        type.import!(Array.wrap(objects))
      end
    end
  end
end
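The atomic strategy used above looks like this. Its update method only collects the indexes and document ids that need updating; the data is actually queried and pushed to ES only when leave is called.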
# chewy-5.1.0/lib/chewy/strategy/atomic.rb
module Chewy
  class Strategy
    class Atomic < Base
      def initialize
        @stash = {}
      end

      def update(type, objects, _options = {})
        @stash[type] ||= []
        @stash[type] |= type.root.id ? Array.wrap(objects) : type.adapter.identify(objects)
      end

      def leave
        @stash.all? { |type, ids| type.import!(ids) }
      end
    end
  end
end
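To make the difference between the two update methods concrete, here is a minimal sketch that runs without Chewy or Elasticsearch. ToyIndex, ToyUrgent and ToyAtomic are hypothetical stand-ins written for this note, not the library's classes, and the toy leave uses each where the real Atomic#leave uses all?.

```ruby
# Hypothetical stand-in for an index type: it only records import! calls.
class ToyIndex
  attr_reader :imports

  def initialize
    @imports = []
  end

  def import!(ids)
    @imports << ids.dup # one entry per simulated ES request
    true
  end
end

# Mirrors the shape of Urgent#update: import immediately on every update.
class ToyUrgent
  def update(index, ids)
    index.import!(Array(ids))
  end

  def leave; end
end

# Mirrors the shape of Atomic: stash ids per index, import once on leave.
class ToyAtomic
  def initialize
    @stash = {}
  end

  def update(index, ids)
    @stash[index] ||= []
    @stash[index] |= Array(ids) # set union, so repeated ids are kept once
  end

  def leave
    @stash.each { |index, ids| index.import!(ids) }
  end
end

URGENT_INDEX = ToyIndex.new
urgent = ToyUrgent.new
[1, 2, 1].each { |id| urgent.update(URGENT_INDEX, id) }
urgent.leave

ATOMIC_INDEX = ToyIndex.new
atomic = ToyAtomic.new
[1, 2, 1].each { |id| atomic.update(ATOMIC_INDEX, id) }
atomic.leave

puts "urgent requests: #{URGENT_INDEX.imports.inspect}"
puts "atomic requests: #{ATOMIC_INDEX.imports.inspect}"
```

With the same three updates, the urgent toy records three requests while the atomic toy records a single request containing ids 1 and 2.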
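This update method is invoked from ActiveRecord's after_commit callback (set up by the model's update_index). If Chewy.strategy(:atomic){} is not used, the Chewy::Strategy#current seen in after_commit is the Urgent strategy left on the stack, so when several models of different types are updated without being wrapped in one transaction, multiple ES imports are issued. (With nested transactions, i.e. requires_new: true, there is still one import per commit.)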

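Looking at the code for Resque, Sidekiq and Shoryuken, they all inherit from Atomic and implement the corresponding queue operation in leave, so they too perform the bulk ES update only at the very end of the Chewy.strategy block.
Because Chewy.strategy is really a stack, the following is possible: if some data changes must be synced immediately, add another layer with Chewy.strategy(:urgent). I have not yet run into a case where this is needed, though.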
Chewy.strategy(:atomic) do
  city1.do_update!
  Chewy.strategy(:urgent) do
    city2.do_update!
    city3.do_update!
    # there will be 2 update index requests for city2 and city3
  end
  city4.do_update!
  # city1 and city4 will be grouped in one index update request
end
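The nesting behaviour can be reproduced with a small simulation of the stack. This is a sketch under assumptions: ToyStrategyStack and the two toy strategy classes are invented for illustration, the stack is seeded with an urgent strategy to match the console session above, and NESTED_LOG stands in for the requests that would go to ES.

```ruby
NESTED_LOG = [] # each entry represents one simulated index update request

# Hypothetical urgent strategy: every update is a request of its own.
class ToyUrgentStrategy
  def update(ids)
    NESTED_LOG << Array(ids)
  end

  def leave; end
end

# Hypothetical atomic strategy: collect on update, flush once on leave.
class ToyAtomicStrategy
  def initialize
    @stash = []
  end

  def update(ids)
    @stash |= Array(ids)
  end

  def leave
    NESTED_LOG << @stash unless @stash.empty?
  end
end

# Hypothetical stack with the same push / yield / pop-and-leave shape as wrap.
class ToyStrategyStack
  STRATEGIES = { urgent: ToyUrgentStrategy, atomic: ToyAtomicStrategy }.freeze

  def initialize
    @stack = [ToyUrgentStrategy.new] # assumption: urgent sits at the bottom
  end

  def current
    @stack.last
  end

  def wrap(name)
    pushed = @stack.push(STRATEGIES.fetch(name).new)
    yield
  ensure
    @stack.pop.leave if pushed
  end
end

stack = ToyStrategyStack.new
stack.wrap(:atomic) do
  stack.current.update(:city1)
  stack.wrap(:urgent) do
    stack.current.update(:city2)
    stack.current.update(:city3)
  end
  stack.current.update(:city4) # back on the outer atomic strategy
end

p NESTED_LOG
```

city2 and city3 each produce their own request inside the inner block, and city1 and city4 are flushed together when the outer block ends.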
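One more thing to watch out for: because the strategy's update method is called from the after_commit callback, the transaction block must be placed inside the Chewy.strategy block. Otherwise, once the Chewy.strategy block has finished, the strategy stack has already fallen back to urgent, and the after_commit callbacks fired when the transaction ends call urgent's update, so ES is no longer updated in bulk.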
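The ordering issue can be illustrated with a toy model that runs without Rails or Chewy. OrderingDemo is a hypothetical class written for this note: transaction queues after_commit callbacks and fires them when its block ends, and strategy_atomic flushes its stash when its block ends. It only imitates the timing described above, not ActiveRecord's real transaction handling.

```ruby
# Hypothetical model of the interaction between a transaction and a strategy block.
class OrderingDemo
  attr_reader :requests

  def initialize
    @requests = []     # simulated ES requests
    @stack = [:urgent] # assumption: urgent is what remains once atomic is popped
    @stash = []        # ids collected while atomic is the current strategy
    @pending = nil     # after_commit callbacks queued by an open transaction
  end

  def strategy_atomic
    @stack.push(:atomic)
    yield
  ensure
    @stack.pop
    @requests << @stash unless @stash.empty? # the "leave" step: one bulk request
    @stash = []
  end

  def transaction
    @pending = []
    yield
    callbacks = @pending
    @pending = nil
    callbacks.each(&:call) # after_commit runs only once the transaction ends
  end

  def save(id)
    callback = -> { update_index(id) }
    @pending ? @pending << callback : callback.call
  end

  private

  def update_index(id)
    if @stack.last == :atomic
      @stash |= [id]
    else
      @requests << [id] # urgent: one request per record
    end
  end
end

# Transaction inside the strategy block: callbacks see atomic.
INSIDE_DEMO = OrderingDemo.new
INSIDE_DEMO.strategy_atomic do
  INSIDE_DEMO.transaction do
    INSIDE_DEMO.save(1)
    INSIDE_DEMO.save(2)
  end
end

# Strategy block inside the transaction: callbacks fire after atomic is popped.
OUTSIDE_DEMO = OrderingDemo.new
OUTSIDE_DEMO.transaction do
  OUTSIDE_DEMO.strategy_atomic do
    OUTSIDE_DEMO.save(1)
    OUTSIDE_DEMO.save(2)
  end
end

puts "transaction inside strategy:  #{INSIDE_DEMO.requests.inspect}"
puts "transaction outside strategy: #{OUTSIDE_DEMO.requests.inspect}"
```

In the first arrangement both records end up in one request; in the second, the callbacks run after the atomic block has already been left, so each record is sent separately.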