zipkin-tracer的运作
为研究zipkin以及zipkin-trace这个gem,对 https://github.com/openzipkin/zipkin-ruby-example 的代码稍作修改,并复制成4个不同端口的服务(8081、9000、9001、9002),使其调用链如下
8081 frontend
|_ 9000 backend
| |_ 9001 deep_backend
\_ 9002 another_backend
并对zipkin-tracer这个gem的代码作如下改动,只跟踪zipkin-tracer本层的rack运动,排除底层:
# zipkin-tracer-0.40.0/lib/zipkin-tracer/rack/zipkin-tracer.rb
module ZipkinTracer
class RackHandler
def call(env)
port = env['SERVER_PORT']
binding.trace_tree(htmp: "zip-#{port}", no_methods: /pass_to_under/, warm: port) do
_call(env)
end
end
def _call(env)
zipkin_env = ZipkinEnv.new(env, @config)
trace_id = zipkin_env.trace_id
TraceContainer.with_trace_id(trace_id) do
if !trace_id.sampled?
pass_to_under(env)
else
@tracer.with_new_span(trace_id, span_name(env)) do |span|
trace!(span, zipkin_env) { pass_to_under(env) }
end
end
end
end
def pass_to_under(env)
@app.call(env)
end
end
end
然后启动zipkin的docker和这4个服务,对8081进行访问,使zipkin记录整个链路如下

并获得4份zipkin-tracer的调用轨迹:
对照zipkin的图和源码的调用栈,可见顶层调用方8081从env生成的trace_id对象如下,其span_id被当做了整个调用链的trace_id,其parent_id是没有的

而最深的被调用方9001的trace_id对象中,parent_id为9000的span_id

这些xxx_id的生成逻辑如下源码所示,整个链路的trace_id、调用方的parent_span_id、调用方分配给被调用方的span_id,都可从header里取出。如果header没带有这些键,则代表这是链路的开头,需要生成
# zipkin-tracer-0.40.0/lib/zipkin-tracer/rack/zipkin_env.rb
B3_SINGLE_HEADER = 'HTTP_B3'.freeze
B3_REQUIRED_HEADERS = %w[HTTP_X_B3_TRACEID HTTP_X_B3_SPANID].freeze
B3_OPT_HEADERS = %w[HTTP_X_B3_PARENTSPANID HTTP_X_B3_SAMPLED HTTP_X_B3_FLAGS].freeze
def retrieve_or_generate_ids
if called_with_zipkin_b3_single_header?
trace_id, span_id, parent_span_id, sampled, flags =
B3SingleHeaderFormat.parse_from_header(@env[B3_SINGLE_HEADER])
shared = true
elsif called_with_zipkin_headers?
trace_id, span_id, parent_span_id, sampled, flags = @env.values_at(*B3_REQUIRED_HEADERS, *B3_OPT_HEADERS)
shared = true
end
unless trace_id
span_id = TraceGenerator.new.generate_id
trace_id = TraceGenerator.new.generate_id_from_span_id(span_id)
parent_span_id = nil
shared = false
end
[trace_id, span_id, parent_span_id, sampled, flags, shared]
end
然后就是发送span到zipkin服务器的过程,调用栈如下

源码如下,实际对于底层app的call,被包裹成block由with_new_span来调用,并在调用前后记录trace_id和调用时长,调用后还要flush!到接收zipkin数据的端点,而端点都是继承ZipkinSenderBase然后重写flush!方法的
# zipkin-tracer-0.40.0/lib/zipkin-tracer/zipkin_sender_base.rb
module Trace
class ZipkinSenderBase
def initialize(options={})
@options = options
reset
end
def with_new_span(trace_id, name)
span = start_span(trace_id, name)
result = yield span
end_span(span)
result
end
def end_span(span, timestamp = Time.now)
span.close(timestamp)
# If in a thread not handling incoming http requests, it will not have Kind::SERVER, so the span
# will never be flushed and will cause memory leak.
# If no parent span, then current span needs to flush when it ends.
if !span.has_parent_span? || span.kind == Trace::Span::Kind::SERVER
flush!
reset
end
end
def start_span(trace_id, name, timestamp = Time.now)
span = Span.new(name, trace_id, timestamp)
span.local_endpoint = Trace.default_endpoint
store_span(trace_id, span)
span
end
def flush!
raise "not implemented"
end
end
end
而被调用方的span_id则是由调用方生成的,从FaradayHandler代码就可知,在发起调用前,会生成新的span_id塞入env中,之后才真正的去请求
# zipkin-tracer-0.40.0/lib/zipkin-tracer/faraday/zipkin-tracer.rb
module ZipkinTracer
class FaradayHandler < ::Faraday::Middleware
def call(env)
trace_id = TraceGenerator.new.next_trace_id
TraceContainer.with_trace_id(trace_id) do
b3_headers.each do |method, header|
env[:request_headers][header] = trace_id.send(method).to_s
end
if Trace.tracer && trace_id.sampled?
trace!(env, trace_id)
else
@app.call(env)
end
end
end
def b3_headers
{
trace_id: 'X-B3-TraceId',
parent_id: 'X-B3-ParentSpanId',
span_id: 'X-B3-SpanId',
sampled: 'X-B3-Sampled',
flags: 'X-B3-Flags'
}
end
def trace!(env, trace_id)
response = nil
# handle either a URI object (passed by Faraday v0.8.x in testing), or something string-izable
method = env[:method].to_s
url = env[:url].respond_to?(:host) ? env[:url] : URI.parse(env[:url].to_s)
remote_endpoint = Trace::Endpoint.remote_endpoint(url, @service_name) # The endpoint we are calling.
Trace.tracer.with_new_span(trace_id, method.downcase) do |span|
@span = span # So we can record on exceptions
# annotate with method (GET/POST/etc.) and uri path
span.kind = Trace::Span::Kind::CLIENT
span.remote_endpoint = remote_endpoint
span.record_tag(Trace::Span::Tag::METHOD, method.upcase)
span.record_tag(Trace::Span::Tag::PATH, url.path)
response = @app.call(env).on_complete do |renv|
span.record_status(renv[:status])
end
end
response
rescue Net::ReadTimeout
record_error(@span, 'Request timed out.')
raise
rescue Faraday::ConnectionFailed
record_error(@span, 'Request connection failed.')
raise
rescue Faraday::ClientError
record_error(@span, 'Generic Faraday client error.')
raise
end
end
end