etcd supports the following use cases:
- Service discovery
- Message publish and subscribe
- Load balancing
- Distributed notification and coordination
- Distributed locks
- Distributed queues
- Cluster monitoring and leader election
1. Service Discovery

One of the most common etcd use cases is service discovery. The approach:
On first start, the server program connects to etcd and writes a key/value pair whose value has the form ip:port, bound to a lease. The server then maintains an internal timer that periodically refreshes the lease TTL in the registry.
The other component is the discovery side, which watches the server's key: whenever the key changes, discovery detects the event and reacts accordingly.
The implementation looks like this:
```go
// server.go
package main

import (
	"context"
	"crypto/md5"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/coreos/etcd/clientv3"
)

var (
	prefix     = "register"
	client     *clientv3.Client
	stopSignal = make(chan struct{}, 1)
	srvKey     string
)

var (
	serv     = flag.String("name", "hello", "service name")
	port     = flag.Int("port", 30000, "service port")
	endpoint = flag.String("endpoints", "http://127.0.0.1:2379", "etcd endpoints")
)

type SvConfig struct {
	Name string `json:"name"`
	Host string `json:"host"`
	Port int    `json:"port"`
}

func Register(endpoints string, config *SvConfig, interval time.Duration, ttl int) error {
	// Serialize the service metadata; the md5 of the value becomes the key suffix.
	srvValue, _ := json.Marshal(config)
	srvKey = fmt.Sprintf("%s/%x", prefix, md5.Sum(srvValue))
	var err error
	client, err = clientv3.New(clientv3.Config{
		Endpoints:   strings.Split(endpoints, ","),
		DialTimeout: time.Second * 2,
	})
	if err != nil {
		return fmt.Errorf("register service failed: %v", err)
	}
	go func() {
		ticker := time.NewTicker(interval)
		for {
			// Grant a fresh lease each round; if refreshes ever stop,
			// the key expires after ttl seconds.
			grant, err := client.Grant(context.TODO(), int64(ttl))
			if err != nil {
				log.Printf("grant lease failed: %v", err)
			} else if getResp, err := client.Get(context.TODO(), srvKey); err != nil {
				log.Printf("get service key failed: %v", err)
			} else if getResp.Count == 0 {
				// A missing key comes back as an empty result, not as an
				// rpctypes.ErrKeyNotFound error, so check Count here.
				if _, err = client.Put(context.TODO(), srvKey, string(srvValue), clientv3.WithLease(grant.ID)); err == nil {
					log.Printf("register service %s at %s:%d", config.Name, config.Host, config.Port)
				}
			} else {
				// The key already exists: refresh it under the new lease.
				_, err = client.Put(context.TODO(), srvKey, string(srvValue), clientv3.WithLease(grant.ID))
			}
			select {
			case <-stopSignal:
				return
			case <-ticker.C:
			}
		}
	}()
	return nil
}

func Unregister() error {
	stopSignal <- struct{}{}
	// Re-arm the channel immediately so repeated Unregister calls cannot deadlock.
	stopSignal = make(chan struct{}, 1)
	_, err := client.Delete(context.TODO(), srvKey)
	return err
}

func main() {
	flag.Parse()
	// Bind the service address and port.
	lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *port))
	if err != nil {
		panic(err)
	}
	config := &SvConfig{
		Name: *serv,
		Host: "127.0.0.1",
		Port: *port,
	}
	Register(*endpoint, config, time.Second*10, 15)
	ch := make(chan os.Signal, 1)
	// SIGKILL cannot be trapped, so it is not registered here.
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
	go func() {
		<-ch
		Unregister()
		os.Exit(1)
	}()
	log.Printf("service %s start at %d", *serv, *port)
	// server todo
	for {
		lis.Accept()
	}
}
```
```go
// discovery.go
package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/coreos/etcd/clientv3"
)

var (
	prefix = "register"
	client *clientv3.Client
)

var (
	port     = flag.Int("port", 30001, "service port")
	endpoint = flag.String("endpoints", "http://127.0.0.1:2379", "etcd endpoints")
)

type SvConfig struct {
	Name string `json:"name"`
	Host string `json:"host"`
	Port int    `json:"port"`
}

func watcher() error {
	var err error
	client, err = clientv3.New(clientv3.Config{
		Endpoints:   strings.Split(*endpoint, ","),
		DialTimeout: time.Second * 3,
	})
	if err != nil {
		return fmt.Errorf("connect etcd cluster failed: %v", err)
	}
	go func() {
		// Watch every key under the registration prefix.
		resp := client.Watch(context.TODO(), prefix, clientv3.WithPrefix())
		for ch := range resp {
			for _, event := range ch.Events {
				switch event.Type {
				case clientv3.EventTypePut:
					if event.IsCreate() {
						srv := parseSrv(event.Kv.Value)
						log.Printf("discovery service %s at %s:%d", srv.Name, srv.Host, srv.Port)
					}
				case clientv3.EventTypeDelete:
					log.Printf("delete service %s", event.Kv.Key)
				}
			}
		}
	}()
	return err
}

func parseSrv(text []byte) *SvConfig {
	svc := &SvConfig{}
	json.Unmarshal(text, svc)
	return svc
}

func main() {
	flag.Parse()
	// Bind a listening address and port.
	lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *port))
	if err != nil {
		panic(err)
	}
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
	go func() {
		<-ch
		os.Exit(1)
	}()
	watcher()
	log.Printf("discovery start at %d", *port)
	// server todo
	for {
		lis.Accept()
	}
}
```
2. Message Publish and Subscribe

Publish/subscribe is another very common scenario. The idea with etcd is simple: the publisher writes a series of keys under a shared prefix, and subscribers watch that prefix.
The code:
```go
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/coreos/etcd/clientv3"
)

var (
	prefix    = "/etcd"
	endpoints = flag.String("endpoints", "http://127.0.0.1:2379", "etcd endpoints")
)

func publisher(client *clientv3.Client) {
	go func() {
		ticker := time.NewTicker(time.Second)
		for range ticker.C {
			now := time.Now()
			key := fmt.Sprintf("%s/%d", prefix, now.Second())
			value := now.String()
			// A lease can be attached to the key here:
			// resp, _ := client.Grant(context.TODO(), 10)
			// client.Put(context.TODO(), key, value, clientv3.WithLease(resp.ID))
			client.Put(context.TODO(), key, value)
		}
	}()
}

func subscriber(client *clientv3.Client) {
	watcher := client.Watch(context.TODO(), prefix, clientv3.WithPrefix())
	for ch := range watcher {
		for _, e := range ch.Events {
			// Keys cycle every 60 seconds, so later writes are updates
			// rather than creates; log every PUT instead of only e.IsCreate().
			if e.Type == clientv3.EventTypePut {
				log.Printf("received %s => %s", e.Kv.Key, e.Kv.Value)
			}
		}
	}
}

func main() {
	flag.Parse()
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   strings.Split(*endpoints, ","),
		DialTimeout: time.Second * 2,
	})
	if err != nil {
		log.Fatalln("connect etcd cluster failed: " + err.Error())
	}
	publisher(client)
	subscriber(client) // blocks on the watch channel
}
```
3. Load Balancing

etcd can be combined with gRPC to implement load balancing: build on service discovery and use gRPC's built-in client-side balancer. First, the registration side:
```go
// register.go
package balance

import (
	"context"
	"fmt"
	"log"
	"strings"
	"time"

	etcd3 "github.com/coreos/etcd/clientv3"
)

// Prefix distinguishes the services of different projects.
var Prefix = "etcd3_naming"

// client is package-level (and a pointer) so UnRegister can reuse it;
// shadowing it with ":=" inside Register would leave it nil.
var client *etcd3.Client
var serviceKey string
var stopSignal = make(chan bool, 1)

// Register registers a service with etcd.
func Register(name string, host string, port int, target string, interval time.Duration, ttl int) error {
	serviceValue := fmt.Sprintf("%s:%d", host, port)
	serviceKey = fmt.Sprintf("/%s/%s/%s", Prefix, name, serviceValue)
	// Parse the etcd endpoints and open a client for registration.
	var err error
	client, err = etcd3.New(etcd3.Config{
		Endpoints: strings.Split(target, ","),
	})
	if err != nil {
		return fmt.Errorf("grpclb: create etcd3 client failed: %v", err)
	}
	go func() {
		// Re-register on a fixed interval.
		ticker := time.NewTicker(interval)
		for {
			// Create a lease to bind the service address to.
			grant, err := client.Grant(context.TODO(), int64(ttl))
			if err != nil {
				log.Printf("grpclb: grant lease for '%s' failed: %s", name, err.Error())
			} else if getResp, err := client.Get(context.Background(), serviceKey); err != nil {
				log.Printf("grpclb: service '%s' connect to etcd3 failed: %s", name, err.Error())
			} else if getResp.Count == 0 {
				// The service key does not exist yet: create it. (A missing key
				// returns an empty result, not an rpctypes.ErrKeyNotFound error.)
				if _, err := client.Put(context.TODO(), serviceKey, serviceValue, etcd3.WithLease(grant.ID)); err != nil {
					log.Printf("grpclb: set service '%s' with ttl to etcd3 failed: %s", name, err.Error())
				}
			} else {
				// Refresh the service key under the new lease.
				if _, err := client.Put(context.Background(), serviceKey, serviceValue, etcd3.WithLease(grant.ID)); err != nil {
					log.Printf("grpclb: refresh service '%s' with ttl to etcd3 failed: %s", name, err.Error())
				}
			}
			select {
			case <-stopSignal:
				return
			case <-ticker.C:
			}
		}
	}()
	return nil
}

// UnRegister removes the service's registration information.
func UnRegister() error {
	stopSignal <- true
	// Re-arm the channel immediately so concurrent UnRegister calls
	// cannot deadlock.
	stopSignal = make(chan bool, 1)
	var err error
	if _, err = client.Delete(context.Background(), serviceKey); err != nil {
		log.Printf("grpclb: deregister '%s' failed: %s", serviceKey, err.Error())
	} else {
		log.Printf("grpclb: deregister '%s' ok.", serviceKey)
	}
	return err
}
```
```go
// watcher.go
package balance

import (
	"context"
	"fmt"
	"strings"

	etcd3 "github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/mvcc/mvccpb"
	"google.golang.org/grpc/naming"
)

// watcher is the implementation of grpc.naming.Watcher.
type watcher struct {
	re            *resolver // re: etcd resolver
	client        etcd3.Client
	isInitialized bool
}

// Close does nothing.
func (w *watcher) Close() {}

// Next returns the address updates.
func (w *watcher) Next() ([]*naming.Update, error) {
	// prefix is the etcd prefix to watch.
	prefix := fmt.Sprintf("/%s/%s/", Prefix, w.re.serviceName)
	// On the first call, list the addresses already registered in etcd.
	if !w.isInitialized {
		w.isInitialized = true
		resp, err := w.client.Get(context.Background(), prefix, etcd3.WithPrefix())
		if err == nil {
			addrs := extractAddrs(resp)
			// If not empty, return the initial set of addresses.
			if l := len(addrs); l != 0 {
				updates := make([]*naming.Update, l)
				for i := range addrs {
					updates[i] = &naming.Update{Op: naming.Add, Addr: addrs[i]}
				}
				return updates, nil
			}
		}
	}
	// Afterwards, block on an etcd watcher and translate its events.
	rch := w.client.Watch(context.Background(), prefix, etcd3.WithPrefix())
	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.PUT:
				return []*naming.Update{{Op: naming.Add, Addr: string(ev.Kv.Value)}}, nil
			case mvccpb.DELETE:
				// A DELETE event carries no value; recover the address from
				// the key suffix ("/<prefix>/<service>/<host:port>").
				addr := strings.TrimPrefix(string(ev.Kv.Key), prefix)
				return []*naming.Update{{Op: naming.Delete, Addr: addr}}, nil
			}
		}
	}
	return nil, nil
}

func extractAddrs(resp *etcd3.GetResponse) []string {
	addrs := []string{}
	if resp == nil || resp.Kvs == nil {
		return addrs
	}
	for i := range resp.Kvs {
		if v := resp.Kvs[i].Value; v != nil {
			addrs = append(addrs, string(v))
		}
	}
	return addrs
}
```
```go
// resolver.go
package balance

import (
	"errors"
	"fmt"
	"strings"

	etcd3 "github.com/coreos/etcd/clientv3"
	"google.golang.org/grpc/naming"
)

// resolver is the implementation of grpc.naming.Resolver.
type resolver struct {
	serviceName string // service name to resolve
}

// NewResolver returns a resolver for the given service name.
func NewResolver(serviceName string) *resolver {
	return &resolver{serviceName: serviceName}
}

// Resolve resolves the service from etcd; target is the etcd dial address,
// e.g. "http://127.0.0.1:2379,http://127.0.0.1:12379,http://127.0.0.1:22379".
func (re *resolver) Resolve(target string) (naming.Watcher, error) {
	if re.serviceName == "" {
		return nil, errors.New("grpclb: no service name provided")
	}
	// Create an etcd client.
	client, err := etcd3.New(etcd3.Config{
		Endpoints: strings.Split(target, ","),
	})
	if err != nil {
		return nil, fmt.Errorf("grpclb: create etcd3 client failed: %s", err.Error())
	}
	// Return a watcher over the service prefix.
	return &watcher{re: re, client: *client}, nil
}
```
With registration and resolution in place, define the service with protobuf:
syntax = "proto3";option java_multiple_files = true;option java_package = "com.midea.jr.test.grpc";option java_outer_classname = "HelloWorldProto";option objc_class_prefix = "HLW";package pb;// The greeting service definition.service Greeter {// Sends a greetingrpc SayHello (HelloRequest) returns (HelloReply) {}}// The request message containing the user's name.message HelloRequest {string name = 1;}// The response message containing the greetingsmessage HelloReply {string message = 1;}
Compile the proto file into Go code:
```sh
# Install protoc-gen-go and proto first:
# go get -u github.com/golang/protobuf/protoc-gen-go
# go get -u github.com/golang/protobuf/proto
$ protoc -I ./pb --go_out=plugins=grpc:pb ./pb/helloworld.proto
```
The server side:
```go
// server.go
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"
	"time"

	"google.golang.org/grpc"

	grpclb "xingyys.com/mysite/balance"
	"xingyys.com/mysite/pb"
)

var (
	serv = flag.String("service", "hello_service", "service name")
	port = flag.Int("port", 50001, "listening port")
	reg  = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)

func main() {
	flag.Parse()
	// Bind the server's listening address.
	lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))
	if err != nil {
		panic(err)
	}
	// Register the service with etcd: refresh every 10s, lease TTL of 15s.
	err = grpclb.Register(*serv, "127.0.0.1", *port, *reg, time.Second*10, 15)
	if err != nil {
		panic(err)
	}
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
	go func() {
		s := <-ch
		log.Printf("receive signal '%v'", s)
		grpclb.UnRegister()
		os.Exit(1)
	}()
	// Start the gRPC server.
	log.Printf("starting hello service at %d", *port)
	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, &server{})
	s.Serve(lis)
}

// server is used to implement helloworld.GreeterServer.
type server struct{}

// SayHello implements helloworld.GreeterServer.
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	fmt.Printf("%v: Receive is %s\n", time.Now(), in.Name)
	return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
```
The load balancing itself happens on the client:
```go
// client.go
package main

import (
	"context"
	"flag"
	"fmt"
	"strconv"
	"time"

	"google.golang.org/grpc"

	grpclb "xingyys.com/mysite/balance"
	"xingyys.com/mysite/pb"
)

var (
	serv = flag.String("service", "hello_service", "service name")
	reg  = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)

func main() {
	flag.Parse()
	// Resolve the service through etcd and round-robin across its addresses.
	r := grpclb.NewResolver(*serv)
	b := grpc.RoundRobin(r)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	conn, err := grpc.DialContext(ctx, *reg, grpc.WithInsecure(), grpc.WithBalancer(b))
	if err != nil {
		panic(err)
	}
	client := pb.NewGreeterClient(conn)
	// Call SayHello once a second; requests rotate over the registered servers.
	ticker := time.NewTicker(1 * time.Second)
	for t := range ticker.C {
		resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "world " + strconv.Itoa(t.Second())})
		if err == nil {
			fmt.Printf("%v: Reply is %s\n", t, resp.Message)
		} else {
			fmt.Println(err)
		}
	}
}
```
4. Distributed Notification and Coordination

Much like publish/subscribe, this relies on etcd's watch mechanism: registration plus asynchronous notification lets different systems in a distributed environment coordinate with each other and react to data changes in real time. The idea: each system registers a directory in etcd and watches the keys under it; when a change is detected, the watcher reacts accordingly.
```go
package main

import (
	"context"
	"fmt"
	"log"
	"strconv"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"192.168.10.10:2379"},
		DialTimeout: time.Second * 3,
	})
	if err != nil {
		log.Fatalln(err)
	}
	prefix := "/job"
	id := "test_job"
	key := fmt.Sprintf("%s/%s", prefix, id)
	// One system reports job progress by writing 0-100 into the key.
	go func() {
		ticker := time.NewTicker(time.Millisecond * 10)
		i := 0
		for range ticker.C {
			if i > 100 {
				return
			}
			// strconv.Itoa, not string(i): string(i) would produce the rune
			// with code point i, not its decimal representation.
			if _, err := client.Put(context.TODO(), key, strconv.Itoa(i)); err == nil {
				log.Printf("job process: %d%%", i)
			}
			i++
		}
	}()
	// Another system watches the key and reacts when the job completes.
	watcher := client.Watch(context.TODO(), key)
	for ch := range watcher {
		for _, e := range ch.Events {
			if string(e.Kv.Value) == "100" {
				log.Println("job Done!")
				return
			}
		}
	}
}
```
5. Distributed Locks

Because etcd uses the Raft algorithm to keep its data strongly consistent, any value written to the cluster is guaranteed to be globally consistent, which makes a distributed lock easy to build. The idea: multiple sessions race to claim the same key in a transaction; the first session to succeed holds the lock, and the others wait for it to be released. With a try-lock, a session that fails to grab the lock reports an error immediately instead of waiting.
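A minimal sketch of that transactional race, assuming a local etcd at 127.0.0.1:2379 and the same clientv3 import used above (the key name /lock/job and value owner-1 are illustrative):

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // assumed local etcd
		DialTimeout: time.Second * 2,
	})
	if err != nil {
		log.Fatalln(err)
	}
	defer client.Close()

	// Bind the lock key to a lease so a crashed holder releases it automatically.
	grant, err := client.Grant(context.TODO(), 10)
	if err != nil {
		log.Fatalln(err)
	}

	lockKey := "/lock/job" // illustrative key name
	// CreateRevision == 0 means the key does not exist yet; only one
	// session can pass this comparison and create the key.
	txn, err := client.Txn(context.TODO()).
		If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
		Then(clientv3.OpPut(lockKey, "owner-1", clientv3.WithLease(grant.ID))).
		Commit()
	if err != nil {
		log.Fatalln(err)
	}
	if txn.Succeeded {
		log.Println("lock acquired")
	} else {
		// A try-lock stops here; a blocking lock would watch the key
		// and retry once it is deleted or its lease expires.
		log.Println("lock held by another session")
	}
}
```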
The clientv3 release ships an official lock implementation of its own (the concurrency package), supporting both a blocking Lock and TryLock (TryLock requires etcd v3.4.3); see the official examples.
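A minimal sketch of the official mutex, assuming the clientv3/concurrency package from the same import tree (the key /lock/job is illustrative):

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
)

func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // assumed local etcd
		DialTimeout: time.Second * 2,
	})
	if err != nil {
		log.Fatalln(err)
	}
	defer client.Close()

	// A session keeps a lease alive in the background;
	// the mutex key is bound to that lease.
	session, err := concurrency.NewSession(client)
	if err != nil {
		log.Fatalln(err)
	}
	defer session.Close()

	mutex := concurrency.NewMutex(session, "/lock/job") // illustrative prefix

	// Lock blocks until the lock is acquired; on etcd v3.4.3+,
	// mutex.TryLock(ctx) instead fails fast when the lock is held.
	if err := mutex.Lock(context.TODO()); err != nil {
		log.Fatalln(err)
	}
	log.Println("lock acquired")

	// ... critical section ...

	if err := mutex.Unlock(context.TODO()); err != nil {
		log.Fatalln(err)
	}
	log.Println("lock released")
}
```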
6. Distributed Queues

A distributed queue on etcd can be built in two ways. The first waits until all conditions are satisfied before executing the task; the second is a first-in-first-out queue. For the first, watch a directory and act once the required keys under it hold the right values.
```go
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func random(max int) int {
	return rand.Intn(max)
}

func main() {
	rand.Seed(time.Now().UnixNano())
	client, _ := clientv3.New(clientv3.Config{
		Endpoints:   []string{"192.168.10.10:2379"},
		DialTimeout: time.Second * 2,
	})
	prefix := "/queue"
	client.Delete(context.TODO(), prefix, clientv3.WithPrefix())
	// Every second, each condition key is set to a random value in 0..2.
	go func() {
		ticker := time.NewTicker(time.Second)
		key := prefix + "/1"
		for range ticker.C {
			client.Put(context.TODO(), key, fmt.Sprintf("%d", random(3)))
		}
	}()
	go func() {
		ticker := time.NewTicker(time.Second)
		key := prefix + "/2"
		for range ticker.C {
			client.Put(context.TODO(), key, fmt.Sprintf("%d", random(3)))
		}
	}()
	go func() {
		ticker := time.NewTicker(time.Second)
		key := prefix + "/3"
		for range ticker.C {
			client.Put(context.TODO(), key, fmt.Sprintf("%d", random(3)))
		}
	}()
	watcher := client.Watch(context.TODO(), prefix, clientv3.WithPrefix())
	for range watcher {
		// Exit once all of the following conditions hold:
		// /queue/1 = 0
		// /queue/2 = 2
		// /queue/3 = 1
		resp, _ := client.Get(context.TODO(), prefix, clientv3.WithRange(prefix+"/4"))
		fmt.Println(resp.Kvs)
		if len(resp.Kvs) < 3 {
			// Not all condition keys exist yet.
			continue
		}
		if string(resp.Kvs[0].Value) == "0" &&
			string(resp.Kvs[1].Value) == "2" &&
			string(resp.Kvs[2].Value) == "1" {
			log.Println("Done!")
			return
		}
	}
}
```
The second approach:
```go
package main

import (
	"context"
	"crypto/md5"
	"fmt"
	"strconv"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	client, _ := clientv3.New(clientv3.Config{
		Endpoints:   []string{"192.168.10.10:2379"},
		DialTimeout: time.Second * 2,
	})
	prefix := "/queue"
	client.Delete(ctx, prefix, clientv3.WithPrefix())
	// Push: each element gets a unique key; the ordering comes from the
	// key's revision, not from the key name itself.
	for i := 0; i < 10; i++ {
		key := fmt.Sprintf("%s/%x", prefix, md5.Sum([]byte(time.Now().String())))
		client.Put(ctx, key, strconv.Itoa(i))
		fmt.Printf("key %s push queue\n", key)
	}
	fmt.Println()
	ops := make([]clientv3.OpOption, 0)
	// WithFirstRev pops the oldest element first (first-in-first-out);
	// switch to clientv3.WithLastRev() for a last-in-first-out stack.
	ops = append(ops, clientv3.WithFirstRev()...)
	ops = append(ops, clientv3.WithPrefix())
	ops = append(ops, clientv3.WithLimit(1))
	// Pop: fetch the single oldest key under the prefix, then delete it.
	for i := 0; i < 10; i++ {
		resp, _ := client.Get(context.TODO(), prefix, ops...)
		if resp.Count > 0 {
			key := string(resp.Kvs[0].Key)
			fmt.Printf("count %d => key %s pop queue\n", resp.Count, key)
			client.Delete(context.TODO(), key)
		}
	}
}
```
7. Cluster Monitoring and Leader Election
Monitoring through etcd is simple to implement and offers strong real-time behavior:
- The watch mechanism: when a node disappears or changes, the watcher finds out immediately and notifies the user.
- Nodes can set TTL keys, e.g. sending a heartbeat every 30s to keep alive the key that represents the machine's liveness; if the heartbeats stop, the key disappears.
This makes each node's health visible almost immediately, which covers the cluster monitoring requirement.
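A minimal heartbeat sketch along these lines, assuming the same clientv3 import used above (the node key /nodes/node-1 is illustrative; the lease keep-alive takes the place of a hand-rolled 30s timer):

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // assumed local etcd
		DialTimeout: time.Second * 2,
	})
	if err != nil {
		log.Fatalln(err)
	}
	defer client.Close()

	// Grant a 30s lease and bind this node's key to it.
	lease, err := client.Grant(context.TODO(), 30)
	if err != nil {
		log.Fatalln(err)
	}
	if _, err := client.Put(context.TODO(), "/nodes/node-1", "alive",
		clientv3.WithLease(lease.ID)); err != nil {
		log.Fatalln(err)
	}

	// KeepAlive refreshes the lease in the background; if this process
	// dies, the key expires and watchers see a DELETE event.
	ch, err := client.KeepAlive(context.TODO(), lease.ID)
	if err != nil {
		log.Fatalln(err)
	}
	for resp := range ch {
		log.Printf("heartbeat ok, ttl=%d", resp.TTL)
	}
}
```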
A distributed lock can also be used to run a leader election.
The typical scenario is a set of machines doing long-running CPU-bound computation or heavy IO, where only the elected leader needs to compute or process once and can then copy the result to the followers, avoiding duplicated work and saving compute resources.
The classic example is building the full index in a search system. If every machine built the index on its own, it would not only be time-consuming but also make the index's consistency hard to guarantee. Instead, all machines use etcd's CAS mechanism to try to create the same key at the same time; the machine whose create succeeds becomes the leader, runs the index computation, and then distributes the result to the other nodes.

The official client ships an election example for this as well.
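For reference, a minimal election sketch using the official concurrency package (the election prefix /election/indexer and the value node-1 are illustrative):

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
)

func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // assumed local etcd
		DialTimeout: time.Second * 2,
	})
	if err != nil {
		log.Fatalln(err)
	}
	defer client.Close()

	session, err := concurrency.NewSession(client)
	if err != nil {
		log.Fatalln(err)
	}
	defer session.Close()

	// All candidates campaign on the same election prefix;
	// Campaign blocks until this node becomes the leader.
	election := concurrency.NewElection(session, "/election/indexer")
	if err := election.Campaign(context.TODO(), "node-1"); err != nil {
		log.Fatalln(err)
	}
	log.Println("elected leader, building the index...")

	// ... do the leader-only work, then step down ...

	if err := election.Resign(context.TODO()); err != nil {
		log.Fatalln(err)
	}
}
```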
