According to the official documentation, to control when offsets are committed you pass automatically_mark_as_processed: false when polling, then call mark_message_as_processed and commit_offsets yourself.
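As a minimal sketch of that pattern: the helper below processes each message and only then marks and commits it. FakeConsumer and consume_with_manual_commit are hypothetical names introduced here so the example runs without a broker; with the real gem you would pass an actual Kafka::Consumer, which exposes the same three methods.

```ruby
# Stand-in for Kafka::Consumer so the sketch runs without a broker (hypothetical).
class FakeConsumer
  attr_reader :marked, :commits

  def initialize(messages)
    @messages = messages
    @marked = []
    @commits = 0
  end

  # Mirrors the automatically_mark_as_processed keyword of the real each_message.
  def each_message(automatically_mark_as_processed: true)
    @messages.each do |message|
      yield message
      mark_message_as_processed(message) if automatically_mark_as_processed
    end
  end

  def mark_message_as_processed(message)
    @marked << message
  end

  def commit_offsets
    @commits += 1
  end
end

# Hands each message to the caller's block, then marks and commits it explicitly.
def consume_with_manual_commit(consumer)
  consumer.each_message(automatically_mark_as_processed: false) do |message|
    yield message
    consumer.mark_message_as_processed(message)
    consumer.commit_offsets
  end
end

consumer = FakeConsumer.new(%w[a b c])
seen = []
consume_with_manual_commit(consumer) { |message| seen << message }
```

Committing after every message is the simplest choice but also the most expensive one; in practice you might commit every N messages or at the end of a unit of work.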

The polling source code looks like this:

# ruby-kafka-0.7.10/lib/kafka/consumer.rb
def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
  # ...

  consumer_loop do
    batches = fetch_batches

    batches.each do |batch|
      batch.messages.each do |message|
        # ...

        begin
          yield message unless message.is_control_record
          @current_offsets[message.topic][message.partition] = message.offset
        rescue => e
          location = "#{message.topic}/#{message.partition} at offset #{message.offset}"
          backtrace = e.backtrace.join("\n")
          @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}"

          raise ProcessingError.new(message.topic, message.partition, message.offset)
        end

        mark_message_as_processed(message) if automatically_mark_as_processed
        @offset_manager.commit_offsets_if_necessary

        # ...
      end
    end

    # We may not have received any messages, but it's still a good idea to
    # commit offsets if we've processed messages in the last set of batches.
    # This also ensures the offsets are retained if we haven't read any messages
    # since the offset retention period has elapsed.
    @offset_manager.commit_offsets_if_necessary
  end
end

def mark_message_as_processed(message)
  @offset_manager.mark_as_processed(message.topic, message.partition, message.offset)
end

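Here, what automatically_mark_as_processed really controls is whether the current message's offset is written into the cache held by @offset_manager. Internally, the offset manager stores an offset for every partition of every topic: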

# ruby-kafka-0.7.10/lib/kafka/offset_manager.rb
def mark_as_processed(topic, partition, offset)
  @uncommitted_offsets += 1
  @processed_offsets[topic] ||= {}

  # The committed offset should always be the offset of the next message that the
  # application will read, thus adding one to the last message processed.
  @processed_offsets[topic][partition] = offset + 1
  @logger.debug "Marking #{topic}/#{partition}:#{offset} as processed"
end
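
To make the "offset + 1" convention concrete, here is a small self-contained sketch of such a cache. OffsetCache and next_offset_for are hypothetical names used only for illustration; they are not part of ruby-kafka.

```ruby
# Minimal stand-in for the offset cache (hypothetical class, not the gem's).
class OffsetCache
  attr_reader :uncommitted_offsets

  def initialize
    @processed_offsets = {}
    @uncommitted_offsets = 0
  end

  # Stores the offset of the NEXT message to read, i.e. last processed + 1.
  def mark_as_processed(topic, partition, offset)
    @uncommitted_offsets += 1
    @processed_offsets[topic] ||= {}
    @processed_offsets[topic][partition] = offset + 1
  end

  # Offset that would be committed for the partition, or nil if nothing was marked.
  def next_offset_for(topic, partition)
    @processed_offsets.fetch(topic, {})[partition]
  end
end

cache = OffsetCache.new
cache.mark_as_processed("events", 0, 41)
cache.mark_as_processed("events", 0, 42)
next_offset = cache.next_offset_for("events", 0)
```

After processing offsets 41 and 42, the cached value is the position a restarted consumer should resume from, not the last message it saw.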

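Setting automatically_mark_as_processed is not enough on its own. When creating the consumer you also need to set commit_interval and commit_threshold to zero (Kafka::Client#consumer takes them as offset_commit_interval and offset_commit_threshold), because after every message the loop checks whether too much time has passed, or too many messages have accumulated, since the last commit: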

# ruby-kafka-0.7.10/lib/kafka/offset_manager.rb
def commit_offsets_if_necessary
  recommit = recommit_timeout_reached?
  if recommit || commit_timeout_reached? || commit_threshold_reached?
    commit_offsets(recommit)
  end
end

def commit_timeout_reached?
  @commit_interval != 0 && seconds_since_last_commit >= @commit_interval
end

def commit_threshold_reached?
  @commit_threshold != 0 && @uncommitted_offsets >= @commit_threshold
end
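
The effect of setting both values to zero can be sketched in isolation. CommitPolicy below is a hypothetical helper that mirrors only the two predicates above and deliberately leaves out the recommit check.

```ruby
# Hypothetical helper mirroring commit_timeout_reached? and commit_threshold_reached?.
# The recommit check from commit_offsets_if_necessary is intentionally omitted.
CommitPolicy = Struct.new(:commit_interval, :commit_threshold) do
  def commit?(seconds_since_last_commit:, uncommitted_offsets:)
    timeout_reached = commit_interval != 0 && seconds_since_last_commit >= commit_interval
    threshold_reached = commit_threshold != 0 && uncommitted_offsets >= commit_threshold
    timeout_reached || threshold_reached
  end
end

automatic = CommitPolicy.new(10, 100)
manual = CommitPolicy.new(0, 0)

# 12 seconds since the last commit exceeds the 10-second interval.
automatic_result = automatic.commit?(seconds_since_last_commit: 12, uncommitted_offsets: 1)
# With both settings at zero, neither predicate can ever fire.
manual_result = manual.commit?(seconds_since_last_commit: 3600, uncommitted_offsets: 1_000_000)
```

With both settings at zero, the only remaining automatic trigger in commit_offsets_if_necessary is the recommit timeout.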

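The reason consumer_loop ends with another call to commit_offsets_if_necessary is that the loop does not block indefinitely when no new messages arrive. As shown below, when the fetcher has no data, fetch_batches simply sleeps for two seconds and returns an empty array: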

# ruby-kafka-0.7.10/lib/kafka/consumer.rb
def fetch_batches
  # ...

  if !@fetcher.data?
    @logger.debug "No batches to process"
    sleep 2
    []
  else
    tag, message = @fetcher.poll

    case tag
    when :batches
      # make sure any old batches, fetched prior to the completion of a consumer group sync,
      # are only processed if the batches are from brokers for which this broker is still responsible.
      message.select { |batch| @group.assigned_to?(batch.topic, batch.partition) }
    when :exception
      raise message
    end
  end

  # ...
end

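The fetcher is essentially a wrapper around a Ruby Queue. The actual reading happens in a separate thread that is started when the consumer begins polling; that thread fetches data from Kafka and fills the Queue. The step method also shows that, because the moment at which offsets get committed is unpredictable, the fetcher has to keep its own copy of the offsets; otherwise it could fetch the same data twice: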

# ruby-kafka-0.7.10/lib/kafka/fetcher.rb
def start
  # ...
  @thread = Thread.new do
    while @running
      loop
    end
    @logger.info "#{@group} Fetcher thread exited."
  end
  # ...
end

def loop
  # ...
  step
  # ..
end

def step
  batches = fetch_batches

  batches.each do |batch|
    # ...
    @next_offsets[batch.topic][batch.partition] = batch.last_offset + 1 unless batch.unknown_last_offset?
  end

  @queue << [:batches, batches, current_reset_counter]
  # ...
end

def poll
  tag, message, reset_counter = @queue.deq
  # ...
  return [tag, message]
end
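
The same producer/consumer shape can be sketched in a few lines. MiniFetcher is a hypothetical miniature, not the gem's Fetcher: an in-memory array stands in for a partition, and the background thread tracks its own next offset independently of any commit.

```ruby
# Hypothetical miniature of the fetcher: a background thread fills a Queue
# and keeps its own read position, independent of committed offsets.
class MiniFetcher
  def initialize(log, batch_size: 2)
    @log = log              # stand-in for the messages of one partition
    @batch_size = batch_size
    @next_offset = 0        # the fetcher's own position
    @queue = Queue.new
  end

  # Starts the background thread that pushes batches onto the queue.
  def start
    @thread = Thread.new do
      while @next_offset < @log.size
        batch = @log[@next_offset, @batch_size]
        @next_offset += batch.size   # advance without waiting for any commit
        @queue << [:batches, batch]
      end
    end
  end

  def data?
    !@queue.empty?
  end

  # Returns the next [tag, batch] pair; blocks if the queue is empty.
  def poll
    @queue.deq
  end

  def join
    @thread.join
  end
end

fetcher = MiniFetcher.new(%w[m0 m1 m2 m3 m4])
fetcher.start
fetcher.join

received = []
received.concat(fetcher.poll.last) while fetcher.data?
```

Because the thread advances @next_offset as soon as a batch is queued, each message is fetched exactly once even though nothing has been committed yet.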