OpenTelemetry's batch_span_processor.rb
The following is taken from OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor in opentelemetry-sdk 1.2.0.
Initialization
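The exporter is injected and the tuning parameters are configured. Each default is read from an environment variable.
- exporter_timeout: the timeout, in milliseconds, for connecting to and sending to the remote backend (OTEL_BSP_EXPORT_TIMEOUT, default 30000).
- schedule_delay: the maximum wait, in milliseconds. Once it elapses, the buffered spans are sent even if max_export_batch_size has not been reached (OTEL_BSP_SCHEDULE_DELAY, default 5000).
- max_queue_size: the maximum number of buffered spans. If adding a span would exceed it, the earliest spans are dropped (OTEL_BSP_MAX_QUEUE_SIZE, default 2048).
- max_export_batch_size: the maximum number of spans in one batch. Once more spans than this have accumulated, an export is triggered (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, default 512). It must not exceed max_queue_size, otherwise ArgumentError is raised.
- start_thread_on_boot: whether the asynchronous export thread is started during initialization. It defaults to true and is false only when OTEL_RUBY_BSP_START_THREAD_ON_BOOT contains "false" (case-insensitive).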
It then initializes @spans (the span buffer), @condition (used to tell the export thread to send) and @thread (the asynchronous export thread, created by reset_on_fork). The source:

```ruby
def initialize(exporter,
               exporter_timeout: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT', 30_000)),
               schedule_delay: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY', 5_000)),
               max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)),
               max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)),
               start_thread_on_boot: String(ENV['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i,
               metrics_reporter: nil)
  raise ArgumentError if max_export_batch_size > max_queue_size
  raise ArgumentError, "exporter #{exporter.inspect} does not appear to be a valid exporter" unless Common::Utilities.valid_exporter?(exporter)

  @exporter = exporter
  @exporter_timeout_seconds = exporter_timeout / 1000.0 # milliseconds -> seconds
  @mutex = Mutex.new
  @export_mutex = Mutex.new
  @condition = ConditionVariable.new
  @keep_running = true
  @delay_seconds = schedule_delay / 1000.0 # milliseconds -> seconds
  @max_queue_size = max_queue_size
  @batch_size = max_export_batch_size
  @metrics_reporter = metrics_reporter || OpenTelemetry::SDK::Trace::Export::MetricsReporter
  @spans = []
  @pid = nil
  @thread = nil
  reset_on_fork(restart_thread: start_thread_on_boot)
end
```
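The following sketch reproduces the same environment-variable parsing and validation outside the SDK, which makes the defaulting rules concrete. bsp_config_from_env and BspConfig are hypothetical names introduced only for illustration and are not part of opentelemetry-sdk. The environment is passed in as a Hash so the logic can be tried without touching ENV.

```ruby
# Hypothetical helper (NOT part of opentelemetry-sdk) that mirrors how
# BatchSpanProcessor#initialize derives its defaults from environment variables.
BspConfig = Struct.new(:exporter_timeout_seconds, :delay_seconds, :max_queue_size,
                       :batch_size, :start_thread_on_boot, keyword_init: true)

def bsp_config_from_env(env = ENV)
  # Timeout and delay are given in milliseconds, like the OTEL_BSP_* variables.
  exporter_timeout = Float(env.fetch('OTEL_BSP_EXPORT_TIMEOUT', 30_000))
  schedule_delay   = Float(env.fetch('OTEL_BSP_SCHEDULE_DELAY', 5_000))
  max_queue_size   = Integer(env.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048))
  batch_size       = Integer(env.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512))
  # An unset variable becomes "" and leaves the thread enabled; any value
  # containing "false" (case-insensitive) disables it.
  start_thread = String(env['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i

  # Same invariant the SDK enforces: a batch can never be larger than the queue.
  raise ArgumentError, 'max_export_batch_size must not exceed max_queue_size' if batch_size > max_queue_size

  BspConfig.new(
    exporter_timeout_seconds: exporter_timeout / 1000.0,
    delay_seconds: schedule_delay / 1000.0,
    max_queue_size: max_queue_size,
    batch_size: batch_size,
    start_thread_on_boot: start_thread
  )
end

p bsp_config_from_env({}).delay_seconds # prints 5.0
```

Calling it with no argument reads ENV, which gives the values the processor would pick up in the current process.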
Collection
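on_finish only buffers spans whose trace flags are marked sampled. Inside the lock (lock synchronizes on @mutex) it first calls reset_on_fork, then:
- if appending the span would push the buffer past max_queue_size, the earliest n spans are dropped before enqueueing and reported as dropped with reason buffer-full;
- after appending, if the buffer holds more than max_export_batch_size spans, the export thread is signalled to send.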
```ruby
def on_finish(span)
  return unless span.context.trace_flags.sampled?

  lock do
    reset_on_fork
    # Number of spans that must go so the new one fits within max_queue_size.
    n = spans.size + 1 - max_queue_size
    if n.positive?
      spans.shift(n)
      report_dropped_spans(n, reason: 'buffer-full')
    end
    spans << span
    @condition.signal if spans.size > batch_size
  end
end
```
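A plain Array guarded by a Mutex is enough to reproduce the drop-oldest behaviour of on_finish. DroppingBuffer below is a hypothetical, simplified model and not SDK code. It has no sampling check and no metrics, and it assumes only Ruby's built-in Mutex and ConditionVariable.

```ruby
# Hypothetical, simplified model of BatchSpanProcessor#on_finish (NOT SDK code):
# a bounded buffer that drops the oldest entries when full and signals a
# waiting consumer once more than batch_size entries are buffered.
class DroppingBuffer
  attr_reader :dropped

  def initialize(max_queue_size:, batch_size:)
    raise ArgumentError if batch_size > max_queue_size

    @max_queue_size = max_queue_size
    @batch_size = batch_size
    @items = []
    @dropped = 0
    @mutex = Mutex.new
    @condition = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      # How many entries must be removed so that the new one fits.
      n = @items.size + 1 - @max_queue_size
      if n.positive?
        @items.shift(n) # discard the oldest entries first
        @dropped += n
      end
      @items << item
      # Wake a consumer blocked in @condition.wait (none exists in this sketch).
      @condition.signal if @items.size > @batch_size
    end
  end

  def snapshot
    @mutex.synchronize { @items.dup }
  end
end

buf = DroppingBuffer.new(max_queue_size: 3, batch_size: 2)
(1..5).each { |i| buf.push(i) }
p buf.snapshot # prints [3, 4, 5]
p buf.dropped  # prints 2
```

With max_queue_size 3, pushing five items keeps the newest three and counts two drops. This matches the n = size + 1 - max_queue_size arithmetic in on_finish.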
Sending
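The process may fork while the processor is running, so the PID is checked frequently (on_finish, for example, calls reset_on_fork for every span). Only the forking thread exists in the child process, so the export thread does not survive a fork. When the PID has changed, a new export thread is therefore started and @spans is cleared, so that the child does not re-export spans inherited from the parent.

```ruby
def reset_on_fork(restart_thread: true)
  pid = Process.pid
  return if @pid == pid # same process as last time: nothing to do

  @pid = pid
  spans.clear
  @thread = restart_thread ? Thread.new { work } : nil
rescue ThreadError => e
  @metrics_reporter.add_to_counter('otel.bsp.error', labels: { 'reason' => 'ThreadError' })
  OpenTelemetry.handle_error(exception: e, message: 'unexpected error in BatchSpanProcessor#reset_on_fork')
end
```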
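The PID check is easy to exercise without actually forking if the PID source is injected instead of being read from Process.pid directly. ForkAwareBuffer is a hypothetical sketch of the same pattern and not SDK code. Its placeholder thread stands in for the real work loop.

```ruby
# Hypothetical sketch of the PID-check pattern used by reset_on_fork (NOT SDK code).
# pid_source is injectable so a "fork" can be simulated in tests.
class ForkAwareBuffer
  attr_reader :items, :thread

  def initialize(pid_source: -> { Process.pid }, start_thread: true)
    @pid_source = pid_source
    @start_thread = start_thread
    @items = []
    @pid = nil
    @thread = nil
    reset_on_fork
  end

  def push(item)
    reset_on_fork # cheap check on every call, as on_finish does
    @items << item
  end

  private

  def reset_on_fork
    pid = @pid_source.call
    return if @pid == pid # still the same process

    @pid = pid
    # Entries copied from the parent are the parent's to export.
    @items.clear
    # Threads are not carried over by fork, so start a fresh worker
    # (a placeholder here instead of the real export loop).
    @thread = @start_thread ? Thread.new { :worker_placeholder } : nil
  end
end

fake_pid = 100
buf = ForkAwareBuffer.new(pid_source: -> { fake_pid }, start_thread: false)
buf.push(:a)
fake_pid = 101 # pretend we are now running in a forked child
buf.push(:b)
p buf.items # prints [:b]
```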
Polling
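The export thread polls @spans. If fewer than max_export_batch_size spans are buffered, it waits up to schedule_delay or until on_finish signals. If the buffer is still empty, it keeps waiting. Once a wait times out or the batch is full, it takes a batch with fetch_batch and sends it with export_batch. As soon as @keep_running is false, the loop returns. (fetch_batch and export_batch are private methods of the same class that are not shown here.)

```ruby
def work
  loop do
    batch = lock do
      # Batch not full yet: wait up to schedule_delay or until signalled.
      @condition.wait(@mutex, @delay_seconds) if spans.size < batch_size && @keep_running
      # Nothing to send at all: keep waiting rather than export an empty batch.
      @condition.wait(@mutex, @delay_seconds) while spans.empty? && @keep_running
      return unless @keep_running

      fetch_batch
    end

    @metrics_reporter.observe_value('otel.bsp.buffer_utilization', value: spans.size / max_queue_size.to_f)

    export_batch(batch)
  end
end
```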
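A miniature processor lets you exercise the two-stage wait in work on its own. MiniBatcher is a hypothetical sketch and not the SDK class. It keeps the same wait conditions and the same `return unless @keep_running` exit. It also adds a simple shutdown that drains whatever is left, and its exporter is just a block.

```ruby
# Hypothetical miniature of BatchSpanProcessor#work (NOT SDK code): a worker
# thread exports when a batch fills up or when delay_seconds elapses,
# whichever comes first; shutdown drains the remaining items.
class MiniBatcher
  def initialize(batch_size:, delay_seconds:, &exporter)
    @batch_size = batch_size
    @delay_seconds = delay_seconds
    @exporter = exporter # called with an Array of at most batch_size items
    @items = []
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @keep_running = true
    @thread = Thread.new { work }
  end

  def push(item)
    @mutex.synchronize do
      @items << item
      @condition.signal if @items.size > @batch_size
    end
  end

  # Stop the worker, then export whatever is still buffered on the caller's thread.
  def shutdown
    @mutex.synchronize do
      @keep_running = false
      @condition.signal
    end
    @thread.join
    remaining = @mutex.synchronize { @items.shift(@items.size) }
    remaining.each_slice(@batch_size) { |batch| @exporter.call(batch) }
  end

  private

  def work
    loop do
      batch = @mutex.synchronize do
        # Batch not full yet: wait up to delay_seconds or until signalled.
        @condition.wait(@mutex, @delay_seconds) if @items.size < @batch_size && @keep_running
        # Buffer empty: keep waiting instead of exporting an empty batch.
        @condition.wait(@mutex, @delay_seconds) while @items.empty? && @keep_running
        return unless @keep_running # leaves #work; synchronize releases the mutex

        @items.shift(@batch_size)
      end
      @exporter.call(batch)
    end
  end
end
```

With batch_size 10 and delay_seconds 0.05, a single pushed item is exported by the timer within roughly one delay interval. Pushing more than 10 items would instead wake the worker immediately through signal.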