微服务的链路追踪是必不可少的,现在正好有空,写个 demo 吧
主要分为三个部分:gateway、service1(http)、service2(grpc)
项目地址:https://github.com/jwrookie/trace-demo
gateway
package mainimport ("github.com/gin-gonic/gin""github.com/opentracing/opentracing-go""github.com/opentracing/opentracing-go/ext""github.com/opentracing/opentracing-go/log""io""io/ioutil""net/http""trace")var (tracer opentracing.Tracercloser io.Closer)func traceMiddle(c *gin.Context) {span := tracer.StartSpan(c.Request.URL.String(),opentracing.Tag{Key: string(ext.Component), Value: "http"})c.Set("span",span)defer func() {ext.SpanKindRPCClient.Set(span)span.Finish()}()c.Next()}func main() {tracer, closer = tools.InitJaegerClient("trace-demo")defer closer.Close()router := gin.Default()router.GET("/ping",traceMiddle,pong)router.Run(":8080")}func pong(c *gin.Context){span := c.MustGet("span").(opentracing.Span)httpClient := &http.Client{}httpReq ,_ := http.NewRequest("GET","http://127.0.0.1:8081/ping",nil)carrier := opentracing.HTTPHeadersCarrier(httpReq.Header)if err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier);err != nil {span.LogFields(log.String("err", err.Error()))c.JSON(200,gin.H{"err":err.Error()})return}res,err := httpClient.Do(httpReq)if err != nil {span.LogFields(log.String("err", err.Error()))c.JSON(200,gin.H{"err":err.Error()})return}defer res.Body.Close()resS , _:= ioutil.ReadAll(res.Body)c.JSON(200,gin.H{"data" : string(resS)})}
service1
import ("context""github.com/gin-gonic/gin""github.com/opentracing/opentracing-go""github.com/opentracing/opentracing-go/ext""github.com/opentracing/opentracing-go/log""google.golang.org/grpc""google.golang.org/grpc/grpclog""google.golang.org/grpc/metadata""io""trace"pb "trace/cmd/protos")const (PORT = "127.0.0.1:8082")var (tracer opentracing.Tracercloser io.Closer)func traceMiddle(c *gin.Context) {refFunc := opentracing.FollowsFromcarrier := opentracing.HTTPHeadersCarrier(c.Request.Header)clientContext, err := tracer.Extract(opentracing.HTTPHeaders, carrier)if err != nil {c.AbortWithStatusJSON(200,gin.H{"data":err.Error()})return}span := tracer.StartSpan(c.Request.URL.String(),refFunc(clientContext),opentracing.Tag{Key: string(ext.Component), Value: "http"})defer func() {ext.SpanKindRPCServer.Set(span)span.Finish()}()c.Set("span",span)c.Next()}func main() {tracer, closer = tools.InitJaegerClient("trace-demo")defer closer.Close()router := gin.Default()router.GET("/ping",traceMiddle,pong)router.Run(":8081")}func pong(c *gin.Context){span := c.MustGet("span").(opentracing.Span)span.LogFields(log.String(c.Request.URL.String() + "---NewRpcServerClient :","rpcService:Ping"))conn, err := grpc.Dial(PORT, grpc.WithInsecure(), grpc.WithUnaryInterceptor(OpenTracingClientInterceptor()))if err != nil {span.LogFields(log.String("err", err.Error()))return}client := pb.NewRpcServerClient(conn)parm1 := pb.PingReq{}ctx := opentracing.ContextWithSpan(context.TODO(),span)r, err := client.Ping(ctx, &parm1)if err != nil {span.LogFields(log.String("err", err.Error()))return}c.String(200,"%s",r.Res)}//OpenTracingClientInterceptor rewrite client's interceptor with open tracingfunc OpenTracingClientInterceptor() grpc.UnaryClientInterceptor {return func(ctx context.Context,method string,req, resp interface{},cc *grpc.ClientConn,invoker grpc.UnaryInvoker,opts ...grpc.CallOption,) error {var parentCtx opentracing.SpanContextif parent := opentracing.SpanFromContext(ctx); parent != nil 
{parentCtx = parent.Context()}cliSpan := tracer.StartSpan(method,opentracing.ChildOf(parentCtx),opentracing.Tag{Key: string(ext.Component), Value: "gRPC"},ext.SpanKindRPCClient,)defer cliSpan.Finish()md, ok := metadata.FromOutgoingContext(ctx)if !ok {md = metadata.New(nil)} else {md = md.Copy()}mdWriter := tools.MDReaderWriter{md}err := tracer.Inject(cliSpan.Context(), opentracing.TextMap, mdWriter)if err != nil {grpclog.Errorf("inject to metadata err %v", err)}ctx = metadata.NewOutgoingContext(ctx, mdWriter.MD)err = invoker(ctx, method, req, resp, cc, opts...)if err != nil {cliSpan.LogFields(log.String("err", err.Error()))}return err}}
service2
package mainimport ("context""github.com/opentracing/opentracing-go""github.com/opentracing/opentracing-go/ext"olog "github.com/opentracing/opentracing-go/log""google.golang.org/grpc""google.golang.org/grpc/grpclog""google.golang.org/grpc/metadata""io""log""net""trace"pb "trace/cmd/protos")const (PORT = ":8082")var (tracer opentracing.Tracercloser io.Closer)type Server struct {}func (s *Server) Ping(ctx context.Context, all *pb.PingReq) (*pb.PingRes, error){var res = new(pb.PingRes)span := ctx.Value("span").(opentracing.Span)span.LogFields(olog.String("pos","rpcService:Ping"))res.Res = "pong"return res,nil}func main() {tracer, closer = tools.InitJaegerClient("trace-demo")defer closer.Close()lis, err := net.Listen("tcp", PORT)if err != nil {log.Fatalf("failed to listen: %v", err)}opts := grpc.UnaryInterceptor(OpentracingServerInterceptor())s := grpc.NewServer(opts) //起一个服务pb.RegisterRpcServerServer(s, &Server{})if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}}func OpentracingServerInterceptor() grpc.UnaryServerInterceptor {return func(ctx context.Context,req interface{},info *grpc.UnaryServerInfo,handler grpc.UnaryHandler,) (resp interface{}, err error) {md, ok := metadata.FromIncomingContext(ctx)if !ok {md = metadata.New(nil)}spanContext, err := tracer.Extract(opentracing.TextMap, tools.MDReaderWriter{md})if err != nil && err != opentracing.ErrSpanContextNotFound {grpclog.Errorf("extract from metadata err %v", err)}serverSpan := tracer.StartSpan(info.FullMethod,ext.RPCServerOption(spanContext),opentracing.Tag{Key: string(ext.Component), Value: "gRPC"},ext.SpanKindRPCServer,)defer serverSpan.Finish()ctx = context.WithValue(ctx,"span",serverSpan)return handler(ctx, req)}}
tools
package toolsimport ("fmt""github.com/opentracing/opentracing-go""github.com/uber/jaeger-client-go""github.com/uber/jaeger-client-go/config""google.golang.org/grpc/metadata""io")func InitJaegerClient(service string) (opentracing.Tracer, io.Closer) {cfg := &config.Configuration{ServiceName: service,Sampler: &config.SamplerConfig{Type: "const",Param: 1,},Reporter: &config.ReporterConfig{LogSpans: true,LocalAgentHostPort: "172.17.0.6:6831",},}tracer, closer, err := cfg.NewTracer(config.Logger(jaeger.StdLogger))if err != nil {panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))}return tracer, closer}//MDReaderWriter metadata Reader and Writertype MDReaderWriter struct {MD metadata.MD}//ForeachKey range all keys to call handlerfunc (c MDReaderWriter) ForeachKey(handler func(key, val string) error) error {for k, vs := range c.MD {for _, v := range vs {if err := handler(k, v); err != nil {return err}}}return nil}// Set implements Set() of opentracing.TextMapWriterfunc (c MDReaderWriter) Set(key, val string) {c.MD.Set(key, val)}
proto
// gRPC contract for the tracing demo: a single unary Ping call.
syntax = "proto3";

package protos;

// pingReq is the request message of Ping; it has no fields.
message pingReq {}

// pingRes is the reply message of Ping.
message pingRes {
  // res holds the reply text.
  string res = 1;
}

// rpcServer is the service exposing the Ping RPC.
service rpcServer {
  // Ping takes a pingReq and returns a pingRes.
  rpc Ping(pingReq) returns (pingRes) {}
}
最终效果
