微服务的链路是必须的,现在有空,写个demo吧
主要分为三个部分:gateway、service1(http)、service2(grpc)
项目地址:https://github.com/jwrookie/trace-demo
gateway
package main
import (
"github.com/gin-gonic/gin"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"io"
"io/ioutil"
"net/http"
"trace"
)
var (
tracer opentracing.Tracer
closer io.Closer
)
func traceMiddle(c *gin.Context) {
span := tracer.StartSpan(c.Request.URL.String(),opentracing.Tag{Key: string(ext.Component), Value: "http"})
c.Set("span",span)
defer func() {
ext.SpanKindRPCClient.Set(span)
span.Finish()
}()
c.Next()
}
func main() {
tracer, closer = tools.InitJaegerClient("trace-demo")
defer closer.Close()
router := gin.Default()
router.GET("/ping",traceMiddle,pong)
router.Run(":8080")
}
func pong(c *gin.Context){
span := c.MustGet("span").(opentracing.Span)
httpClient := &http.Client{}
httpReq ,_ := http.NewRequest("GET","http://127.0.0.1:8081/ping",nil)
carrier := opentracing.HTTPHeadersCarrier(httpReq.Header)
if err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier);err != nil {
span.LogFields(log.String("err", err.Error()))
c.JSON(200,gin.H{"err":err.Error()})
return
}
res,err := httpClient.Do(httpReq)
if err != nil {
span.LogFields(log.String("err", err.Error()))
c.JSON(200,gin.H{"err":err.Error()})
return
}
defer res.Body.Close()
resS , _:= ioutil.ReadAll(res.Body)
c.JSON(200,gin.H{"data" : string(resS)})
}
service1
import (
"context"
"github.com/gin-gonic/gin"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"io"
"trace"
pb "trace/cmd/protos"
)
const (
PORT = "127.0.0.1:8082"
)
var (
tracer opentracing.Tracer
closer io.Closer
)
func traceMiddle(c *gin.Context) {
refFunc := opentracing.FollowsFrom
carrier := opentracing.HTTPHeadersCarrier(c.Request.Header)
clientContext, err := tracer.Extract(opentracing.HTTPHeaders, carrier)
if err != nil {
c.AbortWithStatusJSON(200,gin.H{"data":err.Error()})
return
}
span := tracer.StartSpan(c.Request.URL.String(),refFunc(clientContext),opentracing.Tag{Key: string(ext.Component), Value: "http"})
defer func() {
ext.SpanKindRPCServer.Set(span)
span.Finish()
}()
c.Set("span",span)
c.Next()
}
func main() {
tracer, closer = tools.InitJaegerClient("trace-demo")
defer closer.Close()
router := gin.Default()
router.GET("/ping",traceMiddle,pong)
router.Run(":8081")
}
func pong(c *gin.Context){
span := c.MustGet("span").(opentracing.Span)
span.LogFields(log.String(c.Request.URL.String() + "---NewRpcServerClient :","rpcService:Ping"))
conn, err := grpc.Dial(PORT, grpc.WithInsecure(), grpc.WithUnaryInterceptor(OpenTracingClientInterceptor()))
if err != nil {
span.LogFields(log.String("err", err.Error()))
return
}
client := pb.NewRpcServerClient(conn)
parm1 := pb.PingReq{}
ctx := opentracing.ContextWithSpan(context.TODO(),span)
r, err := client.Ping(ctx, &parm1)
if err != nil {
span.LogFields(log.String("err", err.Error()))
return
}
c.String(200,"%s",r.Res)
}
//OpenTracingClientInterceptor rewrite client's interceptor with open tracing
func OpenTracingClientInterceptor() grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, resp interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
var parentCtx opentracing.SpanContext
if parent := opentracing.SpanFromContext(ctx); parent != nil {
parentCtx = parent.Context()
}
cliSpan := tracer.StartSpan(
method,
opentracing.ChildOf(parentCtx),
opentracing.Tag{Key: string(ext.Component), Value: "gRPC"},
ext.SpanKindRPCClient,
)
defer cliSpan.Finish()
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(nil)
} else {
md = md.Copy()
}
mdWriter := tools.MDReaderWriter{md}
err := tracer.Inject(cliSpan.Context(), opentracing.TextMap, mdWriter)
if err != nil {
grpclog.Errorf("inject to metadata err %v", err)
}
ctx = metadata.NewOutgoingContext(ctx, mdWriter.MD)
err = invoker(ctx, method, req, resp, cc, opts...)
if err != nil {
cliSpan.LogFields(log.String("err", err.Error()))
}
return err
}
}
service2
package main
import (
"context"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
olog "github.com/opentracing/opentracing-go/log"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"io"
"log"
"net"
"trace"
pb "trace/cmd/protos"
)
const (
PORT = ":8082"
)
var (
tracer opentracing.Tracer
closer io.Closer
)
type Server struct {
}
func (s *Server) Ping(ctx context.Context, all *pb.PingReq) (*pb.PingRes, error){
var res = new(pb.PingRes)
span := ctx.Value("span").(opentracing.Span)
span.LogFields(olog.String("pos","rpcService:Ping"))
res.Res = "pong"
return res,nil
}
func main() {
tracer, closer = tools.InitJaegerClient("trace-demo")
defer closer.Close()
lis, err := net.Listen("tcp", PORT)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
opts := grpc.UnaryInterceptor(OpentracingServerInterceptor())
s := grpc.NewServer(opts) //起一个服务
pb.RegisterRpcServerServer(s, &Server{})
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
func OpentracingServerInterceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.New(nil)
}
spanContext, err := tracer.Extract(opentracing.TextMap, tools.MDReaderWriter{md})
if err != nil && err != opentracing.ErrSpanContextNotFound {
grpclog.Errorf("extract from metadata err %v", err)
}
serverSpan := tracer.StartSpan(
info.FullMethod,
ext.RPCServerOption(spanContext),
opentracing.Tag{Key: string(ext.Component), Value: "gRPC"},
ext.SpanKindRPCServer,
)
defer serverSpan.Finish()
ctx = context.WithValue(ctx,"span",serverSpan)
return handler(ctx, req)
}
}
tools
package tools
import (
"fmt"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
"google.golang.org/grpc/metadata"
"io"
)
func InitJaegerClient(service string) (opentracing.Tracer, io.Closer) {
cfg := &config.Configuration{
ServiceName: service,
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
Reporter: &config.ReporterConfig{
LogSpans: true,
LocalAgentHostPort: "172.17.0.6:6831",
},
}
tracer, closer, err := cfg.NewTracer(config.Logger(jaeger.StdLogger))
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
return tracer, closer
}
//MDReaderWriter metadata Reader and Writer
type MDReaderWriter struct {
MD metadata.MD
}
//ForeachKey range all keys to call handler
func (c MDReaderWriter) ForeachKey(handler func(key, val string) error) error {
for k, vs := range c.MD {
for _, v := range vs {
if err := handler(k, v); err != nil {
return err
}
}
}
return nil
}
// Set implements Set() of opentracing.TextMapWriter
func (c MDReaderWriter) Set(key, val string) {
c.MD.Set(key, val)
}
proto
syntax = "proto3";
package protos;
message pingReq {
}
message pingRes {
string res = 1;
}
service rpcServer {
rpc Ping(pingReq) returns (pingRes) {}
}
最终效果