每次调用负载均衡

值得注意的是,gRPC 内的负载均衡是在每个调用的基础上发生的,而不是基于每个连接的。
换句话说,即使所有请求都来自单个客户端,我们仍然希望它们在所有 服务器 之间进行负载均衡。

负载均衡的方法

代理模型(集中式LB(Proxy Model))

使用代理提供可靠的可信客户端,可以向负载均衡系统报告负载。代理通常需要更多资源才能运行,因为它们具有 RPC 请求和响应的临时副本。
此模型还会增加 RPC 的延迟。在考虑诸如存储之类的请求重型服务时,代理模型被认为是低效的。
grpc/balancer - 图1

客户端负载均衡方式(进程内LB(Balancing-aware Client))

这个较重的客户端将更多的负载均衡逻辑放在客户端中。例如,客户端可以包含许多用于从列表中选择服务器的负载均衡策略(循环,随机等)。在此模型中,服务器列表将在客户端中静态配置,由名称解析系统提供,外部负载均衡器等。在任何情况下,客户端都负责从列表中选择首选服务器。
这种方法的缺点之一是以多种语言和/或客户端版本编写和维护负载均衡策略。这些策略可能相当复杂。一些算法还需要客户端到服务器通信,因此除了为用户请求发送 RPC 之外,客户端还需要更重,以支持其他 RPC 来获取运行状况或负载信息。
它还会使客户端的代码大大复杂化:新设计隐藏了多层负载均衡的复杂性,并将其作为服务器的简单列表呈现给客户端。
grpc/balancer - 图2

外部负载均衡服务(独立 LB 进程(External Load Balancing Service))

客户端负载均衡代码保持简单和可移植,实现用于服务器选择的众所周知的算法(例如,循环)。 复杂的负载均衡算法由负载均衡器提供。 客户端依赖负载均衡器来提供负载均衡配置以及客户端应向其发送请求的服务器列表。 平衡器根据需要更新服务器列表以平衡负载以及处理服务器不可用或健康问题。 负载均衡器将做出任何必要的复杂决策并通知客户。 负载均衡器可以与后端服务器通信以收集负载和健康信息。
grpc/balancer - 图3

GRPC服务发现及负载均衡

负载均衡策略适用于命名解析和与服务器的连接之间的 gRPC 客户端工作流。
设计文档:https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
gRPC 开源组件官方并未直接提供服务注册与发现的功能实现,但其设计文档已提供实现的思路,并在不同语言的 gRPC 代码 API 中已提供了命名解析和负载均衡接口供扩展。
以下是它的工作原理:
image.png

  1. 启动时,gRPC 客户端会为服务器名称发出 名称解析 请求。该名称将解析为一个或多个 IP 地址,每个 IP 地址将指示它是服务器地址还是负载均衡器地址,以及指示要使用哪个客户端负载均衡策略的 服务配置 (例如,round_robin 或 grpclb)。
  2. 客户端实例化负载均衡策略。
    • 注意:如果解析程序返回的任何一个地址是均衡器地址,则无论服务配置请求了什么负载均衡策略,客户端都将使用 grpclb 策略。否则,客户端将使用服务配置请求的负载均衡策略。如果服务配置未请求负载均衡策略,则客户端将默认使用选择第一个可用服务器地址的策略。
  3. 负载均衡策略为每个服务器地址创建一个子通道。
    • 对于除 grpclb 之外的所有策略,这意味着解析器返回的每个地址都有一个子通道。请注意,这些策略会忽略解析程序返回的任何均衡器地址。
    • 对于grpclb策略,工作流程如下: a. 该策略打开一个流到解析器返回的平衡器地址之一。它要求平衡器将服务器地址用于客户端最初请求的服务器名称(即,最初传递给名称解析器的服务器名称)。 + 注意:在 grpclb 策略中,解析器返回的非平衡器地址用作后备,以防在启动 LB 策略时无法联系到平衡器。 b. 如果负载均衡器的配置需要该信息,则负载均衡器指向客户端的 gRPC 服务器可以向负载均衡器报告负载。 c. 负载均衡器将服务器列表返回给 gRPC 客户端的 grpclb 策略。然后,grpclb 策略将为列表中的每个服务器创建一个子通道。
  4. 对于发送的每个 RPC ,负载平衡策略决定应将 RPC 发送到哪个子通道(即哪个服务器)。
    • 对于 grpclb 策略,客户端将按负载均衡器返回的顺序向服务器发送请求。如果服务器列表为空,则调用将阻塞,直到收到非空的调用。

