Automatic and manual offset commits in ruby-kafka
According to the official documentation, if you want to control the commit timing yourself, pass automatically_mark_as_processed: false when polling, and then call mark_message_as_processed and commit_offsets on your own.
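A manual-commit loop would then look roughly like the sketch below. The broker address, topic, and group names are placeholders, and process is a stand-in for your own handler; committing after every single message is shown only for illustration, since it trades throughput for the tightest delivery guarantee:

```ruby
require "kafka"  # the ruby-kafka gem

# Hypothetical broker/topic/group names -- adjust for your environment.
kafka = Kafka.new(["localhost:9092"], client_id: "manual-commit-demo")
consumer = kafka.consumer(group_id: "demo-group")
consumer.subscribe("demo-topic")

# Disable automatic marking so nothing is committed until we say so.
consumer.each_message(automatically_mark_as_processed: false) do |message|
  process(message)  # placeholder for your own handler
  consumer.mark_message_as_processed(message)
  consumer.commit_offsets  # force an immediate commit
end
```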
The polling source looks like this:
# ruby-kafka-0.7.10/lib/kafka/consumer.rb
def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
  # ...
  consumer_loop do
    batches = fetch_batches

    batches.each do |batch|
      batch.messages.each do |message|
        # ...
        begin
          yield message unless message.is_control_record
          @current_offsets[message.topic][message.partition] = message.offset
        rescue => e
          location = "#{message.topic}/#{message.partition} at offset #{message.offset}"
          backtrace = e.backtrace.join("\n")
          @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}"

          raise ProcessingError.new(message.topic, message.partition, message.offset)
        end

        mark_message_as_processed(message) if automatically_mark_as_processed
        @offset_manager.commit_offsets_if_necessary
        # ...
      end
    end

    # We may not have received any messages, but it's still a good idea to
    # commit offsets if we've processed messages in the last set of batches.
    # This also ensures the offsets are retained if we haven't read any messages
    # since the offset retention period has elapsed.
    @offset_manager.commit_offsets_if_necessary
  end
end

def mark_message_as_processed(message)
  @offset_manager.mark_as_processed(message.topic, message.partition, message.offset)
end
What automatically_mark_as_processed actually controls here is whether the current message's offset gets written into @offset_manager's in-memory cache. The offset manager keeps a record of the processed offset for every topic and partition:
# ruby-kafka-0.7.10/lib/kafka/offset_manager.rb
def mark_as_processed(topic, partition, offset)
  @uncommitted_offsets += 1
  @processed_offsets[topic] ||= {}

  # The committed offset should always be the offset of the next message that the
  # application will read, thus adding one to the last message processed.
  @processed_offsets[topic][partition] = offset + 1
  @logger.debug "Marking #{topic}/#{partition}:#{offset} as processed"
end
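The off-by-one semantics are easy to check in isolation. Here is a minimal stand-in for the cache (names mirror the source; logging omitted) showing that the stored value is the offset of the next message to read, not the one just processed:

```ruby
# Minimal stand-in for OffsetManager's processed-offset cache, mirroring
# mark_as_processed above.
class OffsetCache
  attr_reader :uncommitted_offsets, :processed_offsets

  def initialize
    @uncommitted_offsets = 0
    @processed_offsets = {}
  end

  def mark_as_processed(topic, partition, offset)
    @uncommitted_offsets += 1
    @processed_offsets[topic] ||= {}
    # Store the offset of the next message the application will read.
    @processed_offsets[topic][partition] = offset + 1
  end
end

cache = OffsetCache.new
cache.mark_as_processed("events", 0, 41)
cache.processed_offsets  # => {"events"=>{0=>42}}
```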
Besides setting the automatically_mark_as_processed option, you also need to set commit_interval and commit_threshold to zero when creating the consumer, because the polling loop checks after every message whether too much time has passed, or too many messages have accumulated, since the last commit:
# ruby-kafka-0.7.10/lib/kafka/offset_manager.rb
def commit_offsets_if_necessary
  recommit = recommit_timeout_reached?
  if recommit || commit_timeout_reached? || commit_threshold_reached?
    commit_offsets(recommit)
  end
end

def commit_timeout_reached?
  @commit_interval != 0 && seconds_since_last_commit >= @commit_interval
end

def commit_threshold_reached?
  @commit_threshold != 0 && @uncommitted_offsets >= @commit_threshold
end
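Note the `!= 0` guards: a zero interval or threshold disables that trigger entirely, rather than meaning "commit every time". A sketch of the two predicates, with the instance variables passed in as plain arguments, makes this concrete (the default values mentioned in the comments are assumptions about ruby-kafka's consumer defaults):

```ruby
# Stand-ins for OffsetManager's predicates, with state passed as
# arguments instead of instance variables.
def commit_timeout_reached?(commit_interval, seconds_since_last_commit)
  commit_interval != 0 && seconds_since_last_commit >= commit_interval
end

def commit_threshold_reached?(commit_threshold, uncommitted_offsets)
  commit_threshold != 0 && uncommitted_offsets >= commit_threshold
end

# With a nonzero interval (e.g. 10 seconds), time alone triggers a commit:
commit_timeout_reached?(10, 11)       # => true
# Setting both to zero disables the automatic triggers entirely:
commit_timeout_reached?(0, 3600)      # => false
commit_threshold_reached?(0, 10_000)  # => false
```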
As for the extra commit_offsets_if_necessary at the end of consumer_loop: it is there because consumer_loop does not block completely when no new messages arrive. As shown below, when the fetcher has no data, it simply sleeps briefly and returns an empty array:
# ruby-kafka-0.7.10/lib/kafka/consumer.rb
def fetch_batches
  # ...
  if !@fetcher.data?
    @logger.debug "No batches to process"
    sleep 2
    []
  else
    tag, message = @fetcher.poll

    case tag
    when :batches
      # make sure any old batches, fetched prior to the completion of a consumer group sync,
      # are only processed if the batches are from brokers for which this broker is still responsible.
      message.select { |batch| @group.assigned_to?(batch.topic, batch.partition) }
    when :exception
      raise message
    end
  end
  # ...
end
The fetcher is essentially a Ruby Queue: the actual reads from Kafka happen in a thread that is spawned when the consumer starts polling, which fetches data and fills the queue. The step method also shows that, because the commit timing is nondeterministic, the fetcher must keep its own copy of the next offsets per partition; otherwise it could fetch duplicate data:
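The background-fill pattern underneath is plain Ruby: a producer thread pushes tagged tuples into a Queue, and the consumer side pops them with deq. A self-contained sketch of that shape, using fixed data instead of Kafka fetches:

```ruby
queue = Queue.new

# Producer thread, playing the role of the fetcher: it "fetches" batches
# and enqueues them as tagged tuples, like step does with
# [:batches, batches, current_reset_counter].
fetcher = Thread.new do
  [[1, 2], [3, 4]].each do |batch|
    queue << [:batches, batch]
  end
  queue << [:done, nil]
end

# Consumer side, playing the role of poll: Queue#deq blocks until data
# is available, so no explicit synchronization is needed.
messages = []
loop do
  tag, payload = queue.deq
  break if tag == :done
  messages.concat(payload)
end

fetcher.join
messages  # => [1, 2, 3, 4]
```

Queue#deq blocking on an empty queue is what lets the consumer thread wait for the fetcher without busy-looping; the :done sentinel stands in for the fetcher thread shutting down.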
# ruby-kafka-0.7.10/lib/kafka/fetcher.rb
def start
  # ...
  @thread = Thread.new do
    while @running
      loop
    end
    @logger.info "#{@group} Fetcher thread exited."
  end
  # ...
end

def loop
  # ...
  step
  # ...
end

def step
  batches = fetch_batches

  batches.each do |batch|
    # ...
    @next_offsets[batch.topic][batch.partition] = batch.last_offset + 1 unless batch.unknown_last_offset?
  end

  @queue << [:batches, batches, current_reset_counter]
  # ...
end

def poll
  tag, message, reset_counter = @queue.deq
  # ...
  return [tag, message]
end