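Tracing requests across services with Jaeger

There was no road in the world to begin with; once enough people walked the same way, a road appeared. -- Lu Xun

The topic this time is beating a "road" between services, so that a bug leaves a "road" we can follow.

Why Jaeger? Does multi-language support count? Following the OpenTracing specification? Working out of the box? There is more besides. As for why following the OpenTracing specification is a good thing... well, for the nitpickers, see the links at the end of this article 🙃🙃🙃

Anyway, let's roll up our sleeves and get started...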

Getting to know Jaeger

Jaeger: open source, end-to-end distributed tracing
Monitor and troubleshoot transactions in complex distributed systems

The Jaeger architecture and data flow are shown in the figure below.


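Let's look at each layer:

jaeger-client (the language-specific implementations of the OpenTracing API, used to add instrumentation points to an application)

jaeger-agent (a daemon that listens for spans sent over UDP, processes them and forwards them to the collector. It is designed to be deployed on every host as an infrastructure component, abstracting collector discovery and routing away from the client. Note: 1. this layer should be deployed locally, alongside the application; 2. if the client is configured with a collector endpoint for reporting, spans go directly to the collector and no agent is needed.)

jaeger-collector (receives spans and runs them through a processing pipeline. The pipeline currently validates, indexes and transforms traces, and finally stores them)

data store (storage for the trace data)

jaeger-query (retrieves traces from storage and serves them to the UI)

jaeger-ui (the UI layer, built on React)

Jaeger's storage is a pluggable component. Cassandra, Elasticsearch and Kafka are currently supported (Kafka acts as an intermediate buffer in front of the actual store rather than as a final store).

Given this architecture, this article focuses on jaeger-client: how to implement tracing between services and inside a service.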

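Understanding trace data

Span: the logical unit of work in a trace, such as one request or one function call. It carries an operation name, a start time and a duration.

SpanContext: the span data that has to be propagated to downstream spans and across applications/processes. Think of it simply as the shared identifier object threaded through every system.

Baggage: string key-value pairs associated with a Span/SpanContext and propagated to all downstream spans. (It enables powerful features such as carrying data along the whole call chain, but it is expensive, so use it with care.)

Tracer: the tracing instance in a project. It creates the spans that follow data changes and function executions. The trace it produces can be viewed as a directed acyclic graph (DAG) of spans.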
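To make these terms concrete, here is a minimal sketch built from plain objects. It is not the jaeger-client API: `makeSpan` and the field names are hypothetical and only illustrate how a SpanContext links a child to its parent and how baggage travels downstream.

```javascript
// Hypothetical helper: builds a plain-object span (NOT the jaeger-client Span class).
let nextId = 1;

function makeSpan(operationName, parent) {
  const spanId = String(nextId++);
  return {
    operationName,
    startTime: Date.now(),
    // SpanContext: the part that travels to child spans and across processes.
    context: {
      // Every span in one trace shares the trace id of the root span.
      traceId: parent ? parent.context.traceId : spanId,
      spanId,
      parentId: parent ? parent.context.spanId : null,
      // Baggage is copied to every downstream span.
      baggage: parent ? { ...parent.context.baggage } : {},
    },
  };
}

const root = makeSpan("GET /abc");
root.context.baggage["user-id"] = "42";
const child = makeSpan("call b-service", root);
const grandchild = makeSpan("call c-service", child);

console.log(grandchild.context);
```

Following the `parentId` links from any span leads back to the root, which is what makes the set of spans a directed acyclic graph.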

A trace and its spans are shown in the figure below:


In jaeger-ui the result looks like the figure below:


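jaeger-client is an implementation of OpenTracing, so the jaeger-client API is almost identical to the OpenTracing API.

API and configuration reference

This article focuses on Node.js, with Go in a supporting role (those happen to be the two kinds of service I currently need to trace 😂😂😂). What follows is a rough introduction to Configuration, Tracer and Span, enough to implement basic tracing.

Why bother with these basics? Because there is no detailed documentation!!! So the first step is a condensed reference that is enough to build a complete trace.

The Configuration looks like this: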

{
  serviceName: "string",
  disable: "boolean",
  sampler: {
    type: "string", // required
    param: "number", // required
    hostPort: "string",
    host: "string",
    port: "number",
    refreshIntervalMs: "number"
  },
  reporter: {
    logSpans: "boolean",
    agentHost: "string",
    agentPort: "number",
    collectorEndpoint: "string",
    username: "string",
    password: "string",
    flushIntervalMs: "number"
  },
  throttler: {
    host: "string",
    port: "number",
    refreshIntervalMs: "number"
  }
}
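As a rough illustration of how the fields listed above fit together, the sketch below builds a concrete config object. `buildJaegerConfig` and its `sampleRate` parameter are hypothetical helpers, and the agent port 6832 is my assumption about the agent's UDP port; check both against your own deployment.

```javascript
// Hypothetical helper: returns a config object in the shape listed above.
function buildJaegerConfig(serviceName, opts = {}) {
  const { collectorEndpoint, agentHost, sampleRate = 1 } = opts;

  // With a collector endpoint, spans are reported straight to the collector;
  // otherwise they are sent to a local agent (port 6832 is an assumed default).
  const reporter = collectorEndpoint
    ? { logSpans: true, collectorEndpoint }
    : { logSpans: true, agentHost: agentHost || "localhost", agentPort: 6832 };

  // "const" with param 1 samples every trace; "probabilistic" samples a fraction.
  const sampler =
    sampleRate >= 1
      ? { type: "const", param: 1 }
      : { type: "probabilistic", param: sampleRate };

  return { serviceName, sampler, reporter };
}

const direct = buildJaegerConfig("a-service", {
  collectorEndpoint: "http://localhost:14268/api/traces",
});
const viaAgent = buildJaegerConfig("b-service", { sampleRate: 0.1 });

console.log(direct, viaAgent);
```

The resulting object is what would be passed as the first argument to `initTracer`, as the services later in this article do.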

The Tracer object looks like this:

{
  objects: {
    _tags: "object", // tags, including jaeger-version/hostname/ip/client-uuid
    _metrics: "object", // Metrics instance
    _serviceName: "string", // service name
    _reporter: "object", // reporter instance
    _sampler: "object", // sampler instance
    _logger: "object", // logger instance, NullLogger by default
    _baggageSetter: "object", // BaggageSetter instance
    _debugThrottler: "object", // DefaultThrottler instance
    _injectors: "object", // registered injectors
    _extractors: "object", // registered extractors
    _process: "object" // process info, including serviceName/tags
  },
  // source: ./jaeger-client-node/blob/master/src/tracer.js
  methods: {
    _startInternalSpan: "void", // creates the underlying span, called by startSpan / params: spanContext(SpanContext) operationName(string) startTime(number) userTags(any) internalTags(any) parentContext?(SpanContext) rpcServer(boolean) references(Array<Reference>) / returns Span
    _report: "void", // submits a span to the Jaeger backend / params: span(Span)
    registerInjector: "void", // registers a way of injecting a SpanContext for a given format / params: format(string) injector(Injector)
    registerExtractor: "void", // registers a way of extracting a SpanContext for a given format / params: format(string) extractor(Extractor)
    startSpan: "void", // creates a Span / params: operationName(string) options?:{ operationName(string) childOf(SpanContext) references(Array<Reference>) tags(object) startTime(number) }
    inject: "void", // injects a SpanContext into a carrier in a serialized format / params: SpanContext(SpanContext) format(string) carrier(any)
    extract: "void", // extracts a SpanContext from a carrier in a serialized format / params: format(string) carrier(any) / returns SpanContext
    close: "void", // closes the tracer, flushes spans and runs the callback if given / params: callback
    now: "void", // returns the current time
    _isDebugAllowed: "void" // returns whether debug is allowed
  }
}

The Span object looks like this:

{
  objects: {
    _tracer: "object", // <Tracer>
    _operationName: "string", // span name
    _spanContext: "object", // span data: _traceId/_spanId/_parentId/...
    _startTime: "number", // start timestamp
    _logger: "object", // logger instance, NullLogger by default
    _references: "object", // list of references
    _baggageSetter: "object", // BaggageSetter instance
    _logs: "object", // the span's logs
    _tags: "object", // the span's tags
    _duration: "number" // duration
  },
  // source: ./jaeger-client-node/blob/master/src/span.js
  methods: {
    _normalizeBaggageKey: "void", // params: key(string) / returns the normalized key: lower-cased, with underscores replaced by dashes
    setBaggageItem: "void", // sets the baggage value for a key / params: key(string) value(any) / returns the current Span
    getBaggageItem: "void", // gets the baggage value for a key / params: key(string) / returns the baggage value
    context: "void", // returns the SpanContext of the current Span
    tracer: "void", // returns the Tracer of the current Span
    _isWriteable: "void", // returns whether the current Span is writable
    setOperationName: "void", // sets the operation name of the current Span / params: operationName(string) / returns the current Span
    finish: "void", // finishes the current Span / params: finishTime?(number)
    addTags: "void", // adds several tags to the Span / params: keyValuePairs(object) / returns the current Span
    setTag: "void", // adds a single tag to the Span / params: key(string) value(any) / returns the current Span
    log: "void", // adds a log event or payload to the Span / params: keyValuePairs(object) timestamp?(number) / returns the current Span
    logEvent: "void", // records a named event with a payload / params: eventName(string) payload(any)
    _setSamplingPriority: "void" // returns true if the flag was updated, false otherwise / params: priority(number) (0 disables sampling; 1 enables sampling)
  }
}

Spans come in two flavours, ordinary spans and error spans. jaeger-ui tells them apart with this check:

const isErrorTag = ({ key, value }: KeyValuePair) =>
  key === "error" && (value === true || value === "true");

So an error span is set up like this:

span.setTag("error", true);

span.log({
  message: err.message
});

span.finish();

Jaeger is fairly flexible about data: you can fork the jaeger-ui code and customise how it treats the KeyValuePair tags you set yourself.
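The error-span recipe can be wrapped in a small helper. The sketch below is illustrative only: `failSpan` and `makeRecordingSpan` are hypothetical names, the recording span is a stand-in for a real jaeger-client Span, and `isErrorTag` is a plain-JavaScript copy of the jaeger-ui check quoted above.

```javascript
// Plain-JavaScript copy of the jaeger-ui check quoted above.
const isErrorTag = ({ key, value }) =>
  key === "error" && (value === true || value === "true");

// Hypothetical helper: flag a span as failed, record the message, finish it.
function failSpan(span, err) {
  span.setTag("error", true);
  span.log({ event: "error", message: err.message });
  span.finish();
}

// Stand-in span that only records what was called on it.
function makeRecordingSpan() {
  return {
    tags: [],
    logs: [],
    finished: false,
    setTag(key, value) {
      this.tags.push({ key, value });
      return this;
    },
    log(fields) {
      this.logs.push(fields);
      return this;
    },
    finish() {
      this.finished = true;
    },
  };
}

const demoSpan = makeRecordingSpan();
failSpan(demoSpan, new Error("upstream timed out"));

// True when at least one tag would make jaeger-ui treat the span as an error.
console.log(demoSpan.tags.some(isErrorTag));
```

Because the check also accepts the string "true", a tag set by a client that serialises booleans as strings is still flagged.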

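Practice / examples

The idea is always the same: intercept each call before one service talks to another and inject a trace identifier into the outgoing message. Once the trace data has been collected, that identifier is what lets the spans be grouped and stitched into a single call chain.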
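Stripped of any library, the interception idea can be sketched as a pair of functions. This is a simplified illustration, not the jaeger-client implementation: it assumes the `uber-trace-id` header with the `traceId:spanId:parentId:flags` layout and reduces the flags field to a single sampled bit. `inject` and `extract` here are hypothetical stand-ins for the tracer methods of the same name.

```javascript
// Header name assumed for the trace context.
const TRACE_HEADER = "uber-trace-id";

// Caller side: serialise the context as traceId:spanId:parentId:flags.
function inject(context, headers) {
  const flags = context.sampled ? "1" : "0";
  headers[TRACE_HEADER] = [
    context.traceId,
    context.spanId,
    context.parentId || "0",
    flags,
  ].join(":");
  return headers;
}

// Callee side: parse the header back; returns null when absent or malformed.
function extract(headers) {
  const raw = headers[TRACE_HEADER];
  if (typeof raw !== "string") return null;
  const parts = raw.split(":");
  if (parts.length !== 4) return null;
  const [traceId, spanId, parentId, flags] = parts;
  return {
    traceId,
    spanId,
    parentId: parentId === "0" ? null : parentId,
    sampled: flags === "1",
  };
}

// Service a: the start of the chain.
const outgoing = inject(
  { traceId: "abc123", spanId: "1", parentId: null, sampled: true },
  {}
);

// Service b: continue the same trace with a new span whose parent is a's span.
const incoming = extract(outgoing);
const childContext = {
  traceId: incoming.traceId,
  spanId: "2",
  parentId: incoming.spanId,
  sampled: incoming.sampled,
};

console.log(outgoing[TRACE_HEADER]); // abc123:1:0:1
```

Since every hop reuses the trace id and records its caller's span id as parent, the collector can rebuild the call chain from spans reported independently by each service.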

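Tracing between Node.js services (HTTP)

Say there are three services [a, b, c]. The browser sends a request to service a, service a calls an endpoint on service b, service b calls an endpoint on service c, and each hop is traced.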

request.js

const Request = require("request");
const { FORMAT_HTTP_HEADERS, Tags, Tracer } = require("opentracing");

// Thin wrapper around `request`: starts a span for each outgoing call and
// injects its SpanContext into the HTTP headers.
const request = (url, options = {}) => {
  const method = options.method || "GET";
  const headers = options.headers || {};
  // Fall back to opentracing's no-op Tracer when no tracer is supplied
  const tracer = options.tracer || new Tracer();
  const rootSpan = options.rootSpan;
  const _config = rootSpan ? { childOf: rootSpan } : {};
  const span = tracer.startSpan(`${url}`, _config);
  span.setTag(Tags.HTTP_URL, url);
  span.setTag(Tags.HTTP_METHOD, method);
  tracer.inject(span.context(), FORMAT_HTTP_HEADERS, headers);
  const promise = new Promise((resolve, reject) => {
    Request(
      {
        url: url,
        method: method,
        headers: headers
      },
      (err, res, body) => {
        if (err) {
          // Mark the span as failed so that jaeger-ui flags it
          span.setTag(Tags.ERROR, true);
          span.log({ event: "error", message: err.message });
          span.finish();
          console.log("request error : ", err);
          reject(err);
        } else {
          span.finish();
          resolve(body);
        }
      }
    );
  });
  return promise;
};

module.exports = request;

aservice.js

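const Koa = require("koa");
const Router = require("koa-router");
const { initTracer } = require("jaeger-client");
const { FORMAT_HTTP_HEADERS, Tags } = require("opentracing");
const request = require("./request");

// Assumed setup (omitted in the original): a Koa app with a koa-router router
const app = new Koa();
const router = new Router();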

// app use trace
const jaegerConfig = {
  serviceName: "a-service",
  sampler: { type: "const", param: 1 },
  reporter: {
    logSpans: true,
    collectorEndpoint: "http://localhost:14268/api/traces"
  }
};

const jaegerOptions = { baggagePrefix: "x-b3-" };
const tracer = initTracer(jaegerConfig, jaegerOptions);

app.use(async (ctx, next) => {
  const parent = tracer.extract(FORMAT_HTTP_HEADERS, ctx.headers);
  const _config = parent ? { childOf: parent } : {};
  const span = tracer.startSpan(`${ctx.host}`, _config);
  span.setTag("route", ctx.path);
  ctx.tracerRootSpan = span;
  ctx.tracer = tracer;
  await next();
  span.finish();
});

// app router
router.get("/abc", async (ctx, next) => {
  const result = await request("http://localhost:7072/bc", {
    tracer: ctx.tracer,
    rootSpan: ctx.tracerRootSpan
  });
  ctx.body = "get :7071/a , hello a" + "\n" + result;
});

app.use(router.routes());
app.listen(7071, () => {
  console.log("\x1B[32m port : 7071 \x1B[39m");
});
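One thing to watch in the middleware above: `span.finish()` only runs when `next()` resolves, so a handler that throws leaves its span unreported. A possible remedy, sketched here with a hypothetical `withSpan` helper that is not part of jaeger-client, is to finish the span in a `finally` block and tag it as an error on the way out.

```javascript
// Hypothetical helper: run `fn` inside a span and always finish the span.
async function withSpan(tracer, name, options, fn) {
  const span = tracer.startSpan(name, options);
  try {
    // `fn` receives the span so it can add tags or start child spans.
    return await fn(span);
  } catch (err) {
    // Flag the span so that jaeger-ui renders it as an error span.
    span.setTag("error", true);
    span.log({ event: "error", message: err.message });
    throw err;
  } finally {
    // Runs on success and on failure alike.
    span.finish();
  }
}
```

In the Koa middleware this would wrap the `await next()` call, so that each request produces exactly one finished span whether or not the route handler succeeds.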

bservice.js

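const Koa = require("koa");
const Router = require("koa-router");
const { initTracer } = require("jaeger-client");
const { FORMAT_HTTP_HEADERS, Tags } = require("opentracing");
const request = require("./request");

// Assumed setup (omitted in the original): a Koa app with a koa-router router
const app = new Koa();
const router = new Router();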

// app use trace
const jaegerConfig = {
  serviceName: "b-service",
  sampler: { type: "const", param: 1 },
  reporter: {
    logSpans: true,
    collectorEndpoint: "http://localhost:14268/api/traces"
  }
};

const jaegerOptions = { baggagePrefix: "x-b3-" };
const tracer = initTracer(jaegerConfig, jaegerOptions);

app.use(async (ctx, next) => {
  const parent = tracer.extract(FORMAT_HTTP_HEADERS, ctx.headers);
  const _config = parent ? { childOf: parent } : {};
  const span = tracer.startSpan(`${ctx.host}`, _config);
  span.setTag("route", ctx.path);
  ctx.tracerRootSpan = span;
  ctx.tracer = tracer;
  await next();
  span.finish();
});

// app router
router.get("/bc", async (ctx, next) => {
  const span = ctx.tracer.startSpan(`api:bc`, { childOf: ctx.tracerRootSpan });
  span.setTag("request:c", ":7073/c");
  try {
    throw Error("err");
  } catch (err) {
    span.setTag("error", true);
    span.log({
      level: "error",
      message: err.message
    });
  }
  const result = await request("http://localhost:7073/c", {
    tracer: ctx.tracer,
    rootSpan: ctx.tracerRootSpan
  });
  span.finish();
  ctx.body = "get :7072/b , hello b" + "\n" + result;
});

app.use(router.routes());

app.listen(7072, () => {
  console.log("\x1B[32m port : 7072 \x1B[39m");
});

cservice.js

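const Koa = require("koa");
const Router = require("koa-router");
const { initTracer } = require("jaeger-client");
const { FORMAT_HTTP_HEADERS } = require("opentracing");

// Assumed setup (omitted in the original): a Koa app with a koa-router router
const app = new Koa();
const router = new Router();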

// app use trace
const jaegerConfig = {
  serviceName: "c-service",
  sampler: { type: "const", param: 1 },
  reporter: {
    logSpans: true,
    collectorEndpoint: "http://localhost:14268/api/traces"
  }
};

const jaegerOptions = { baggagePrefix: "x-b3-" };

const tracer = initTracer(jaegerConfig, jaegerOptions);

app.use(async (ctx, next) => {
  const parent = tracer.extract(FORMAT_HTTP_HEADERS, ctx.headers);
  const _config = parent ? { childOf: parent } : {};
  const span = tracer.startSpan(`${ctx.host}`, _config);
  span.setTag("route", ctx.path);
  ctx.tracerRootSpan = span;
  ctx.tracer = tracer;
  span.log({ event: "test-log_1", kk: "kk_1", vv: "vv_1" });
  span.log({ event: "test-log_2", kk: "kk_2", vv: "vv_2" });
  span.log({ event: "test-log_3", kk: "kk_3", vv: "vv_3" });
  span.logEvent("log-event_1", { a: 1, b: 1 });
  span.logEvent("log-event_2", { a: 2, b: 2 });
  await next();
  span.finish();
});

// app router
router.get("/c", async (ctx, next) => {
  ctx.body = "get :7073/c , hello c";
});

app.use(router.routes());

app.listen(7073, () => {
  console.log("\x1B[32m port : 7073 \x1B[39m");
});

Request http://localhost:7071/abc , then open the jaeger-ui address in a browser: http://localhost:16686/search

The result is shown in the figure:

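Tracing between Go services (gRPC)

Here an HTTP service, main, listens on port 8081 and calls a method on the service named service over gRPC. This is my first Go program, so please go easy on it... Corrections are welcome.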

hello.gen.proto

syntax = "proto3";

option go_package = "hello_package";

package hello;

message HelloReq {
  string name = 1;
}

message HelloRes {
  string result = 1;
}

service HelloService {
  rpc SayHello(HelloReq) returns(HelloRes) {}
}

Generate the files gRPC needs

protoc -I helloService/ helloService/hello.gen.proto --go_out=plugins=grpc:helloService

main.go

package main

import (
	"log"
	"context"
	"strings"
	"net/http"
	"encoding/json"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
	pb "goservice/helloService"
	opentracing "github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"
	openLog "github.com/opentracing/opentracing-go/log"
	"github.com/uber/jaeger-client-go"
	jaegerCfg "github.com/uber/jaeger-client-go/config"
)

// Reader/writer over gRPC metadata
type MDReaderWriter struct {
	metadata.MD
}

// Implements opentracing.TextMapReader (see the opentracing source)
func (c MDReaderWriter) ForeachKey(handler func(key, val string) error) error {
	for k, vs := range c.MD {
		for _, v := range vs {
			if err := handler(k, v); err != nil {
				return err
			}
		}
	}
	return nil
}

// Implements opentracing.TextMapWriter (see the opentracing source)
func (c MDReaderWriter) Set(key, val string) {
	key = strings.ToLower(key)
	c.MD[key] = append(c.MD[key], val)
}

func NewJaegerTracer(serviceName string) (opentracing.Tracer, error) {
	// For the configuration options, see https://github.com/jaegertracing/jaeger-client-go/blob/master/config/config.go
	cfg := jaegerCfg.Configuration{
		Sampler: &jaegerCfg.SamplerConfig{
			Type: "const",
			Param: 1,
		},
		Reporter: &jaegerCfg.ReporterConfig{
			LogSpans: true,
			CollectorEndpoint: "http://localhost:14268/api/traces",
		},
	}

	cfg.ServiceName = serviceName

	tracer, _, err := cfg.NewTracer(
		jaegerCfg.Logger(jaeger.StdLogger),
	)

	if err != nil {
		log.Println("tracer error ", err)
	}

	return tracer, err
}

// See the gRPC docs: https://godoc.org/google.golang.org/grpc#UnaryClientInterceptor
func interceptor(tracer opentracing.Tracer) grpc.UnaryClientInterceptor{
	return func (ctx context.Context,
		method string,
		req,
		reply interface{},
		cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker,
		opts ...grpc.CallOption) error {
		// Look up the parent span, if any, in the incoming context
		var rootCtx opentracing.SpanContext

		rootSpan := opentracing.SpanFromContext(ctx)
		if rootSpan != nil {
			rootCtx = rootSpan.Context()
		}

		span := tracer.StartSpan(
			method,
			opentracing.ChildOf(rootCtx),
			opentracing.Tag{Key: "test", Value: "hahahahaha"},
			ext.SpanKindRPCClient,
		)

		defer span.Finish()

		md, succ := metadata.FromOutgoingContext(ctx)
		if !succ {
			md = metadata.New(nil)
		} else{
			md = md.Copy()
		}

		mdWriter := MDReaderWriter{md}

		// Inject the SpanContext into the outgoing metadata
		err := tracer.Inject(span.Context(), opentracing.TextMap, mdWriter)

		if err != nil {
			span.LogFields(openLog.String("inject error", err.Error()))
		}

		// Build a new context carrying the metadata, then invoke the RPC
		newCtx := metadata.NewOutgoingContext(ctx, md)
		err = invoker(newCtx, method, req, reply, cc, opts...)
		if err != nil {
			span.LogFields(openLog.String("call error", err.Error()))
		}
		return err
	}
}

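// HTTP handler: calls SayHello over gRPC and writes the result as JSON
func hello(w http.ResponseWriter, r *http.Request) {
	if err := r.ParseForm(); err != nil {
		http.Error(w, "bad request", http.StatusBadRequest)
		return
	}

	// Create the tracer (per request here, to keep the demo short)
	tracer, err := NewJaegerTracer("mainService")
	if err != nil {
		http.Error(w, "tracer unavailable", http.StatusInternalServerError)
		return
	}

	// Dial options: attach the tracing interceptor to every unary call
	dialOpts := []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithUnaryInterceptor(interceptor(tracer)),
	}

	conn, err := grpc.Dial("localhost:8082", dialOpts...)
	if err != nil {
		log.Println("connect err ", err)
		http.Error(w, "upstream unavailable", http.StatusBadGateway)
		return
	}
	defer conn.Close()

	sv := pb.NewHelloServiceClient(conn)

	// Use the "name" form value when present, otherwise a default
	name := "yeshou"
	if v := r.Form.Get("name"); v != "" {
		name = v
	}

	res, err := sv.SayHello(context.Background(), &pb.HelloReq{Name: name})
	if err != nil {
		log.Println("SayHello error : ", err)
		http.Error(w, "upstream error", http.StatusBadGateway)
		return
	}

	type HelloRes struct {
		Result string `json:"result"`
	}

	jsonData, err := json.Marshal(HelloRes{Result: res.Result})
	if err != nil {
		log.Println("server error : ", err)
		http.Error(w, "server error", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.Write(jsonData)
}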

func main() {
	http.HandleFunc("/get_h", hello)

	err := http.ListenAndServe(":8081", nil)

	if err != nil {
		log.Fatal("Listen server err : ", err)
	}

}

service.go

package main

import (
	"log"
	"net"
	"context"
	"strings"
	"google.golang.org/grpc"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/metadata"
	pb "goservice/helloService"
	opentracing "github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"
	"github.com/uber/jaeger-client-go"
	jaegerCfg "github.com/uber/jaeger-client-go/config"
)

// Reader/writer over gRPC metadata
type MDReaderWriter struct {
	metadata.MD
}

// Implements opentracing.TextMapReader (see the opentracing source)
func (c MDReaderWriter) ForeachKey(handler func(key, val string) error) error {
	for k, vs := range c.MD {
		for _, v := range vs {
			if err := handler(k, v); err != nil {
				return err
			}
		}
	}
	return nil
}

// Implements opentracing.TextMapWriter (see the opentracing source)
func (c MDReaderWriter) Set(key, val string) {
	key = strings.ToLower(key)
	c.MD[key] = append(c.MD[key], val)
}

func NewJaegerTracer(serviceName string) (opentracing.Tracer, error) {
	cfg := jaegerCfg.Configuration{
		Sampler: &jaegerCfg.SamplerConfig{
			Type: "const",
			Param: 1,
		},
		Reporter: &jaegerCfg.ReporterConfig{
			LogSpans: true,
			CollectorEndpoint: "http://localhost:14268/api/traces",
		},
	}

	cfg.ServiceName = serviceName

	tracer, _, err := cfg.NewTracer(
		jaegerCfg.Logger(jaeger.StdLogger),
	)

	if err != nil {
		log.Println("tracer error ", err)
	}

	return tracer, err
}

// See the gRPC docs: https://godoc.org/google.golang.org/grpc#UnaryServerInterceptor
func interceptor(tracer opentracing.Tracer) grpc.UnaryServerInterceptor{
	return func (ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler) (res interface{}, err error) {
			md, succ := metadata.FromIncomingContext(ctx)
			if !succ {
				md  = metadata.New(nil)
			}

			// Extract the SpanContext from the incoming metadata
			spanContext, err := tracer.Extract(opentracing.TextMap, MDReaderWriter{md})
			if err != nil && err != opentracing.ErrSpanContextNotFound {
				grpclog.Errorf("extract from metadata err: %v", err)
			} else{
				span := tracer.StartSpan(
					info.FullMethod,
					ext.RPCServerOption(spanContext),
					opentracing.Tag{Key: string(ext.Component), Value: "grpc"},
					ext.SpanKindRPCServer,
				)
				defer span.Finish()
				ctx = opentracing.ContextWithSpan(ctx, span)
			}
			return handler(ctx, req)
	}
}

type server struct{}

func (s *server) SayHello(ctx context.Context, in *pb.HelloReq) (*pb.HelloRes, error) {
	return &pb.HelloRes{Result: "Hello " + in.Name}, nil
}

func main() {

	var svOpts []grpc.ServerOption
	tracer, err := NewJaegerTracer("serviceService")
	if err != nil {
		log.Fatal("new tracer err ", err)
	}

	if tracer != nil {
		svOpts = append(svOpts, grpc.UnaryInterceptor(interceptor(tracer)))
	}

	sv := grpc.NewServer(svOpts...)

	lis, err := net.Listen("tcp", ":8082")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	pb.RegisterHelloServiceServer(sv, &server{})
	if err := sv.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

Request http://localhost:8081/get_h , then open the jaeger-ui address in a browser: http://localhost:16686/search

The result is shown in the figure:

Related links

opentracing-specification
opentracing-javascript
jaegertracing
jaeger-client-node
jaeger-client-go
istio-zh
demo-github

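This article is only a simple reference for using Jaeger. In a real project, factors such as microservices, service mesh and business-logic logs make tracing more complex, with plenty more pitfalls to step into.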