轮训

proto

  1. syntax = "proto3";
  2. package helloworld;
  3. message HelloRequest {
  4. string name = 1;
  5. }
  6. message HelloReply {
  7. string message = 1;
  8. }
  9. service Greeter {
  10. rpc SayHello (HelloRequest) returns (HelloReply) {}
  11. }

register

  1. package resolver
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "time"
  7. clientv3 "go.etcd.io/etcd/client/v3"
  8. )
  9. type Register struct {
  10. key string
  11. client3 *clientv3.Client
  12. serverAddress string
  13. stop chan bool
  14. interval time.Duration
  15. leaseTime int64
  16. }
  17. func NewRegister(key string, client3 *clientv3.Client, serverAddress string) *Register {
  18. return &Register{
  19. key: fmt.Sprintf("/%s/%s/", schema, key),
  20. serverAddress: serverAddress,
  21. client3: client3,
  22. interval: 60 * time.Second,
  23. leaseTime: 70,
  24. stop: make(chan bool, 1),
  25. }
  26. }
  27. func (r *Register) Regist() {
  28. lgs, err := r.client3.Grant(context.TODO(), r.leaseTime)
  29. if nil != err {
  30. panic(err)
  31. }
  32. if _, err := r.client3.Put(context.TODO(), r.key, r.serverAddress, clientv3.WithLease(lgs.ID)); nil != err {
  33. panic(err)
  34. }
  35. ch, err := r.client3.KeepAlive(context.TODO(), lgs.ID)
  36. if err != nil {
  37. panic(err)
  38. }
  39. go func() {
  40. for {
  41. ka := <-ch
  42. fmt.Println("ttl:", ka.TTL)
  43. }
  44. }()
  45. }
  46. func (r *Register) GetServerAddress() string {
  47. return r.serverAddress
  48. }
  49. func (r *Register) UnRegist() {
  50. r.stop <- true
  51. if _, err := r.client3.Delete(context.TODO(), r.key); nil != err {
  52. panic(err)
  53. } else {
  54. log.Printf("%s UnReg Sucess", r.key)
  55. }
  56. }

