The following walks through OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor from opentelemetry-sdk-1.2.0.

Initialization

The exporter is injected, and the remaining parameters are configured. Their defaults are read from environment variables.

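  • exporter_timeout: timeout, in milliseconds, for exporting to the remote backend, covering both connecting and sending (OTEL_BSP_EXPORT_TIMEOUT, default 30000)
  • schedule_delay: maximum wait, in milliseconds (OTEL_BSP_SCHEDULE_DELAY, default 5000). Once it elapses, buffered spans are exported even if fewer than max_export_batch_size have accumulated
  • max_queue_size: capacity of the span buffer (OTEL_BSP_MAX_QUEUE_SIZE, default 2048). When the buffer is full, the oldest spans are dropped to make room
  • max_export_batch_size: maximum number of spans per export (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, default 512). When the buffer holds more spans than this, the export thread is woken. It must not exceed max_queue_size, otherwise ArgumentError is raised
  • start_thread_on_boot: whether to start the background export thread during initialization. It defaults to true and is disabled only when OTEL_RUBY_BSP_START_THREAD_ON_BOOT contains "false" (case-insensitive)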
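It then initializes @spans (the span buffer), @condition (used to wake the export thread) and @thread (the background export thread, created through reset_on_fork). The source: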

def initialize(exporter,
               exporter_timeout: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT', 30_000)),
               schedule_delay: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY', 5_000)),
               max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)),
               max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)),
               start_thread_on_boot: String(ENV['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i,
               metrics_reporter: nil)
  raise ArgumentError if max_export_batch_size > max_queue_size
  raise ArgumentError, "exporter #{exporter.inspect} does not appear to be a valid exporter" unless Common::Utilities.valid_exporter?(exporter)

  @exporter = exporter
  @exporter_timeout_seconds = exporter_timeout / 1000.0
  @mutex = Mutex.new
  @export_mutex = Mutex.new
  @condition = ConditionVariable.new
  @keep_running = true
  @delay_seconds = schedule_delay / 1000.0
  @max_queue_size = max_queue_size
  @batch_size = max_export_batch_size
  @metrics_reporter = metrics_reporter || OpenTelemetry::SDK::Trace::Export::MetricsReporter
  @spans = []
  @pid = nil
  @thread = nil
  reset_on_fork(restart_thread: start_thread_on_boot)
end
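The default arguments in the signature do several things. They parse strings into numbers, convert milliseconds to seconds and match start_thread_on_boot against a regex. A minimal sketch makes it easy to see what each setting resolves to. It repeats the same expressions against a plain hash instead of ENV. `resolve_bsp_settings` is a hypothetical helper written for illustration, not an SDK method.

```ruby
# Hypothetical helper (not part of opentelemetry-sdk): mirrors the default
# argument expressions of BatchSpanProcessor#initialize, but reads from an
# injected hash instead of ENV so the resolution can be checked in isolation.
def resolve_bsp_settings(env = ENV)
  exporter_timeout      = Float(env.fetch('OTEL_BSP_EXPORT_TIMEOUT', 30_000))
  schedule_delay        = Float(env.fetch('OTEL_BSP_SCHEDULE_DELAY', 5_000))
  max_queue_size        = Integer(env.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048))
  max_export_batch_size = Integer(env.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512))
  # An unset variable becomes "" (enabled); any value containing "false",
  # in any case, disables the thread.
  start_thread_on_boot  = String(env['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i

  # Same sanity check as the initializer: a batch can never exceed the queue.
  raise ArgumentError if max_export_batch_size > max_queue_size

  {
    exporter_timeout_seconds: exporter_timeout / 1000.0, # ms -> s
    delay_seconds: schedule_delay / 1000.0,              # ms -> s
    max_queue_size: max_queue_size,
    batch_size: max_export_batch_size,
    start_thread_on_boot: start_thread_on_boot
  }
end

p resolve_bsp_settings({})
# Environment values are strings; Float()/Integer() parse them.
p resolve_bsp_settings('OTEL_BSP_SCHEDULE_DELAY' => '1000',
                       'OTEL_RUBY_BSP_START_THREAD_ON_BOOT' => 'FALSE')
```

Float() and Integer() raise on malformed input rather than silently falling back. A typo in one of these variables therefore fails loudly at construction time.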

Collection

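Only sampled spans are buffered. If adding a span would push the buffer past its capacity (max_queue_size), the oldest spans are removed first and reported as dropped with reason 'buffer-full'. Only then is the new span enqueued.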

If the buffer then holds more than batch_size (max_export_batch_size) spans, the background thread is signaled to export.

def on_finish(span)
  return unless span.context.trace_flags.sampled?

  lock do
    reset_on_fork
    n = spans.size + 1 - max_queue_size
    if n.positive?
      spans.shift(n)
      report_dropped_spans(n, reason: 'buffer-full')
    end
    spans << span
    @condition.signal if spans.size > batch_size
  end
end
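The drop-oldest and signal-threshold policy can be exercised without the SDK. The sketch below is a simplified, hypothetical model: `DropOldestBuffer` is not an SDK class. Its counters stand in for report_dropped_spans and @condition.signal.

```ruby
# Hypothetical, simplified model of on_finish's buffering policy (not the SDK
# class): keep at most max_queue_size items, evicting the oldest first, and
# count a "signal" whenever more than batch_size items are buffered.
class DropOldestBuffer
  attr_reader :items, :dropped, :signals

  def initialize(max_queue_size:, batch_size:)
    @max_queue_size = max_queue_size
    @batch_size = batch_size
    @items = []
    @dropped = 0 # stands in for report_dropped_spans
    @signals = 0 # stands in for @condition.signal
    @mutex = Mutex.new
  end

  def push(item)
    @mutex.synchronize do
      # Number of items to evict so that, after appending, size <= max_queue_size.
      n = @items.size + 1 - @max_queue_size
      if n.positive?
        @items.shift(n) # drop the oldest n items
        @dropped += n
      end
      @items << item
      @signals += 1 if @items.size > @batch_size
    end
  end
end

buf = DropOldestBuffer.new(max_queue_size: 4, batch_size: 2)
(1..6).each { |i| buf.push(i) }
p buf.items   # 1 and 2 were evicted to make room for 5 and 6
p buf.dropped
p buf.signals
```

Note the strict `>` in the source. With batch_size 2, the first signal fires on the third item, not the second. After that, every further push signals again while the buffer stays above the threshold.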

Export

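The process may fork at any point while it is running, so the pid is checked frequently (on every on_finish call). A changed pid means the code is now running in a child process. Only the forking thread survives fork, so a new export thread is started. @spans is also cleared, so the child does not export spans inherited from the parent.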

def reset_on_fork(restart_thread: true)
  pid = Process.pid
  return if @pid == pid

  @pid = pid
  spans.clear
  @thread = restart_thread ? Thread.new { work } : nil
rescue ThreadError => e
  @metrics_reporter.add_to_counter('otel.bsp.error', labels: { 'reason' => 'ThreadError' })
  OpenTelemetry.handle_error(exception: e, message: 'unexpected error in BatchSpanProcessor#reset_on_fork')
end
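The pid-comparison pattern is easy to test if the pid source is injectable, so that a "fork" can be simulated without actually forking. A minimal sketch follows. `ForkAwareBuffer` and its `pid_source:` parameter are hypothetical and exist only for illustration. In real use the pid source would simply be `Process.pid`, and the counter would be a `Thread.new { work }`.

```ruby
# Hypothetical sketch of the reset_on_fork pattern (not the SDK code).
# pid_source is injectable so a fork can be simulated; by default it is Process.pid.
class ForkAwareBuffer
  attr_reader :items, :thread_starts

  def initialize(pid_source: -> { Process.pid })
    @pid_source = pid_source
    @pid = nil
    @items = []
    @thread_starts = 0
  end

  def push(item)
    reset_on_fork
    @items << item
  end

  private

  def reset_on_fork
    pid = @pid_source.call
    return if @pid == pid # same process: nothing to do

    # First call, or now inside a forked child: the worker thread did not
    # survive the fork, and the inherited buffer belongs to the parent.
    @pid = pid
    @items.clear
    @thread_starts += 1 # stands in for Thread.new { work }
  end
end

fake_pid = 100
buf = ForkAwareBuffer.new(pid_source: -> { fake_pid })
buf.push(:a)
buf.push(:b)
fake_pid = 200 # simulate running inside a forked child
buf.push(:c)
p buf.items         # the "parent" spans :a and :b were discarded
p buf.thread_starts
```

The lambda closes over the local variable `fake_pid`, so reassigning it is visible on the next call. That is what lets the example switch "processes" in place.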

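The worker thread loops over @spans:
  • if the batch is not yet full, it waits up to schedule_delay or until signaled;
  • while the buffer is empty, it keeps waiting;
  • once the wait times out or the batch is full, it takes a batch under the lock (fetch_batch) and exports it after releasing the lock;
  • when @keep_running is cleared, the loop exits.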

def work
  loop do
    batch = lock do
      @condition.wait(@mutex, @delay_seconds) if spans.size < batch_size && @keep_running
      @condition.wait(@mutex, @delay_seconds) while spans.empty? && @keep_running
      return unless @keep_running

      fetch_batch
    end

    @metrics_reporter.observe_value('otel.bsp.buffer_utilization', value: spans.size / max_queue_size.to_f)

    export_batch(batch)
  end
end
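The wait-then-export loop can be reproduced with only Mutex and ConditionVariable from Ruby core. The sketch below is a stripped-down, hypothetical model, not the SDK class. `MiniBatcher` and its `shutdown` are illustrative names. That shutdown simply stops the loop and flushes the remainder, which is an assumption standing in for the real processor's shutdown/force_flush path.

```ruby
# Hypothetical, stripped-down model of the work loop (not the SDK class):
# wait up to delay_seconds for a full batch, keep waiting while the buffer is
# empty, then hand at most batch_size items to the exporter outside the lock.
class MiniBatcher
  def initialize(batch_size:, delay_seconds:, &export)
    @batch_size = batch_size
    @delay_seconds = delay_seconds
    @export = export
    @items = []
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @keep_running = true
    @thread = Thread.new { work }
  end

  def push(item)
    @mutex.synchronize do
      @items << item
      @condition.signal if @items.size > @batch_size
    end
  end

  # Simplified stand-in for shutdown: stop the loop, then flush what is left.
  def shutdown
    @mutex.synchronize do
      @keep_running = false
      @condition.signal
    end
    @thread.join
    rest = @mutex.synchronize { @items.shift(@items.size) }
    @export.call(rest) unless rest.empty?
  end

  private

  def work
    loop do
      batch = @mutex.synchronize do
        # Batch not full yet: sleep until signaled or the delay elapses.
        @condition.wait(@mutex, @delay_seconds) if @items.size < @batch_size && @keep_running
        # Buffer still empty (e.g. the wait timed out): keep waiting.
        @condition.wait(@mutex, @delay_seconds) while @items.empty? && @keep_running
        return unless @keep_running # exits work; synchronize releases the mutex

        @items.shift(@batch_size) # take at most one batch
      end
      @export.call(batch) # export without holding the lock
    end
  end
end

exported = []
batcher = MiniBatcher.new(batch_size: 3, delay_seconds: 0.05) { |b| exported << b }
7.times { |i| batcher.push(i) }
sleep 0.2 # give the worker time to drain on its own
batcher.shutdown
p exported.flatten # every item, in FIFO order
```

Exporting outside the lock means a slow exporter never blocks on_finish callers. They only contend for the mutex while spans are appended or a batch is shifted off.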