跟踪一下Chewy.strategy(:atomic){}的运作:

[6] pry(main)> binding.trace_tree(htmp: 'chewy_atomic', no_methods: /perform_request/){ Chewy.strategy(:atomic){ books.each{ |b| b.update(year: 1994) } } }
Chewy strategies stack: [2] <- atomic @ (pry):6
   (36.1ms)  begin transaction
  SQL (57.9ms)  UPDATE "books" SET "year" = ?, "updated_at" = ? WHERE "books"."id" = ?  [["year", 1994], ["updated_at", "2019-12-29 13:43:05.099905"], ["id", 1]]
   (43.2ms)  commit transaction
   (51.0ms)  begin transaction
  SQL (58.6ms)  UPDATE "books" SET "year" = ?, "updated_at" = ? WHERE "books"."id" = ?  [["year", 1994], ["updated_at", "2019-12-29 13:43:08.011823"], ["id", 2]]
   (43.1ms)  commit transaction
   (48.5ms)  SELECT COUNT(*) FROM "books" WHERE "books"."id" IN (1, 2)
  Book Load (68.3ms)  SELECT "books".* FROM "books" WHERE "books"."id" IN (1, 2)
  Dude Load (75.2ms)  SELECT "dudes".* FROM "dudes" WHERE "dudes"."id" IN (1, 2)
  SQL (156.8ms)  SELECT "taggings".*, "taggings"."id" AS t0_r0, "taggings"."tag_id" AS t0_r1, "taggings"."taggable_id" AS t0_r2, "taggings"."taggable_type" AS t0_r3, "taggings"."tagger_id" AS t0_r4, "taggings"."tagger_type" AS t0_r5, "taggings"."context" AS t0_r6, "taggings"."created_at" AS t0_r7, "tags"."id" AS t1_r0, "tags"."name" AS t1_r1, "tags"."taggings_count" AS t1_r2 FROM "taggings" LEFT OUTER JOIN "tags" ON "tags"."id" = "taggings"."tag_id" WHERE "taggings"."context" = ? AND "taggings"."taggable_type" = 'Book' AND "taggings"."taggable_id" IN (1, 2)  [["context", "tags"]]
  EntertainmentIndex::Book Import (15270.4ms) {:index=>2}
Chewy strategies stack: [2] -> atomic, now urgent @ (pry):6
[7] pry(main)>

得调用栈如下:

20191229_134304_402_chewy_atomic.html

而从日志可见,ES的import是在所有commit之后才执行的(其实平时这两个update还应该用transaction包起来的)。这个动作位于Chewy::Strategy#pop之中


wrap中所作的push和pop实际上干的是,根据参数指定的名字找到具体策略类并实例化后将其压入栈中,出栈时调用策略对象的leave方法

# chewy-5.1.0/lib/chewy/strategy.rb
module Chewy
  class Strategy
    def initialize
      @stack = [resolve(Chewy.root_strategy).new]
    end

    def current
      @stack.last
    end

    def push(name)
      result = @stack.push resolve(name).new
      debug "[#{@stack.size - 1}] <- #{current.name}" if @stack.size > 2
      result
    end

    def pop
      raise "Can't pop root strategy" if @stack.one?
      result = @stack.pop.tap(&:leave)
      debug "[#{@stack.size}] -> #{result.name}, now #{current.name}" if @stack.size > 1
      result
    end

    def wrap(name)
      stack = push(name)
      yield
    ensure
      pop if stack
    end
  end
end

而栈底固定的策略类是Urgent,源码如下

# chewy-5.1.0/lib/chewy/strategy/urgent.rb
module Chewy
  class Strategy
    class Urgent < Base
      def update(type, objects, _options = {})
        type.import!(Array.wrap(objects))
      end
    end
  end
end

刚才使用的atomic则如下,其update方法只是将该更新的索引及文档id收集起来,到了leave被调用时才实际地查数据并推到ES去

# chewy-5.1.0/lib/chewy/strategy/atomic.rb
module Chewy
  class Strategy
    class Atomic < Base
      def initialize
        @stash = {}
      end

      def update(type, objects, _options = {})
        @stash[type] ||= []
        @stash[type] |= type.root.id ? Array.wrap(objects) : type.adapter.identify(objects)
      end

      def leave
        @stash.all? { |type, ids| type.import!(ids) }
      end
    end
  end
end

该update方法正是在activerecord的after_commit中被回调(由model的update_index所定义),如下。如果没有使用Chewy.strategy(:atomic){}的话,则after_commit中查到的Chewy::Strategy#current将会是初始化在栈底的Urgent策略,导致在没有将多个不同类型的model用一个transaction包裹,发起多次ES的import。(不过在嵌套transaction(即require_new)的情况里,还是会每个commit一次import)


再检查Resque、Sidekiq、Shoryuken的代码,可见它们都是继承了Atomic,然后在leave中实现对应的队列操作,所以它们都是在Chewy.strategy的block的最后才执行批量更新ES。

而因为Chewy.strategy实质上是个栈,所以可以实现以下效果:如果希望某些数据更新要即时同步,可以通过新增一层Chewy.strategy(:urgent)来实现。不过暂时没发现什么时候会有这样的需求……

Chewy.strategy(:atomic) do
  city1.do_update!
  Chewy.strategy(:urgent) do
    city2.do_update!
    city3.do_update!
    # there will be 2 update index requests for city2 and city3
  end
  city4..do_update!
  # city1 and city4 will be grouped in one index update request
end

另外要注意的是,因为Strategy的update方法是在after_commit回调中调用的,所以transaction的block需要放在Chewy.strategy的block中,否则,在执行Chewy.strategy后,策略栈会退回到urgent,这时transaction结束前调的after_commit回调就是urgent的update,导致ES不是批量更新。