resolver

  1. package resolver
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. clientv3 "go.etcd.io/etcd/client/v3"
  7. "google.golang.org/grpc/resolver"
  8. )
  9. const schema = "etcdv3_resolver"
  10. // resolver is the implementaion of resolver.Resolve resolver.Builder
  11. type Resolver struct {
  12. target string
  13. service string
  14. cli *clientv3.Client
  15. cc resolver.ClientConn
  16. }
  17. func NewResolver(target string, service string) resolver.Builder {
  18. return &Resolver{target: target, service: service}
  19. }
  20. // Scheme return etcdv3 schema
  21. func (r *Resolver) Scheme() string {
  22. return schema
  23. }
  24. // ResolveNow 立马做一次解析
  25. func (r *Resolver) ResolveNow(rn resolver.ResolveNowOptions) {
  26. // 与watch像似,没监控的逻辑
  27. addrDict := make(map[string]resolver.Address)
  28. update := func() {
  29. addrList := make([]resolver.Address, 0, len(addrDict))
  30. for _, v := range addrDict {
  31. addrList = append(addrList, v)
  32. }
  33. _ = r.cc.UpdateState(resolver.State{Addresses: addrList})
  34. }
  35. // 按前缀取出键值对,并更新r.cc
  36. resp, err := r.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
  37. if err == nil {
  38. for i := range resp.Kvs {
  39. addrDict[string(resp.Kvs[i].Key)] = resolver.Address{Addr: string(resp.Kvs[i].Value)}
  40. }
  41. }
  42. update()
  43. }
  44. // Close
  45. func (r *Resolver) Close() {
  46. }
  47. // Build to resolver.Resolver
  48. func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  49. var err error
  50. r.cli, err = clientv3.New(clientv3.Config{
  51. Endpoints: strings.Split(r.target, ","),
  52. })
  53. if err != nil {
  54. return nil, fmt.Errorf("grpclb: create clientv3 client failed: %v", err)
  55. }
  56. r.cc = cc
  57. go r.watch(fmt.Sprintf("/%s/%s/", schema, r.service))
  58. return r, nil
  59. }
  60. // 监听服务注册的变化
  61. func (r *Resolver) watch(prefix string) {
  62. addrDict := make(map[string]resolver.Address)
  63. update := func() {
  64. addrList := make([]resolver.Address, 0, len(addrDict))
  65. for _, v := range addrDict {
  66. addrList = append(addrList, v)
  67. }
  68. _ = r.cc.UpdateState(resolver.State{Addresses: addrList})
  69. }
  70. // 按前缀取出键值对,并更新r.cc
  71. resp, err := r.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
  72. if err == nil {
  73. for i := range resp.Kvs {
  74. addrDict[string(resp.Kvs[i].Key)] = resolver.Address{Addr: string(resp.Kvs[i].Value)}
  75. }
  76. }
  77. update()
  78. // 监听服务注册的变化,并更新r.cc
  79. rch := r.cli.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithPrevKV())
  80. for n := range rch {
  81. for _, ev := range n.Events {
  82. switch ev.Type {
  83. case clientv3.EventTypePut:
  84. addrDict[string(ev.Kv.Key)] = resolver.Address{Addr: string(ev.Kv.Value)}
  85. case clientv3.EventTypeDelete:
  86. delete(addrDict, string(ev.PrevKv.Key))
  87. }
  88. }
  89. update()
  90. }
  91. }

client

  1. package main
  2. import (
  3. resolver2 "commons/grpclb/resolver"
  4. "context"
  5. "flag"
  6. "fmt"
  7. "strconv"
  8. "time"
  9. pb "commons/grpclb/proto"
  10. "github.com/sirupsen/logrus"
  11. "google.golang.org/grpc"
  12. "google.golang.org/grpc/balancer/roundrobin"
  13. "google.golang.org/grpc/resolver"
  14. )
  15. var (
  16. svc = flag.String("service", "hello_service", "service name")
  17. reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
  18. )
  19. func main() {
  20. flag.Parse()
  21. r := resolver2.NewResolver(*reg, *svc)
  22. resolver.Register(r)
  23. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  24. defer cancel()
  25. // https://github.com/grpc/grpc/blob/master/doc/naming.md
  26. // The gRPC client library will use the specified scheme to pick the right resolver plugin and pass it the fully qualified name string.
  27. DefaultServiceConfig := fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)
  28. // authority是自己随便起的,不是必须的,但是r.Scheme()+"://authority/"+*svc这种格式是必须的
  29. conn, err := grpc.DialContext(ctx, r.Scheme()+"://authority/"+*svc, grpc.WithInsecure(), grpc.WithDefaultServiceConfig(DefaultServiceConfig), grpc.WithBlock())
  30. if err != nil {
  31. panic(err)
  32. }
  33. ticker := time.NewTicker(100 * time.Millisecond)
  34. client := pb.NewGreeterClient(conn)
  35. for t := range ticker.C {
  36. resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "world " + strconv.Itoa(t.Second())})
  37. if err == nil {
  38. logrus.Infof("%v: Reply is %s\n", t, resp.Message)
  39. } else {
  40. logrus.Error(err)
  41. }
  42. }
  43. }

