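To study Zipkin and the zipkin-tracer gem, I slightly modified the code of https://github.com/openzipkin/zipkin-ruby-example and copied it into four services on different ports (8081, 9000, 9001, 9002), so that the call chain looks like this: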

8081 frontend
|_ 9000 backend
|   |_ 9001 deep_backend
\_ 9002 another_backend

I also changed the zipkin-tracer gem as follows, so that only the Rack call at zipkin-tracer's own layer is traced and the layers beneath it are excluded:

# zipkin-tracer-0.40.0/lib/zipkin-tracer/rack/zipkin-tracer.rb
module ZipkinTracer
  class RackHandler
    def call(env)
      port = env['SERVER_PORT']
      binding.trace_tree(htmp: "zip-#{port}", no_methods: /pass_to_under/, warm: port) do
        _call(env)
      end
    end

    def _call(env)
      zipkin_env = ZipkinEnv.new(env, @config)
      trace_id = zipkin_env.trace_id
      TraceContainer.with_trace_id(trace_id) do
        if !trace_id.sampled?
          pass_to_under(env)
        else
          @tracer.with_new_span(trace_id, span_name(env)) do |span|
            trace!(span, zipkin_env) { pass_to_under(env) }
          end
        end
      end
    end

    def pass_to_under(env)
      @app.call(env)
    end
  end
end

Next, I started Zipkin in Docker along with the four services and sent a request to 8081, so that Zipkin recorded the whole call chain.



This also produced four zipkin-tracer call traces:

20191020_153300_187_zip-8081.html

20191020_153301_025_zip-9000.html

20191020_153301_856_zip-9001.html

20191020_153305_638_zip-9002.html

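Compare the Zipkin graph with the call stacks recorded from the source. In the trace_id object that the top-level caller 8081 builds from env, the span_id is reused as the trace_id of the whole call chain, and there is no parent_id.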



In the trace_id object of the deepest callee, 9001, the parent_id is 9000's span_id.



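These IDs come from the source below. Three values can all be read from the request headers:

- the trace_id of the whole chain;
- the parent_span_id, which is the caller's own span;
- the span_id that the caller assigned to the callee.

If the headers do not carry these keys, the request is the start of the chain and the IDs must be generated.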

# zipkin-tracer-0.40.0/lib/zipkin-tracer/rack/zipkin_env.rb
B3_SINGLE_HEADER = 'HTTP_B3'.freeze
B3_REQUIRED_HEADERS = %w[HTTP_X_B3_TRACEID HTTP_X_B3_SPANID].freeze
B3_OPT_HEADERS = %w[HTTP_X_B3_PARENTSPANID HTTP_X_B3_SAMPLED HTTP_X_B3_FLAGS].freeze

def retrieve_or_generate_ids
  if called_with_zipkin_b3_single_header?
    trace_id, span_id, parent_span_id, sampled, flags =
      B3SingleHeaderFormat.parse_from_header(@env[B3_SINGLE_HEADER])
    shared = true
  elsif called_with_zipkin_headers?
    trace_id, span_id, parent_span_id, sampled, flags = @env.values_at(*B3_REQUIRED_HEADERS, *B3_OPT_HEADERS)
    shared = true
  end

  unless trace_id
    span_id = TraceGenerator.new.generate_id
    trace_id = TraceGenerator.new.generate_id_from_span_id(span_id)
    parent_span_id = nil
    shared = false
  end

  [trace_id, span_id, parent_span_id, sampled, flags, shared]
end
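The branching in `retrieve_or_generate_ids` can be reproduced on a plain Hash standing in for the Rack env. This is a minimal sketch under several assumptions:

- IDs are kept as 16-character hex strings, whereas the gem wraps them in objects.
- Trace IDs are 64-bit, so a root trace_id simply equals its span_id.
- The B3 single-header layout is `{TraceId}-{SpanId}-{SamplingState}-{ParentSpanId}`.

`ids_from_env`, `parse_b3_single` and `new_id` are hypothetical names.

```ruby
require "securerandom"

MULTI_HEADERS = %w[HTTP_X_B3_TRACEID HTTP_X_B3_SPANID
                   HTTP_X_B3_PARENTSPANID HTTP_X_B3_SAMPLED HTTP_X_B3_FLAGS].freeze

# Hypothetical parser for the B3 single header; returns
# [trace_id, span_id, parent_span_id, sampled, flags].
def parse_b3_single(value)
  parts = value.split("-")
  # "b3: 0" carries only a sampling decision and no ids.
  return [nil, nil, nil, parts[0], nil] if parts.size < 2

  trace_id, span_id, sampled, parent_span_id = parts
  [trace_id, span_id, parent_span_id, sampled, nil]
end

def new_id
  SecureRandom.hex(8) # 64-bit id as 16 hex chars (assumption)
end

# Mirrors the branching of retrieve_or_generate_ids (sketch, not the gem's code).
def ids_from_env(env)
  if env["HTTP_B3"]
    trace_id, span_id, parent_span_id, sampled, flags = parse_b3_single(env["HTTP_B3"])
    shared = true
  elsif env["HTTP_X_B3_TRACEID"] && env["HTTP_X_B3_SPANID"]
    trace_id, span_id, parent_span_id, sampled, flags = env.values_at(*MULTI_HEADERS)
    shared = true
  end

  unless trace_id
    # Start of the chain: the root span id doubles as the trace id.
    span_id = new_id
    trace_id = span_id
    parent_span_id = nil
    shared = false
  end

  [trace_id, span_id, parent_span_id, sampled, flags, shared]
end

root = ids_from_env({})
child = ids_from_env({ "HTTP_X_B3_TRACEID" => root[0], "HTTP_X_B3_SPANID" => new_id,
                       "HTTP_X_B3_PARENTSPANID" => root[1], "HTTP_X_B3_SAMPLED" => "1" })
single = ids_from_env({ "HTTP_B3" => "80f198ee56343ba8-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90" })
deny = ids_from_env({ "HTTP_B3" => "0" })
```

The `root` case matches what 8081 shows (span_id reused as trace_id, no parent). The `child` case matches a downstream service, where everything arrives in the headers.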

Next comes the process of sending spans to the Zipkin server.



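In the source below, the call into the underlying app is wrapped in a block that with_new_span invokes. Before the call, start_span stores the new span under its trace_id with a start timestamp. After the call, end_span closes the span with an end timestamp, which gives its duration.

If the span has no parent or is a SERVER span, end_span then calls flush! to send the buffered spans to the endpoint that receives Zipkin data. Each such endpoint is a sender that subclasses ZipkinSenderBase and overrides flush!.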

# zipkin-tracer-0.40.0/lib/zipkin-tracer/zipkin_sender_base.rb
module Trace
  class ZipkinSenderBase

    def initialize(options={})
      @options = options
      reset
    end

    def with_new_span(trace_id, name)
      span = start_span(trace_id, name)
      result = yield span
      end_span(span)
      result
    end

    def end_span(span, timestamp = Time.now)
      span.close(timestamp)
      # If in a thread not handling incoming http requests, it will not have Kind::SERVER, so the span
      # will never be flushed and will cause memory leak.
      # If no parent span, then current span needs to flush when it ends.
      if !span.has_parent_span? || span.kind == Trace::Span::Kind::SERVER
        flush!
        reset
      end
    end

    def start_span(trace_id, name, timestamp = Time.now)
      span = Span.new(name, trace_id, timestamp)
      span.local_endpoint = Trace.default_endpoint
      store_span(trace_id, span)
      span
    end

    def flush!
      raise "not implemented"
    end
  end
end
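The flow of `with_new_span`, `end_span` and `flush!` can be shown with an in-memory sender. This is a simplified sketch: `SenderBase`, `ArraySender` and the `Span` struct are hypothetical, and spans are identified only by a `parent_id` and a `kind` symbol rather than the gem's objects. It keeps the same flush condition, so a nested CLIENT span is buffered and only sent when the enclosing root/SERVER span ends.

```ruby
# Hypothetical span record; not the gem's Trace::Span.
Span = Struct.new(:name, :parent_id, :kind, :started_at, :duration)

class SenderBase
  def initialize
    reset
  end

  # Like the original, there is no ensure: an exception skips end_span.
  def with_new_span(name, parent_id: nil, kind: nil)
    span = Span.new(name, parent_id, kind, Process.clock_gettime(Process::CLOCK_MONOTONIC))
    @spans << span
    result = yield span
    end_span(span)
    result
  end

  def end_span(span)
    span.duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - span.started_at
    # Same condition as ZipkinSenderBase#end_span: flush on root or server spans.
    if span.parent_id.nil? || span.kind == :server
      flush!
      reset
    end
  end

  def reset
    @spans = []
  end

  def flush!
    raise NotImplementedError, "subclasses decide where spans are sent"
  end
end

# Hypothetical sender that "sends" each flushed batch into an array.
class ArraySender < SenderBase
  attr_reader :batches

  def initialize
    @batches = []
    super
  end

  def flush!
    @batches << @spans.map(&:name)
  end
end

sender = ArraySender.new
status = sender.with_new_span("get /", kind: :server) do
  sender.with_new_span("get", parent_id: "abc", kind: :client) { 200 }
end
p status
p sender.batches # one batch holding both spans, flushed when the server span ended
```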

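The callee's span_id, on the other hand, is generated by the caller. FaradayHandler shows this: before making the request, it generates a new trace_id object with a fresh span_id and writes its fields into the request headers (env[:request_headers]) as B3 headers. Only then does it actually send the request.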

# zipkin-tracer-0.40.0/lib/zipkin-tracer/faraday/zipkin-tracer.rb
module ZipkinTracer
  class FaradayHandler < ::Faraday::Middleware

    def call(env)
      trace_id = TraceGenerator.new.next_trace_id
      TraceContainer.with_trace_id(trace_id) do
        b3_headers.each do |method, header|
          env[:request_headers][header] = trace_id.send(method).to_s
        end
        if Trace.tracer && trace_id.sampled?
          trace!(env, trace_id)
        else
          @app.call(env)
        end
      end
    end

    def b3_headers
      {
        trace_id: 'X-B3-TraceId',
        parent_id: 'X-B3-ParentSpanId',
        span_id: 'X-B3-SpanId',
        sampled: 'X-B3-Sampled',
        flags: 'X-B3-Flags'
      }
    end

    def trace!(env, trace_id)
      response = nil
      # handle either a URI object (passed by Faraday v0.8.x in testing), or something string-izable
      method = env[:method].to_s
      url = env[:url].respond_to?(:host) ? env[:url] : URI.parse(env[:url].to_s)
      remote_endpoint = Trace::Endpoint.remote_endpoint(url, @service_name) # The endpoint we are calling.
      Trace.tracer.with_new_span(trace_id, method.downcase) do |span|
        @span = span # So we can record on exceptions
        # annotate with method (GET/POST/etc.) and uri path
        span.kind = Trace::Span::Kind::CLIENT
        span.remote_endpoint = remote_endpoint
        span.record_tag(Trace::Span::Tag::METHOD, method.upcase)
        span.record_tag(Trace::Span::Tag::PATH, url.path)
        response = @app.call(env).on_complete do |renv|
          span.record_status(renv[:status])
        end
      end
      response
    rescue Net::ReadTimeout
      record_error(@span, 'Request timed out.')
      raise
    rescue Faraday::ConnectionFailed
      record_error(@span, 'Request connection failed.')
      raise
    rescue Faraday::ClientError
      record_error(@span, 'Generic Faraday client error.')
      raise
    end
  end
end
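Putting the caller and callee sides together explains the IDs seen in the 8081 → 9000 → 9001 chain. Each hop works the same way:

- the caller keeps the trace_id;
- it uses its current span_id as the child's parent_id;
- it assigns a fresh span_id, which the callee then adopts for its own server span.

This is a hedged sketch. `TraceIds`, `next_ids` and `inject_b3` are hypothetical stand-ins for the gem's `TraceId`, `TraceGenerator#next_trace_id` and `b3_headers`. The `X-B3-Flags` header is omitted, and nil values are skipped rather than sent as empty strings.

```ruby
require "securerandom"

# Hypothetical value object standing in for the gem's TraceId.
TraceIds = Struct.new(:trace_id, :parent_id, :span_id, :sampled)

B3_HEADERS = {
  trace_id: "X-B3-TraceId",
  parent_id: "X-B3-ParentSpanId",
  span_id: "X-B3-SpanId",
  sampled: "X-B3-Sampled"
}.freeze

# Caller side: same trace, current span becomes the parent, fresh span id.
def next_ids(current)
  TraceIds.new(current.trace_id, current.span_id, SecureRandom.hex(8), current.sampled)
end

# Write the ids into outgoing request headers (nil values are skipped).
def inject_b3(ids, headers = {})
  B3_HEADERS.each do |member, header|
    value = ids[member]
    headers[header] = value.to_s unless value.nil?
  end
  headers
end

root_id = SecureRandom.hex(8)
root = TraceIds.new(root_id, nil, root_id, "1") # 8081: span id doubles as trace id
to_9000 = next_ids(root)     # ids 8081 sends to 9000 (9000's server span id)
to_9001 = next_ids(to_9000)  # ids 9000 sends to 9001
headers = inject_b3(to_9001)
p headers
```

In the output, 9001's `X-B3-ParentSpanId` equals the span_id that 8081 assigned to 9000. This matches the observation above that 9001's parent_id is 9000's span_id.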