微服务的链路追踪是必须的,现在有空,写个 demo 吧。
    主要分为三个部分:gateway、service1(http)、service2(grpc)
    项目地址:https://github.com/jwrookie/trace-demo

    gateway

    1. package main
    2. import (
    3. "github.com/gin-gonic/gin"
    4. "github.com/opentracing/opentracing-go"
    5. "github.com/opentracing/opentracing-go/ext"
    6. "github.com/opentracing/opentracing-go/log"
    7. "io"
    8. "io/ioutil"
    9. "net/http"
    10. "trace"
    11. )
    12. var (
    13. tracer opentracing.Tracer
    14. closer io.Closer
    15. )
    16. func traceMiddle(c *gin.Context) {
    17. span := tracer.StartSpan(c.Request.URL.String(),opentracing.Tag{Key: string(ext.Component), Value: "http"})
    18. c.Set("span",span)
    19. defer func() {
    20. ext.SpanKindRPCClient.Set(span)
    21. span.Finish()
    22. }()
    23. c.Next()
    24. }
    25. func main() {
    26. tracer, closer = tools.InitJaegerClient("trace-demo")
    27. defer closer.Close()
    28. router := gin.Default()
    29. router.GET("/ping",traceMiddle,pong)
    30. router.Run(":8080")
    31. }
    32. func pong(c *gin.Context){
    33. span := c.MustGet("span").(opentracing.Span)
    34. httpClient := &http.Client{}
    35. httpReq ,_ := http.NewRequest("GET","http://127.0.0.1:8081/ping",nil)
    36. carrier := opentracing.HTTPHeadersCarrier(httpReq.Header)
    37. if err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier);err != nil {
    38. span.LogFields(log.String("err", err.Error()))
    39. c.JSON(200,gin.H{"err":err.Error()})
    40. return
    41. }
    42. res,err := httpClient.Do(httpReq)
    43. if err != nil {
    44. span.LogFields(log.String("err", err.Error()))
    45. c.JSON(200,gin.H{"err":err.Error()})
    46. return
    47. }
    48. defer res.Body.Close()
    49. resS , _:= ioutil.ReadAll(res.Body)
    50. c.JSON(200,gin.H{"data" : string(resS)})
    51. }

    service1

    1. import (
    2. "context"
    3. "github.com/gin-gonic/gin"
    4. "github.com/opentracing/opentracing-go"
    5. "github.com/opentracing/opentracing-go/ext"
    6. "github.com/opentracing/opentracing-go/log"
    7. "google.golang.org/grpc"
    8. "google.golang.org/grpc/grpclog"
    9. "google.golang.org/grpc/metadata"
    10. "io"
    11. "trace"
    12. pb "trace/cmd/protos"
    13. )
    14. const (
    15. PORT = "127.0.0.1:8082"
    16. )
    17. var (
    18. tracer opentracing.Tracer
    19. closer io.Closer
    20. )
    21. func traceMiddle(c *gin.Context) {
    22. refFunc := opentracing.FollowsFrom
    23. carrier := opentracing.HTTPHeadersCarrier(c.Request.Header)
    24. clientContext, err := tracer.Extract(opentracing.HTTPHeaders, carrier)
    25. if err != nil {
    26. c.AbortWithStatusJSON(200,gin.H{"data":err.Error()})
    27. return
    28. }
    29. span := tracer.StartSpan(c.Request.URL.String(),refFunc(clientContext),opentracing.Tag{Key: string(ext.Component), Value: "http"})
    30. defer func() {
    31. ext.SpanKindRPCServer.Set(span)
    32. span.Finish()
    33. }()
    34. c.Set("span",span)
    35. c.Next()
    36. }
    37. func main() {
    38. tracer, closer = tools.InitJaegerClient("trace-demo")
    39. defer closer.Close()
    40. router := gin.Default()
    41. router.GET("/ping",traceMiddle,pong)
    42. router.Run(":8081")
    43. }
    44. func pong(c *gin.Context){
    45. span := c.MustGet("span").(opentracing.Span)
    46. span.LogFields(log.String(c.Request.URL.String() + "---NewRpcServerClient :","rpcService:Ping"))
    47. conn, err := grpc.Dial(PORT, grpc.WithInsecure(), grpc.WithUnaryInterceptor(OpenTracingClientInterceptor()))
    48. if err != nil {
    49. span.LogFields(log.String("err", err.Error()))
    50. return
    51. }
    52. client := pb.NewRpcServerClient(conn)
    53. parm1 := pb.PingReq{}
    54. ctx := opentracing.ContextWithSpan(context.TODO(),span)
    55. r, err := client.Ping(ctx, &parm1)
    56. if err != nil {
    57. span.LogFields(log.String("err", err.Error()))
    58. return
    59. }
    60. c.String(200,"%s",r.Res)
    61. }
    62. //OpenTracingClientInterceptor rewrite client's interceptor with open tracing
    63. func OpenTracingClientInterceptor() grpc.UnaryClientInterceptor {
    64. return func(
    65. ctx context.Context,
    66. method string,
    67. req, resp interface{},
    68. cc *grpc.ClientConn,
    69. invoker grpc.UnaryInvoker,
    70. opts ...grpc.CallOption,
    71. ) error {
    72. var parentCtx opentracing.SpanContext
    73. if parent := opentracing.SpanFromContext(ctx); parent != nil {
    74. parentCtx = parent.Context()
    75. }
    76. cliSpan := tracer.StartSpan(
    77. method,
    78. opentracing.ChildOf(parentCtx),
    79. opentracing.Tag{Key: string(ext.Component), Value: "gRPC"},
    80. ext.SpanKindRPCClient,
    81. )
    82. defer cliSpan.Finish()
    83. md, ok := metadata.FromOutgoingContext(ctx)
    84. if !ok {
    85. md = metadata.New(nil)
    86. } else {
    87. md = md.Copy()
    88. }
    89. mdWriter := tools.MDReaderWriter{md}
    90. err := tracer.Inject(cliSpan.Context(), opentracing.TextMap, mdWriter)
    91. if err != nil {
    92. grpclog.Errorf("inject to metadata err %v", err)
    93. }
    94. ctx = metadata.NewOutgoingContext(ctx, mdWriter.MD)
    95. err = invoker(ctx, method, req, resp, cc, opts...)
    96. if err != nil {
    97. cliSpan.LogFields(log.String("err", err.Error()))
    98. }
    99. return err
    100. }
    101. }

    service2

    1. package main
    2. import (
    3. "context"
    4. "github.com/opentracing/opentracing-go"
    5. "github.com/opentracing/opentracing-go/ext"
    6. olog "github.com/opentracing/opentracing-go/log"
    7. "google.golang.org/grpc"
    8. "google.golang.org/grpc/grpclog"
    9. "google.golang.org/grpc/metadata"
    10. "io"
    11. "log"
    12. "net"
    13. "trace"
    14. pb "trace/cmd/protos"
    15. )
    16. const (
    17. PORT = ":8082"
    18. )
    19. var (
    20. tracer opentracing.Tracer
    21. closer io.Closer
    22. )
    23. type Server struct {
    24. }
    25. func (s *Server) Ping(ctx context.Context, all *pb.PingReq) (*pb.PingRes, error){
    26. var res = new(pb.PingRes)
    27. span := ctx.Value("span").(opentracing.Span)
    28. span.LogFields(olog.String("pos","rpcService:Ping"))
    29. res.Res = "pong"
    30. return res,nil
    31. }
    32. func main() {
    33. tracer, closer = tools.InitJaegerClient("trace-demo")
    34. defer closer.Close()
    35. lis, err := net.Listen("tcp", PORT)
    36. if err != nil {
    37. log.Fatalf("failed to listen: %v", err)
    38. }
    39. opts := grpc.UnaryInterceptor(OpentracingServerInterceptor())
    40. s := grpc.NewServer(opts) //起一个服务
    41. pb.RegisterRpcServerServer(s, &Server{})
    42. if err := s.Serve(lis); err != nil {
    43. log.Fatalf("failed to serve: %v", err)
    44. }
    45. }
    46. func OpentracingServerInterceptor() grpc.UnaryServerInterceptor {
    47. return func(
    48. ctx context.Context,
    49. req interface{},
    50. info *grpc.UnaryServerInfo,
    51. handler grpc.UnaryHandler,
    52. ) (resp interface{}, err error) {
    53. md, ok := metadata.FromIncomingContext(ctx)
    54. if !ok {
    55. md = metadata.New(nil)
    56. }
    57. spanContext, err := tracer.Extract(opentracing.TextMap, tools.MDReaderWriter{md})
    58. if err != nil && err != opentracing.ErrSpanContextNotFound {
    59. grpclog.Errorf("extract from metadata err %v", err)
    60. }
    61. serverSpan := tracer.StartSpan(
    62. info.FullMethod,
    63. ext.RPCServerOption(spanContext),
    64. opentracing.Tag{Key: string(ext.Component), Value: "gRPC"},
    65. ext.SpanKindRPCServer,
    66. )
    67. defer serverSpan.Finish()
    68. ctx = context.WithValue(ctx,"span",serverSpan)
    69. return handler(ctx, req)
    70. }
    71. }

    tools

    1. package tools
    2. import (
    3. "fmt"
    4. "github.com/opentracing/opentracing-go"
    5. "github.com/uber/jaeger-client-go"
    6. "github.com/uber/jaeger-client-go/config"
    7. "google.golang.org/grpc/metadata"
    8. "io"
    9. )
    10. func InitJaegerClient(service string) (opentracing.Tracer, io.Closer) {
    11. cfg := &config.Configuration{
    12. ServiceName: service,
    13. Sampler: &config.SamplerConfig{
    14. Type: "const",
    15. Param: 1,
    16. },
    17. Reporter: &config.ReporterConfig{
    18. LogSpans: true,
    19. LocalAgentHostPort: "172.17.0.6:6831",
    20. },
    21. }
    22. tracer, closer, err := cfg.NewTracer(config.Logger(jaeger.StdLogger))
    23. if err != nil {
    24. panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
    25. }
    26. return tracer, closer
    27. }
    28. //MDReaderWriter metadata Reader and Writer
    29. type MDReaderWriter struct {
    30. MD metadata.MD
    31. }
    32. //ForeachKey range all keys to call handler
    33. func (c MDReaderWriter) ForeachKey(handler func(key, val string) error) error {
    34. for k, vs := range c.MD {
    35. for _, v := range vs {
    36. if err := handler(k, v); err != nil {
    37. return err
    38. }
    39. }
    40. }
    41. return nil
    42. }
    43. // Set implements Set() of opentracing.TextMapWriter
    44. func (c MDReaderWriter) Set(key, val string) {
    45. c.MD.Set(key, val)
    46. }

    proto

    1. syntax = "proto3";
    2. package protos;
    3. message pingReq { // Request message for Ping; declares no fields.
    4. }
    5. message pingRes { // Response message for Ping.
    6. string res = 1; // Text returned to the caller.
    7. }
    8. service rpcServer { // Service exposing a single RPC, Ping.
    9. rpc Ping(pingReq) returns (pingRes) {}
    10. }

    最终效果
    image.png