server

  1. package main
  2. import (
  3. "commons/grpclb/resolver"
  4. "context"
  5. "flag"
  6. "log"
  7. "net"
  8. "os"
  9. "os/signal"
  10. "strings"
  11. "syscall"
  12. "time"
  13. clientv3 "go.etcd.io/etcd/client/v3"
  14. "google.golang.org/grpc"
  15. pb "commons/grpclb/proto"
  16. "github.com/sirupsen/logrus"
  17. )
  18. const DialTimeout = time.Second * 5
  19. var (
  20. serv = flag.String("service", "hello_service", "service name")
  21. host = flag.String("host", "localhost", "listening host")
  22. port = flag.String("port", "50000", "listening port")
  23. endpoints = flag.String("reg", "127.0.0.1:2379", "register etcd address")
  24. )
  25. func init() {
  26. flag.Parse()
  27. }
  28. // 程序停止去掉etcd里的信息
  29. func deadNotify(reg *resolver.Register) {
  30. ch := make(chan os.Signal, 1)
  31. signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
  32. go func() {
  33. log.Printf("signal.Notify %v", <-ch)
  34. reg.UnRegist()
  35. os.Exit(1)
  36. }()
  37. }
  38. func main() {
  39. // 端口监听
  40. lis, err := net.Listen("tcp", net.JoinHostPort(*host, *port))
  41. if err != nil {
  42. panic(err)
  43. }
  44. defer lis.Close()
  45. // 初始化etcd连接
  46. client3, err := clientv3.New(clientv3.Config{
  47. Endpoints: strings.Split(*endpoints, ","),
  48. DialTimeout: DialTimeout,
  49. })
  50. if err != nil {
  51. panic(err)
  52. }
  53. defer client3.Close()
  54. reg := resolver.NewRegister(*serv+"/"+net.JoinHostPort(*host, *port), client3, net.JoinHostPort(*host, *port))
  55. reg.Regist() //服务注册到etcd
  56. deadNotify(reg) //取消注册
  57. logrus.Infof("starting hello service at %s", *port)
  58. s := grpc.NewServer()
  59. pb.RegisterGreeterServer(s, &server{})
  60. _ = s.Serve(lis)
  61. }
  62. // server is used to implement helloworld.GreeterServer.
  63. type server struct{}
  64. // SayHello implements helloworld.GreeterServer
  65. func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  66. logrus.Infof("%v: Receive is %s\n", time.Now(), in.Name)
  67. time.Sleep(time.Second * 5)
  68. return &pb.HelloReply{Message: "Hello " + in.Name + " from " + net.JoinHostPort(*host, *port)}, nil
  69. }

当流量进入5000的服务,此时我们使用kill -9 的方式干掉他,会收到如下的错误image.png
由于kill -9 无法被进程捕获, 此时etcd里的节点信息是没有发生变化的
image.png
但是注意没有,由于etcd里的节点信息没发生变化,我们的resolver是无法更新节点信息的,但看之后的请求,并没继续走到5000这个节点上

挂掉的节点怎么移除etcd?

首选我们做了信号监听,但像kill -9 , 机器挂掉,oom等我们是捕获不到的

  1. func deadNotify(reg *resolver.Register) {
  2. ch := make(chan os.Signal, 1)
  3. signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
  4. go func() {
  5. log.Printf("signal.Notify %v", <-ch)
  6. reg.UnRegist()
  7. os.Exit(1)
  8. }()
  9. }

那我们可以用两个时间点一直做轮询,比如我设置key的有效期为10s,但过5s我们就去续约一次,这样就算节点挂掉,在etcd里最多也只有10s的生命周期,同理坏节点在grpc内置的connectPool里最多生存10s。
这里没必要写这么麻烦,用上面的KeepAlive是一样的,它会帮我们自动续期

  1. func (r *Register) Regist() {
  2. lgs, err := r.client3.Grant(context.TODO(), r.leaseTime)
  3. if nil != err {
  4. panic(err)
  5. }
  6. if _, err := r.client3.Put(context.TODO(), r.key, r.serverAddress, clientv3.WithLease(lgs.ID)); nil != err {
  7. panic(err)
  8. }
  9. go func() {
  10. c := time.Tick(1 * time.Second)
  11. for range c {
  12. if _, err := r.client3.KeepAliveOnce(context.TODO(), lgs.ID); nil != err {
  13. panic(err)
  14. }
  15. }
  16. }()
  17. }

