etcd has the following common use cases:

  • Service discovery
  • Message publish and subscribe
  • Load balancing
  • Distributed notification and coordination
  • Distributed locks
  • Distributed queues
  • Cluster monitoring and leader election

1. Service Discovery

Service discovery is one of the most common etcd use cases. The idea is as follows:
First, the server side: on its first start, the server program connects to etcd and writes a key/value pair describing its ip:port, bound to a lease. The server then keeps an internal timer that periodically refreshes the TTL of the lease behind its registration entry.
The other component is the discovery side: discovery watches the server's key. Every time that key changes, discovery detects the event and takes the corresponding action.
The code looks like this:

  // server.go
  package main

  import (
      "context"
      "crypto/md5"
      "encoding/json"
      "errors"
      "flag"
      "fmt"
      "log"
      "net"
      "os"
      "os/signal"
      "strings"
      "syscall"
      "time"

      "github.com/coreos/etcd/clientv3"
      "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  )

  var (
      prefix     = "register"
      client     *clientv3.Client
      stopSignal = make(chan struct{}, 1)
      srvKey     string
  )

  var (
      serv     = flag.String("name", "hello", "service name")
      port     = flag.Int("port", 30000, "service port")
      endpoint = flag.String("endpoints", "http://127.0.0.1:2379", "etcd endpoints")
  )

  type SvConfig struct {
      Name string `json:"name"`
      Host string `json:"host"`
      Port int    `json:"port"`
  }

  func Register(endpoints string, config *SvConfig, interval time.Duration, ttl int) error {
      // serialize the service description as the value; the key is derived from its hash
      srvValue, _ := json.Marshal(config)
      srvKey = fmt.Sprintf("%s/%x", prefix, md5.Sum(srvValue))
      var err error
      client, err = clientv3.New(clientv3.Config{
          Endpoints:   strings.Split(endpoints, ","),
          DialTimeout: time.Second * 2,
      })
      if err != nil {
          return fmt.Errorf("register service failed: %v", err)
      }
      go func() {
          timer := time.NewTicker(interval)
          for {
              resp, _ := client.Grant(context.TODO(), int64(ttl))
              _, err = client.Get(context.TODO(), srvKey)
              if err != nil {
                  // the key does not exist yet: create it
                  if errors.Is(err, rpctypes.ErrKeyNotFound) {
                      _, err = client.Put(context.TODO(), srvKey, string(srvValue), clientv3.WithLease(resp.ID))
                      if err == nil {
                          log.Printf("register service %s at %s:%d\n", config.Name, config.Host, config.Port)
                      }
                  }
              } else {
                  // the key already exists: re-put it with a fresh lease to refresh the TTL
                  _, err = client.Put(context.TODO(), srvKey, string(srvValue), clientv3.WithLease(resp.ID))
              }
              select {
              case <-stopSignal:
                  return
              case <-timer.C:
              }
          }
      }()
      return err
  }

  func Unregister() error {
      stopSignal <- struct{}{}
      // re-create the channel right away so repeated Unregister calls do not block
      stopSignal = make(chan struct{}, 1)
      _, err := client.Delete(context.TODO(), srvKey)
      return err
  }

  func main() {
      flag.Parse()
      // bind the service address and port
      lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *port))
      if err != nil {
          panic(err)
      }
      config := &SvConfig{
          Name: *serv,
          Host: "127.0.0.1",
          Port: *port,
      }
      Register(*endpoint, config, time.Second*10, 15)
      ch := make(chan os.Signal, 1)
      signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
      go func() {
          <-ch
          Unregister()
          os.Exit(1)
      }()
      log.Printf("service %s start at %d", *serv, *port)
      // server todo
      for {
          lis.Accept()
      }
  }

  // discovery.go
  package main

  import (
      "context"
      "encoding/json"
      "flag"
      "fmt"
      "log"
      "net"
      "os"
      "os/signal"
      "strings"
      "syscall"
      "time"

      "github.com/coreos/etcd/clientv3"
  )

  var (
      prefix = "register"
      client *clientv3.Client
  )

  var (
      port     = flag.Int("port", 30001, "service port")
      endpoint = flag.String("endpoints", "http://127.0.0.1:2379", "etcd endpoints")
  )

  type SvConfig struct {
      Name string `json:"name"`
      Host string `json:"host"`
      Port int    `json:"port"`
  }

  func watcher() error {
      var err error
      client, err = clientv3.New(clientv3.Config{
          Endpoints:   strings.Split(*endpoint, ","),
          DialTimeout: time.Second * 3,
      })
      if err != nil {
          return fmt.Errorf("connect etcd cluster failed: %v", err)
      }
      go func() {
          // watch every key under the registration prefix
          resp := client.Watch(context.TODO(), prefix, clientv3.WithPrefix())
          for ch := range resp {
              for _, event := range ch.Events {
                  switch event.Type {
                  case clientv3.EventTypePut:
                      if event.IsCreate() {
                          srv := parseSrv(event.Kv.Value)
                          log.Printf("discovery service %s at %s:%d", srv.Name, srv.Host, srv.Port)
                      }
                  case clientv3.EventTypeDelete:
                      log.Printf("delete service %s", event.Kv.Key)
                  }
              }
          }
      }()
      return err
  }

  func parseSrv(text []byte) *SvConfig {
      svc := &SvConfig{}
      json.Unmarshal(text, svc)
      return svc
  }

  func main() {
      flag.Parse()
      // bind the local address and port
      lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *port))
      if err != nil {
          panic(err)
      }
      ch := make(chan os.Signal, 1)
      signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
      go func() {
          <-ch
          os.Exit(1)
      }()
      watcher()
      log.Printf("discovery start at %d", *port)
      // server todo
      for {
          lis.Accept()
      }
  }

2. Message Publish and Subscribe

Message publish and subscribe is another very common scenario. The idea with etcd is simple: the publisher writes a series of keys sharing a common prefix, and subscribers simply watch that prefix.
The code is as follows:

  package main

  import (
      "context"
      "flag"
      "fmt"
      "log"
      "strings"
      "time"

      "github.com/coreos/etcd/clientv3"
  )

  var (
      prefix    = "/etcd"
      endpoints = flag.String("endpoints", "http://127.0.0.1:2379", "etcd endpoints")
  )

  func publisher(client *clientv3.Client) {
      go func() {
          timer := time.NewTicker(time.Second)
          for range timer.C {
              now := time.Now()
              key := fmt.Sprintf("%s/%d", prefix, now.Second())
              value := now.String()
              // a lease could be attached to the key here, e.g.:
              // resp, _ := client.Grant(context.TODO(), 10)
              // client.Put(context.TODO(), key, value, clientv3.WithLease(resp.ID))
              client.Put(context.TODO(), key, value)
          }
      }()
  }

  func subscriber(client *clientv3.Client) {
      watcher := client.Watch(context.TODO(), prefix, clientv3.WithPrefix())
      for ch := range watcher {
          for _, e := range ch.Events {
              // react to every put under the prefix (both new keys and updates)
              if e.Type == clientv3.EventTypePut {
                  log.Printf("received %s => %s\n", e.Kv.Key, e.Kv.Value)
              }
          }
      }
  }

  func main() {
      flag.Parse()
      client, err := clientv3.New(clientv3.Config{
          Endpoints:   strings.Split(*endpoints, ","),
          DialTimeout: time.Second * 2,
      })
      if err != nil {
          log.Fatalln("connect etcd cluster failed: " + err.Error())
      }
      publisher(client)
      subscriber(client)
  }

3. Load Balancing

etcd can be combined with gRPC to implement load balancing. On top of the service discovery above, we can use the client-side load balancing that gRPC ships with. First, implement service registration:

  // register.go
  package balance

  import (
      "context"
      "fmt"
      "log"
      "strings"
      "time"

      etcd3 "github.com/coreos/etcd/clientv3"
      "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  )

  // Prefix is the key prefix for registered services.
  // It is used to separate the services of different projects.
  var Prefix = "etcd3_naming"
  var client *etcd3.Client
  var serviceKey string
  var stopSignal = make(chan bool, 1)

  // Register registers the service with etcd.
  func Register(name string, host string, port int, target string, interval time.Duration, ttl int) error {
      serviceValue := fmt.Sprintf("%s:%d", host, port)
      serviceKey = fmt.Sprintf("/%s/%s/%s", Prefix, name, serviceValue)
      // parse the etcd endpoints and open an etcd client used for registration
      var err error
      client, err = etcd3.New(etcd3.Config{
          Endpoints: strings.Split(target, ","),
      })
      if err != nil {
          return fmt.Errorf("grpclb: create etcd3 client failed: %v", err)
      }
      go func() {
          // a ticker drives the periodic re-registration
          ticker := time.NewTicker(interval)
          for {
              // create a lease in etcd and bind the service address to it
              resp, _ := client.Grant(context.TODO(), int64(ttl))
              // check whether the service key exists; create it if it does not
              _, err := client.Get(context.Background(), serviceKey)
              if err != nil {
                  if err == rpctypes.ErrKeyNotFound {
                      // the service key does not exist yet
                      if _, err := client.Put(context.TODO(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
                          log.Printf("grpclb: set service '%s' with ttl to etcd3 failed: %s", name, err.Error())
                      }
                  } else {
                      log.Printf("grpclb: service '%s' connect to etcd3 failed: %s", name, err.Error())
                  }
              } else {
                  // refresh the lease on the service key
                  if _, err := client.Put(context.Background(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
                      log.Printf("grpclb: refresh service '%s' with ttl to etcd3 failed: %s", name, err.Error())
                  }
              }
              select {
              case <-stopSignal:
                  return
              case <-ticker.C:
              }
          }
      }()
      return nil
  }

  // UnRegister removes the registration record of the service.
  func UnRegister() error {
      stopSignal <- true
      // re-create the channel right away so that multiple UnRegister calls do not deadlock
      stopSignal = make(chan bool, 1)
      _, err := client.Delete(context.Background(), serviceKey)
      if err != nil {
          log.Printf("grpclb: deregister '%s' failed: %s", serviceKey, err.Error())
      } else {
          log.Printf("grpclb: deregister '%s' ok.", serviceKey)
      }
      return err
  }

  // watcher.go
  package balance

  import (
      "context"
      "fmt"

      etcd3 "github.com/coreos/etcd/clientv3"
      "github.com/coreos/etcd/mvcc/mvccpb"
      "google.golang.org/grpc/naming"
  )

  // watcher is the implementation of grpc.naming.Watcher
  type watcher struct {
      re            *resolver // re: etcd resolver
      client        etcd3.Client
      isInitialized bool
  }

  // Close does nothing
  func (w *watcher) Close() {
  }

  // Next returns the updates
  func (w *watcher) Next() ([]*naming.Update, error) {
      // prefix is the etcd prefix to watch
      prefix := fmt.Sprintf("/%s/%s/", Prefix, w.re.serviceName)
      // on the first call, query the current addresses from etcd
      if !w.isInitialized {
          w.isInitialized = true
          resp, err := w.client.Get(context.Background(), prefix, etcd3.WithPrefix())
          if err == nil {
              addrs := extractAddrs(resp)
              // if the result is not empty, return the updates; otherwise fall through to the watch
              if l := len(addrs); l != 0 {
                  updates := make([]*naming.Update, l)
                  for i := range addrs {
                      updates[i] = &naming.Update{Op: naming.Add, Addr: addrs[i]}
                  }
                  return updates, nil
              }
          }
      }
      // afterwards, block on an etcd watch and translate events into naming updates
      rch := w.client.Watch(context.Background(), prefix, etcd3.WithPrefix())
      for wresp := range rch {
          for _, ev := range wresp.Events {
              switch ev.Type {
              case mvccpb.PUT:
                  return []*naming.Update{{Op: naming.Add, Addr: string(ev.Kv.Value)}}, nil
              case mvccpb.DELETE:
                  return []*naming.Update{{Op: naming.Delete, Addr: string(ev.Kv.Value)}}, nil
              }
          }
      }
      return nil, nil
  }

  func extractAddrs(resp *etcd3.GetResponse) []string {
      addrs := []string{}
      if resp == nil || resp.Kvs == nil {
          return addrs
      }
      for i := range resp.Kvs {
          if v := resp.Kvs[i].Value; v != nil {
              addrs = append(addrs, string(v))
          }
      }
      return addrs
  }

  // resolver.go
  package balance

  import (
      "errors"
      "fmt"
      "strings"

      etcd3 "github.com/coreos/etcd/clientv3"
      "google.golang.org/grpc/naming"
  )

  // resolver is the implementation of grpc.naming.Resolver
  type resolver struct {
      serviceName string // service name to resolve
  }

  // NewResolver returns a resolver for the given service name
  func NewResolver(serviceName string) *resolver {
      return &resolver{serviceName: serviceName}
  }

  // Resolve resolves the service from etcd; target is the dial address of etcd,
  // e.g. "http://127.0.0.1:2379,http://127.0.0.1:12379,http://127.0.0.1:22379"
  func (re *resolver) Resolve(target string) (naming.Watcher, error) {
      if re.serviceName == "" {
          return nil, errors.New("grpclb: no service name provided")
      }
      // create an etcd client
      client, err := etcd3.New(etcd3.Config{
          Endpoints: strings.Split(target, ","),
      })
      if err != nil {
          return nil, fmt.Errorf("grpclb: create etcd3 client failed: %s", err.Error())
      }
      // return the watcher backed by this client
      return &watcher{re: re, client: *client}, nil
  }

With registration and resolution in place, define the service with protobuf:

  syntax = "proto3";

  option java_multiple_files = true;
  option java_package = "com.midea.jr.test.grpc";
  option java_outer_classname = "HelloWorldProto";
  option objc_class_prefix = "HLW";

  package pb;

  // The greeting service definition.
  service Greeter {
      // Sends a greeting
      rpc SayHello (HelloRequest) returns (HelloReply) {
      }
  }

  // The request message containing the user's name.
  message HelloRequest {
      string name = 1;
  }

  // The response message containing the greetings
  message HelloReply {
      string message = 1;
  }

Compile the proto file into Go code:

  # protoc-gen-go and proto need to be installed first
  # go get -u github.com/golang/protobuf/protoc-gen-go
  # go get -u github.com/golang/protobuf/proto
  $ protoc -I ./pb --go_out=plugins=grpc:pb ./pb/helloworld.proto

The server code is as follows:

  // server.go
  package main

  import (
      "context"
      "flag"
      "fmt"
      "log"
      "net"
      "os"
      "os/signal"
      "syscall"
      "time"

      "google.golang.org/grpc"
      grpclb "xingyys.com/mysite/balance"
      "xingyys.com/mysite/pb"
  )

  var (
      serv = flag.String("service", "hello_service", "service name")
      port = flag.Int("port", 50001, "listening port")
      reg  = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
  )

  func main() {
      flag.Parse()
      // bind the server listen address
      lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))
      if err != nil {
          panic(err)
      }
      // register the service with etcd, with a 10s refresh interval and a 15s TTL
      err = grpclb.Register(*serv, "127.0.0.1", *port, *reg, time.Second*10, 15)
      if err != nil {
          panic(err)
      }
      ch := make(chan os.Signal, 1)
      signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
      go func() {
          s := <-ch
          log.Printf("receive signal '%v'", s)
          grpclb.UnRegister()
          os.Exit(1)
      }()
      // start the gRPC server
      log.Printf("starting hello service at %d", *port)
      s := grpc.NewServer()
      pb.RegisterGreeterServer(s, &server{})
      s.Serve(lis)
  }

  // server is used to implement helloworld.GreeterServer.
  type server struct{}

  // SayHello implements helloworld.GreeterServer
  func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
      fmt.Printf("%v: Receive is %s\n", time.Now(), in.Name)
      return &pb.HelloReply{Message: "Hello " + in.Name}, nil
  }

The load balancing itself is implemented on the client side:

  // client.go
  package main

  import (
      "context"
      "flag"
      "fmt"
      "strconv"
      "time"

      "google.golang.org/grpc"
      grpclb "xingyys.com/mysite/balance"
      "xingyys.com/mysite/pb"
  )

  var (
      serv = flag.String("service", "hello_service", "service name")
      reg  = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
  )

  func main() {
      flag.Parse()
      fmt.Println("serv", *serv)
      // the resolver looks up service addresses in etcd, and the round-robin
      // balancer spreads requests across them
      r := grpclb.NewResolver(*serv)
      b := grpc.RoundRobin(r)
      ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
      defer cancel()
      conn, err := grpc.DialContext(ctx, *reg, grpc.WithInsecure(), grpc.WithBalancer(b))
      if err != nil {
          panic(err)
      }
      fmt.Println("conn...")
      ticker := time.NewTicker(1 * time.Second)
      for t := range ticker.C {
          client := pb.NewGreeterClient(conn)
          resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "world " + strconv.Itoa(t.Second())})
          if err == nil {
              fmt.Printf("%v: Reply is %s\n", t, resp.Message)
          } else {
              fmt.Println(err)
          }
      }
  }

4. Distributed Notification and Coordination

Similar to publish and subscribe, this scenario also builds on etcd's watch mechanism. Through registration and asynchronous notification, different systems in a distributed environment can notify and coordinate with each other and react to data changes in real time. The idea: each system registers a directory in etcd and watches the keys under it; when a change is detected, the watcher reacts accordingly.

  package main

  import (
      "context"
      "fmt"
      "log"
      "strconv"
      "time"

      "github.com/coreos/etcd/clientv3"
  )

  func main() {
      client, err := clientv3.New(clientv3.Config{
          Endpoints:   []string{"192.168.10.10:2379"},
          DialTimeout: time.Second * 3,
      })
      if err != nil {
          log.Fatalln(err)
      }
      prefix := "/job"
      id := "test_job"
      key := fmt.Sprintf("%s/%s", prefix, id)
      // the "worker" side: report job progress (0-100) into the key
      go func() {
          timer := time.NewTicker(time.Millisecond * 10)
          i := 0
          for range timer.C {
              if i > 100 {
                  return
              }
              if _, err := client.Put(context.TODO(), key, strconv.Itoa(i)); err == nil {
                  log.Printf("job process: %d%%", i)
              }
              i++
          }
      }()
      // the "coordinator" side: watch the key and react when the job reaches 100%
      watcher := client.Watch(context.TODO(), key)
      for ch := range watcher {
          for _, e := range ch.Events {
              if string(e.Kv.Value) == "100" {
                  log.Println("job Done!")
                  return
              }
          }
      }
  }

5. Distributed Locks

Because etcd uses the Raft algorithm to keep its data strongly consistent, any value written by an operation is guaranteed to be globally consistent across the cluster, which makes distributed locks easy to implement. The idea: multiple sessions use transactions to compete for the same key; the session that grabs it first holds the lock, and the others wait for it to be released. With a try-lock, a session that fails to grab the key does not wait but returns an error immediately.
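As a rough sketch of this transactional try-lock idea (not the official implementation), the example below compares the key's create revision with 0, so the Put succeeds only if the key does not exist yet, and the lease makes the lock expire automatically if its holder crashes. The key /lock/mylock, the owner value and the 10-second TTL are illustrative choices:

  package main

  import (
      "context"
      "fmt"
      "log"
      "time"

      "github.com/coreos/etcd/clientv3"
  )

  func main() {
      client, err := clientv3.New(clientv3.Config{
          Endpoints:   []string{"192.168.10.10:2379"},
          DialTimeout: time.Second * 2,
      })
      if err != nil {
          log.Fatalln(err)
      }
      defer client.Close()
      lockKey := "/lock/mylock" // example key; use one key per shared resource

      // a lease so the lock is released automatically if its holder dies
      lease, err := client.Grant(context.TODO(), 10)
      if err != nil {
          log.Fatalln(err)
      }

      // try-lock: the Put succeeds only if the key has never been created (CreateRevision == 0)
      txn := client.Txn(context.TODO()).
          If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
          Then(clientv3.OpPut(lockKey, "owner-1", clientv3.WithLease(lease.ID))).
          Else(clientv3.OpGet(lockKey))
      resp, err := txn.Commit()
      if err != nil {
          log.Fatalln(err)
      }
      if !resp.Succeeded {
          // another session holds the lock; a try-lock returns immediately
          fmt.Println("lock is held by another session")
          return
      }

      fmt.Println("lock acquired, doing work...")
      time.Sleep(time.Second) // critical section

      // release the lock explicitly (or simply let the lease expire)
      client.Delete(context.TODO(), lockKey)
      fmt.Println("lock released")
  }

A blocking lock can be layered on top of this by watching the key for a DELETE event and retrying the transaction.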
Since the clientv3 releases, etcd ships an official lock implementation in its concurrency package, supporting both blocking locks and try-lock (try-lock requires etcd v3.4.3); see the official example here.
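For reference, here is a minimal sketch of the official lock, assuming the clientv3/concurrency package under the same import path as the earlier examples. Only the blocking Lock is shown, since TryLock exists only on newer releases; the /locks/mylock prefix and the 10-second session TTL are example values:

  package main

  import (
      "context"
      "fmt"
      "log"
      "time"

      "github.com/coreos/etcd/clientv3"
      "github.com/coreos/etcd/clientv3/concurrency"
  )

  func main() {
      client, err := clientv3.New(clientv3.Config{
          Endpoints:   []string{"192.168.10.10:2379"},
          DialTimeout: time.Second * 2,
      })
      if err != nil {
          log.Fatalln(err)
      }
      defer client.Close()

      // a session keeps a lease alive for as long as this process is healthy
      session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
      if err != nil {
          log.Fatalln(err)
      }
      defer session.Close()

      // every competitor must use the same lock prefix
      mutex := concurrency.NewMutex(session, "/locks/mylock")

      // Lock blocks until the lock is acquired (or the context is cancelled)
      if err := mutex.Lock(context.TODO()); err != nil {
          log.Fatalln(err)
      }
      fmt.Println("lock acquired")
      time.Sleep(time.Second) // critical section

      if err := mutex.Unlock(context.TODO()); err != nil {
          log.Fatalln(err)
      }
      fmt.Println("lock released")
  }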

6. Distributed Queues


There are two ways to implement a distributed queue with etcd. One waits until all conditions are satisfied before executing the task; the other is a first-in-first-out queue. The idea behind the first is to watch a directory and act only once all the required keys under it hold the expected values.

  package main

  import (
      "context"
      "fmt"
      "log"
      "math/rand"
      "time"

      "github.com/coreos/etcd/clientv3"
  )

  func random(max int) int {
      rand.Seed(time.Now().UnixNano())
      return rand.Intn(max)
  }

  func main() {
      client, _ := clientv3.New(clientv3.Config{
          Endpoints:   []string{"192.168.10.10:2379"},
          DialTimeout: time.Second * 2,
      })
      prefix := "/queue"
      client.Delete(context.TODO(), prefix, clientv3.WithPrefix())
      // every second, condition /queue/1 becomes a random value among 0, 1, 2
      go func() {
          timer := time.NewTicker(time.Second * 1)
          key := prefix + "/1"
          for range timer.C {
              rd := random(3)
              client.Put(context.TODO(), key, fmt.Sprintf("%d", rd))
          }
      }()
      // every 2 seconds, condition /queue/2 becomes a random value among 0, 1, 2
      go func() {
          timer := time.NewTicker(time.Second * 2)
          key := prefix + "/2"
          for range timer.C {
              rd := random(3)
              client.Put(context.TODO(), key, fmt.Sprintf("%d", rd))
          }
      }()
      // every 3 seconds, condition /queue/3 becomes a random value among 0, 1, 2
      go func() {
          timer := time.NewTicker(time.Second * 3)
          key := prefix + "/3"
          for range timer.C {
              rd := random(3)
              client.Put(context.TODO(), key, fmt.Sprintf("%d", rd))
          }
      }()
      watcher := client.Watch(context.TODO(), prefix, clientv3.WithPrefix())
      for range watcher {
          // exit when all of the following conditions hold:
          // /queue/1 = 0
          // /queue/2 = 2
          // /queue/3 = 1
          resp, _ := client.Get(context.TODO(), prefix, clientv3.WithRange(prefix+"/4"))
          fmt.Println(resp.Kvs)
          if len(resp.Kvs) < 3 {
              continue
          }
          if string(resp.Kvs[0].Value) == "0" &&
              string(resp.Kvs[1].Value) == "2" &&
              string(resp.Kvs[2].Value) == "1" {
              log.Println("Done!")
              return
          }
      }
  }

The second approach, a first-in-first-out queue:

  package main

  import (
      "context"
      "crypto/md5"
      "fmt"
      "strconv"
      "time"

      "github.com/coreos/etcd/clientv3"
  )

  func main() {
      ctx, cancel := context.WithCancel(context.Background())
      defer cancel()
      client, _ := clientv3.New(clientv3.Config{
          Endpoints:   []string{"192.168.10.10:2379"},
          DialTimeout: time.Second * 2,
      })
      prefix := "/queue"
      client.Delete(ctx, prefix, clientv3.WithPrefix())
      // push: write 10 items under the prefix; the key is a hash, so the order
      // is recorded by etcd's revision rather than by the key name
      for i := 0; i < 10; i++ {
          key := fmt.Sprintf("%s/%x", prefix, md5.Sum([]byte(time.Now().String())))
          client.Put(ctx, key, strconv.Itoa(i))
          fmt.Printf("key %s push queue\n", key)
      }
      fmt.Print("\n\n")
      // pop: WithFirstRev returns the key with the oldest revision first, which
      // gives first-in-first-out order; use clientv3.WithLastRev() instead for
      // last-in-first-out behavior
      ops := make([]clientv3.OpOption, 0)
      ops = append(ops, clientv3.WithFirstRev()...)
      ops = append(ops, clientv3.WithPrefix())
      ops = append(ops, clientv3.WithLimit(1))
      for i := 0; i < 10; i++ {
          resp, _ := client.Get(context.TODO(), prefix, ops...)
          if resp.Count > 0 {
              key := string(resp.Kvs[0].Key)
              fmt.Printf("count %d => key %s pop queue\n", resp.Count, key)
              client.Delete(context.TODO(), key)
          }
      }
  }

7. Cluster Monitoring and Leader Election

Monitoring a cluster through etcd is very simple to implement and close to real time:

  1. With the watch mechanism, when a node disappears or changes, the watcher notices it immediately and notifies the user.
  2. Each node can register a key with a TTL, for example by sending a heartbeat every 30s to keep its "alive" key present; if the heartbeats stop, the key disappears.

  In this way the health of every node can be detected promptly, which covers the cluster monitoring requirement. A minimal heartbeat sketch is shown below.
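The sketch assumes the same clientv3 import path as in the earlier examples; the key /monitor/node1 and the 30-second TTL are illustrative values. The node keeps a leased key alive, and the key disappears automatically once the node stops sending keep-alives, which any watcher of the /monitor prefix sees as a DELETE event:

  package main

  import (
      "context"
      "log"
      "time"

      "github.com/coreos/etcd/clientv3"
  )

  func main() {
      client, err := clientv3.New(clientv3.Config{
          Endpoints:   []string{"192.168.10.10:2379"},
          DialTimeout: time.Second * 2,
      })
      if err != nil {
          log.Fatalln(err)
      }
      defer client.Close()

      // grant a 30s lease and attach this node's "alive" key to it
      lease, err := client.Grant(context.TODO(), 30)
      if err != nil {
          log.Fatalln(err)
      }
      if _, err := client.Put(context.TODO(), "/monitor/node1", "alive", clientv3.WithLease(lease.ID)); err != nil {
          log.Fatalln(err)
      }

      // KeepAlive keeps renewing the lease; if this process dies, the lease
      // expires and /monitor/node1 disappears together with it
      ch, err := client.KeepAlive(context.TODO(), lease.ID)
      if err != nil {
          log.Fatalln(err)
      }
      for resp := range ch {
          log.Printf("heartbeat ok, lease %d ttl %d", resp.ID, resp.TTL)
      }
  }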

Leader election can be built on the distributed lock above.
  This scenario typically involves machines doing long CPU-heavy computation or heavy I/O: only the elected Leader needs to compute or process once, and it can then replicate the result to the Followers, avoiding duplicated work and saving compute resources.
The classic example is building the full index in a search system. If every machine built the index on its own, it would not only be time-consuming but also impossible to guarantee that the indexes stay consistent. Instead, all machines use etcd's CAS mechanism to try to create the same key at the same time; the machine that succeeds becomes the Leader, runs the index build, and then distributes the result to the other nodes. A minimal election sketch follows.
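Besides the hand-rolled CAS approach, the clientv3 concurrency package also provides an Election helper built on the same idea; the sketch below assumes that package, with /election/index-build and the candidate value node1 as made-up example names:

  package main

  import (
      "context"
      "log"
      "time"

      "github.com/coreos/etcd/clientv3"
      "github.com/coreos/etcd/clientv3/concurrency"
  )

  func main() {
      client, err := clientv3.New(clientv3.Config{
          Endpoints:   []string{"192.168.10.10:2379"},
          DialTimeout: time.Second * 2,
      })
      if err != nil {
          log.Fatalln(err)
      }
      defer client.Close()

      // the session's lease ties the leadership to this process's liveness
      session, err := concurrency.NewSession(client, concurrency.WithTTL(15))
      if err != nil {
          log.Fatalln(err)
      }
      defer session.Close()

      // every candidate campaigns on the same election prefix;
      // Campaign blocks until this node becomes the leader
      election := concurrency.NewElection(session, "/election/index-build")
      if err := election.Campaign(context.TODO(), "node1"); err != nil {
          log.Fatalln(err)
      }

      log.Println("I am the leader, building the full index...")
      time.Sleep(time.Second) // do the leader-only work here

      // step down so another candidate can take over
      if err := election.Resign(context.TODO()); err != nil {
          log.Fatalln(err)
      }
  }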


Again, the official repository ships an example of this; see here for details.