grpc会什么不会将流量打到etcd里的坏节点?

要测试出效果,我们首先要增加每次租约时长,我们调整为2h
首先我们要知道,连接建立好后,会放在balance的subConnect下,grpc内置多个策略,它们之间会有差异,这里我们讨论的是roundrobin
image.png
image.png
image.png
由上图,我们看到了两个节点都处于 Ready 状态,根据设计规则,最后承担发送的数据的链接,都必须是通过Picker拿到的链接,我们可以看下roundrobin

  1. // Name is the name of round_robin balancer.
  2. const Name = "round_robin"
  3. var logger = grpclog.Component("roundrobin")
  4. // newBuilder creates a new roundrobin balancer builder.
  5. func newBuilder() balancer.Builder {
  6. return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
  7. }
  8. func init() {
  9. balancer.Register(newBuilder())
  10. }
  11. type rrPickerBuilder struct{}
  12. func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
  13. logger.Infof("roundrobinPicker: newPicker called with info: %v", info)
  14. if len(info.ReadySCs) == 0 {
  15. return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
  16. }
  17. var scs []balancer.SubConn
  18. for sc := range info.ReadySCs {
  19. scs = append(scs, sc)
  20. }
  21. return &rrPicker{
  22. subConns: scs,
  23. // Start at a random index, as the same RR balancer rebuilds a new
  24. // picker when SubConn states change, and we don't want to apply excess
  25. // load to the first server in the list.
  26. next: grpcrand.Intn(len(scs)),
  27. }
  28. }
  29. type rrPicker struct {
  30. // subConns is the snapshot of the roundrobin balancer when this picker was
  31. // created. The slice is immutable. Each Get() will do a round robin
  32. // selection from it and return the selected SubConn.
  33. subConns []balancer.SubConn
  34. mu sync.Mutex
  35. next int
  36. }
  37. func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
  38. p.mu.Lock()
  39. sc := p.subConns[p.next]
  40. p.next = (p.next + 1) % len(p.subConns)
  41. p.mu.Unlock()
  42. return balancer.PickResult{SubConn: sc}, nil
  43. }

�整个代码不多,比较清晰,rrPickerBuilder 会 更新 rrPicker里的subConns,而 Pick从subConns轮询取一个值
而rrPickerBuilder 保证了 只有是有效连接才会放入 rrPicker

那么问题来了,肯定是需要有监听,才有感知到连接的状态哇,在新建balancerWrapper时,提供了watcher

  1. func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
  2. ccb := &ccBalancerWrapper{
  3. cc: cc,
  4. updateCh: buffer.NewUnbounded(),
  5. closed: grpcsync.NewEvent(),
  6. done: grpcsync.NewEvent(),
  7. subConns: make(map[*acBalancerWrapper]struct{}),
  8. }
  9. go ccb.watcher()
  10. ccb.balancer = b.Build(ccb, bopts)
  11. return ccb
  12. }
  13. func (ccb *ccBalancerWrapper) watcher() {
  14. for {
  15. select {
  16. case t := <-ccb.updateCh.Get():
  17. ccb.updateCh.Load()
  18. if ccb.closed.HasFired() {
  19. break
  20. }
  21. switch u := t.(type) {
  22. case *scStateUpdate:
  23. ccb.balancerMu.Lock()
  24. // 这里会去具体的Balancer,例如我们的例子是baseBalancer
  25. // 然后走一下逻辑,再去调用pickerBuilder.Build()
  26. ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err})
  27. ccb.balancerMu.Unlock()
  28. case *acBalancerWrapper:
  29. ccb.mu.Lock()
  30. if ccb.subConns != nil {
  31. delete(ccb.subConns, u)
  32. ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain)
  33. }
  34. ccb.mu.Unlock()
  35. default:
  36. logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)
  37. }
  38. case <-ccb.closed.Done():
  39. }
  40. if ccb.closed.HasFired() {
  41. ...
  42. }
  43. }
  44. }

ps:处于Connecting的链接,会一直的尝试建立链接

加权轮训

参考 https://github.com/Bingjian-Zhu/etcd-example