关于 etcd 的安装和介绍看 这里
官方的实例可以看 这里

一、连接

首先是关于 golang 如何连接 etcd ,先是简单的连接。

  1. package main
  2. import (
  3. "github.com/coreos/etcd/clientv3"
  4. "log"
  5. "time"
  6. )
  7. func connect() {
  8. cli, err := clientv3.New(clientv3.Config{
  9. // etcd 集群的地址集合
  10. Endpoints: []string{"192.168.10.10:2379"},
  11. // 请求超时时间
  12. DialTimeout: time.Second * 3,
  13. })
  14. if err != nil {
  15. log.Fatal("connect etcd cluster: " + err.Error())
  16. }
  17. cli.Close()
  18. }

还有带 https 和 开启用户验证的连接

  1. func connectTlsAuth() {
  2. tlsInfo := transport.TLSInfo{
  3. CertFile: "/tmp/cert.pem",
  4. KeyFile: "/tmp/key.pem",
  5. TrustedCAFile: "/tmp/ca.pem",
  6. }
  7. tlsConfig, err := tlsInfo.ClientConfig()
  8. if err != nil {
  9. log.Fatal("parse tls config file: " + err.Error())
  10. }
  11. cli, err := clientv3.New(clientv3.Config{
  12. Endpoints: []string{"192.168.10.10:2379"},
  13. DialTimeout: time.Second * 3,
  14. TLS: tlsConfig,
  15. Username: "root",
  16. Password: "root",
  17. })
  18. if err != nil {
  19. log.Fatal("connect etcd cluster: " + err.Error())
  20. }
  21. cli.Close()
  22. }

二、KV 操作

2.1 简单的 curd

在连接基础上,接下来就可以对key做操作了。对key做 curd

  1. func kv() {
  2. cli, _ := clientv3.New(clientv3.Config{
  3. // etcd 集群的地址集合
  4. Endpoints: []string{"192.168.10.10:2379"},
  5. // 请求超时时间
  6. DialTimeout: time.Second * 3,
  7. })
  8. defer cli.Close()
  9. ctx, cancel := context.WithCancel(context.Background())
  10. defer cancel()
  11. // etcdctl put foo 1
  12. _, err := cli.Put(ctx, "foo", "1")
  13. if err != nil {
  14. log.Fatal("put key:" + err.Error())
  15. }
  16. // etcdctl get foo --prefix
  17. // 带参数的请求
  18. resp, err := cli.Get(ctx, "foo", clientv3.WithPrefix())
  19. if err != nil {
  20. log.Fatal("get key: " + err.Error())
  21. }
  22. for _, v := range resp.Kvs {
  23. log.Printf("get %s => %s\n", v.Key, string(v.Value))
  24. }
  25. kvcli := clientv3.NewKV(cli)
  26. // etcdctl del foo
  27. _, err = kvcli.Delete(ctx, "foo")
  28. if err != nil {
  29. log.Fatal("delete key: " + err.Error())
  30. }
  31. }

2.2 事务

使用事务如下:

  1. func txn() {
  2. cli, _ := clientv3.New(clientv3.Config{
  3. // etcd 集群的地址集合
  4. Endpoints: []string{"192.168.10.10:2379"},
  5. // 请求超时时间
  6. DialTimeout: time.Second * 3,
  7. })
  8. defer cli.Close()
  9. ctx, cancel := context.WithCancel(context.Background())
  10. defer cancel()
  11. kvc := clientv3.NewKV(cli)
  12. _, err := kvc.Put(ctx, "foo", "xyz")
  13. if err != nil {
  14. log.Fatal("put key: " + err.Error())
  15. }
  16. _, err = kvc.Txn(ctx).
  17. // txn value comparisons are lexical
  18. If(clientv3.Compare(clientv3.Value("foo"), ">", "abc")).
  19. // the "Then" runs, since "xyz" > "abc"
  20. Then(clientv3.OpPut("foo", "XYZ")).
  21. // the "Else" does not run
  22. Else(clientv3.OpPut("foo", "ABC")).
  23. Commit()
  24. if err != nil {
  25. log.Fatal("run txn: " + err.Error())
  26. }
  27. }

2.3 批量操作

批量指定操作

  1. func do() {
  2. cli, _ := clientv3.New(clientv3.Config{
  3. // etcd 集群的地址集合
  4. Endpoints: []string{"192.168.10.10:2379"},
  5. // 请求超时时间
  6. DialTimeout: time.Second * 3,
  7. })
  8. defer cli.Close()
  9. ctx, cancel := context.WithCancel(context.Background())
  10. defer cancel()
  11. ops := []clientv3.Op{
  12. clientv3.OpPut("key1", "123"),
  13. clientv3.OpGet("key1"),
  14. clientv3.OpPut("key2", "456"),
  15. }
  16. for _, op := range ops {
  17. if _, err := cli.Do(ctx, op); err != nil {
  18. log.Fatal(err.Error())
  19. }
  20. }
  21. }

2.3 watch

监视key

  1. func watch() {
  2. cli, _ := clientv3.New(clientv3.Config{
  3. // etcd 集群的地址集合
  4. Endpoints: []string{"192.168.10.10:2379"},
  5. // 请求超时时间
  6. DialTimeout: time.Second * 3,
  7. })
  8. defer cli.Close()
  9. ctx, cancel := context.WithCancel(context.Background())
  10. defer cancel()
  11. go func() {
  12. timer := time.NewTicker(time.Second)
  13. for {
  14. select {
  15. case <-timer.C:
  16. // change foo value every second
  17. _, _ = cli.Put(context.TODO(), "foo", time.Now().String())
  18. _, _ = cli.Put(context.TODO(), "foo1", time.Now().String())
  19. _, _ = cli.Put(context.TODO(), "foo2", time.Now().String())
  20. _, _ = cli.Put(context.TODO(), "foo3", time.Now().String())
  21. _, _ = cli.Put(context.TODO(), "foo4", time.Now().String())
  22. }
  23. }
  24. }()
  25. //rch := cli.Watch(ctx, "foo")
  26. rch := cli.Watch(ctx, "foo", clientv3.WithPrefix())
  27. //rch := cli.Watch(ctx, "foo", clientv3.WithRange("foo4"))
  28. for wresp := range rch {
  29. for _, ev := range wresp.Events {
  30. fmt.Printf("%s %q: %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
  31. }
  32. }
  33. }
  1. func watchWithProcessNotify() {
  2. cli, _ := clientv3.New(clientv3.Config{
  3. // etcd 集群的地址集合
  4. Endpoints: []string{"192.168.10.10:2379"},
  5. // 请求超时时间
  6. DialTimeout: time.Second * 3,
  7. })
  8. defer cli.Close()
  9. ctx, cancel := context.WithCancel(context.Background())
  10. defer cancel()
  11. rch := cli.Watch(ctx, "foo", clientv3.WithProgressNotify())
  12. wresp := <- rch
  13. fmt.Printf("wresp.Header.Revision: %d\n", wresp.Header.Revision)
  14. fmt.Println("wresp.IsProgressNotify:", wresp.IsProgressNotify())
  15. }

三、lease

2.1 创建 lease

  1. func grant() {
  2. cli, _ := clientv3.New(clientv3.Config{
  3. // etcd 集群的地址集合
  4. Endpoints: []string{"192.168.10.10:2379"},
  5. // 请求超时时间
  6. DialTimeout: time.Second * 3,
  7. })
  8. defer cli.Close()
  9. ctx, cancel := context.WithCancel(context.Background())
  10. defer cancel()
  11. // etcdctl lease grant 5
  12. // grant lease 5s
  13. resp, err := cli.Grant(ctx, 5)
  14. if err != nil {
  15. log.Fatal("grant lease: " + err.Error())
  16. }
  17. // after 5 seconds, the key 'foo' will be removed
  18. _, err = cli.Put(ctx, "foo", "bar", clientv3.WithLease(resp.ID))
  19. if err != nil {
  20. log.Fatal("put key with lease: " + err.Error())
  21. }
  22. }

2.2 删除 lease

  1. func revoke() {
  2. cli, _ := clientv3.New(clientv3.Config{
  3. // etcd 集群的地址集合
  4. Endpoints: []string{"192.168.10.10:2379"},
  5. // 请求超时时间
  6. DialTimeout: time.Second * 3,
  7. })
  8. defer cli.Close()
  9. ctx, cancel := context.WithCancel(context.Background())
  10. defer cancel()
  11. resp, err := cli.Grant(ctx, 5)
  12. if err != nil {
  13. log.Fatal("grant lease: " + err.Error())
  14. }
  15. _, err = cli.Put(ctx, "foo", "bar", clientv3.WithLease(resp.ID))
  16. if err != nil {
  17. log.Fatal(err)
  18. }
  19. // revoking lease expires the key attached to its lease ID
  20. _, err = cli.Revoke(ctx, resp.ID)
  21. if err != nil {
  22. log.Fatal(err)
  23. }
  24. }

2.3 续租

  1. func keepAlive() {
  2. cli, _ := clientv3.New(clientv3.Config{
  3. // etcd 集群的地址集合
  4. Endpoints: []string{"192.168.10.10:2379"},
  5. // 请求超时时间
  6. DialTimeout: time.Second * 3,
  7. })
  8. defer cli.Close()
  9. ctx, cancel := context.WithCancel(context.Background())
  10. defer cancel()
  11. resp, err := cli.Grant(ctx, 5)
  12. if err != nil {
  13. log.Fatal("grant lease: " + err.Error())
  14. }
  15. _, err = cli.Put(ctx, "foo", "bar", clientv3.WithLease(resp.ID))
  16. if err != nil {
  17. log.Fatal(err)
  18. }
  19. ch, err := cli.KeepAlive(ctx, resp.ID)
  20. if err != nil {
  21. log.Fatal(err.Error())
  22. }
  23. ka := <- ch
  24. fmt.Println("ttl:", ka.TTL)
  25. // 官方提示:多数情况下使用 KeepAlive 来代替 KeepAliveOnce
  26. kaa, err := cli.KeepAliveOnce(ctx, resp.ID)
  27. if err != nil {
  28. log.Fatal(err)
  29. }
  30. fmt.Println("ttl:", kaa.TTL)
  31. }

2.4 查询 lease

  1. func leases() {
  2. cli, _ := clientv3.New(clientv3.Config{
  3. // etcd 集群的地址集合
  4. Endpoints: []string{"192.168.10.10:2379"},
  5. // 请求超时时间
  6. DialTimeout: time.Second * 3,
  7. })
  8. defer cli.Close()
  9. ctx, cancel := context.WithCancel(context.Background())
  10. defer cancel()
  11. _, err := cli.Grant(ctx, 5)
  12. if err != nil {
  13. log.Fatal("grant lease: " + err.Error())
  14. }
  15. _, err = cli.Grant(ctx, 10)
  16. if err != nil {
  17. log.Fatal("grant lease: " + err.Error())
  18. }
  19. _, err = cli.Grant(ctx, 15)
  20. if err != nil {
  21. log.Fatal("grant lease: " + err.Error())
  22. }
  23. resp, err := cli.Lease.Leases(ctx)
  24. if err != nil {
  25. log.Fatal(err)
  26. }
  27. for _, lease := range resp.Leases {
  28. ttl, err := cli.Lease.TimeToLive(ctx, lease.ID, clientv3.WithAttachedKeys())
  29. if err == nil {
  30. fmt.Printf("lease: %d, ttl: %d, grantedTTL: %d\n", ttl.ID, ttl.TTL, ttl.GrantedTTL)
  31. }
  32. }
  33. }

四、访问控制

  1. func auth() {
  2. cli, _ := clientv3.New(clientv3.Config{
  3. // etcd 集群的地址集合
  4. Endpoints: []string{"192.168.10.10:2379"},
  5. // 请求超时时间
  6. DialTimeout: time.Second * 3,
  7. })
  8. defer cli.Close()
  9. ctx, cancel := context.WithCancel(context.Background())
  10. defer cancel()
  11. auth := clientv3.NewAuth(cli)
  12. // create role
  13. if _, err := auth.RoleAdd(ctx, "root"); err != nil {
  14. log.Fatal(err)
  15. }
  16. // create role
  17. if _, err := auth.UserAdd(ctx, "root", "123"); err != nil {
  18. log.Fatal(err)
  19. }
  20. // grant role root to user root
  21. if _, err := auth.UserGrantRole(ctx, "root", "root"); err != nil {
  22. log.Fatal(err)
  23. }
  24. if _, err := auth.UserChangePassword(ctx, "root", "123"); err != nil {
  25. log.Fatal(err)
  26. }
  27. if _, err := auth.RoleAdd(ctx, "guest"); err != nil {
  28. log.Fatal(err)
  29. }
  30. if _, err := auth.UserAdd(ctx, "xingyys", ""); err != nil {
  31. log.Fatal(err)
  32. }
  33. if _, err := auth.UserGrantRole(ctx, "xingyys", "guest"); err != nil {
  34. log.Fatal(err)
  35. }
  36. // 不知道为什么,需要在grant后更新密码
  37. // 否则密码无效
  38. if _, err := auth.UserChangePassword(ctx, "xingyys", "123"); err != nil {
  39. log.Fatal(err)
  40. }
  41. // 添加指定key的访问权限
  42. // read, write, readwrite
  43. if _, err := auth.RoleGrantPermission(ctx,
  44. "guest",
  45. "foo",
  46. "zoo",
  47. clientv3.PermissionType(clientv3.PermReadWrite)); err != nil {
  48. log.Fatal(err)
  49. }
  50. if _, err := auth.AuthEnable(ctx); err != nil {
  51. log.Fatal(err)
  52. }
  53. authCli, _ := clientv3.New(clientv3.Config{
  54. // etcd 集群的地址集合
  55. Endpoints: []string{"192.168.10.10:2379"},
  56. // 请求超时时间
  57. DialTimeout: time.Second * 3,
  58. Username: "xingyys",
  59. Password: "123",
  60. })
  61. defer authCli.Close()
  62. _, _ = authCli.Put(ctx, "foo", "1")
  63. resp, _ := authCli.Get(ctx, "foo")
  64. for _, v := range resp.Kvs {
  65. log.Printf("%s => %q\n", v.Key, v.Value)
  66. }
  67. _, err := authCli.Txn(ctx).
  68. If(clientv3.Compare(clientv3.Value("zoo1"), ">", "abc")).
  69. Then(clientv3.OpPut("zoo1", "XYZ")).
  70. Else(clientv3.OpPut("zoo1", "ABC")).
  71. Commit()
  72. log.Println(err)
  73. }

五、集群

  1. func member() {
  2. cli, _ := clientv3.New(clientv3.Config{
  3. // etcd 集群的地址集合
  4. Endpoints: []string{"192.168.10.10:2379"},
  5. // 请求超时时间
  6. DialTimeout: time.Second * 3,
  7. })
  8. defer cli.Close()
  9. ctx, cancel := context.WithCancel(context.Background())
  10. defer cancel()
  11. cluster := clientv3.NewCluster(cli)
  12. resp, err := cluster.MemberList(ctx)
  13. if err != nil {
  14. log.Fatal(err)
  15. }
  16. for _, member := range resp.Members {
  17. fmt.Printf("ID: %d | Name: %s | ClientURL: %q | PeerURL: %q\n",
  18. member.ID,
  19. member.Name,
  20. member.ClientURLs,
  21. member.PeerURLs)
  22. }
  23. //_, _ = cluster.MemberAdd(ctx, []string{"192.168.10.10:2370", "192.168.10.11:2379"})
  24. //_, _ = cluster.MemberRemove(ctx, // id)
  25. //_, _ = cluster.MemberUpdate(ctx, // id, // peer)
  26. }

六、并发

6.1 锁

  1. func lock() {
  2. cli, err := clientv3.New(clientv3.Config{
  3. Endpoints: []string{"192.168.10.10:2379"},
  4. })
  5. if err != nil {
  6. log.Fatal(err)
  7. }
  8. defer cli.Close()
  9. // 注册session
  10. s1, err := concurrency.NewSession(cli)
  11. if err != nil {
  12. log.Fatal(err)
  13. }
  14. defer s1.Close()
  15. m1 := concurrency.NewMutex(s1, "/lock")
  16. s2, err := concurrency.NewSession(cli)
  17. if err != nil {
  18. log.Fatal(err)
  19. }
  20. defer s2.Close()
  21. m2 := concurrency.NewMutex(s2, "/lock")
  22. // acquired lock for s1
  23. if err := m1.Lock(context.TODO()); err != nil {
  24. log.Fatal(err)
  25. }
  26. fmt.Println("acquired lock for s1")
  27. m2Locked := make(chan struct{})
  28. go func() {
  29. defer close(m2Locked)
  30. // wait util s1 is locks /lock
  31. if err := m2.Lock(context.TODO()); err != nil {
  32. log.Fatal(err)
  33. }
  34. }()
  35. if err := m1.Unlock(context.TODO()); err != nil {
  36. log.Fatal(err)
  37. }
  38. fmt.Println("release lock for s1")
  39. <-m2Locked
  40. fmt.Println("acquired lock for s2")
  41. }
  42. func tryLock() {
  43. cli, err := clientv3.New(clientv3.Config{
  44. Endpoints: []string{"192.168.10.10:2379"},
  45. })
  46. if err != nil {
  47. log.Fatal(err)
  48. }
  49. defer cli.Close()
  50. // 注册session
  51. s1, err := concurrency.NewSession(cli)
  52. if err != nil {
  53. log.Fatal(err)
  54. }
  55. defer s1.Close()
  56. m1 := concurrency.NewMutex(s1, "/lock")
  57. s2, err := concurrency.NewSession(cli)
  58. if err != nil {
  59. log.Fatal(err)
  60. }
  61. defer s2.Close()
  62. m2 := concurrency.NewMutex(s2, "/lock")
  63. // acquire lock for s1
  64. if err = m1.Lock(context.TODO()); err != nil {
  65. log.Fatal(err)
  66. }
  67. fmt.Println("acquired lock for s1")
  68. if err = m2.TryLock(context.TODO()); err == nil {
  69. log.Fatal("should not acquire lock")
  70. }
  71. if err == concurrency.ErrLocked {
  72. fmt.Println("cannot acquire lock for s2, as already locked in another session")
  73. }
  74. if err = m1.Unlock(context.TODO()); err != nil {
  75. log.Fatal(err)
  76. }
  77. fmt.Println("released lock for s1")
  78. if err = m2.TryLock(context.TODO()); err != nil {
  79. log.Fatal(err)
  80. }
  81. fmt.Println("acquired lock for s2")
  82. }

6.2 领导选举

  1. func election() {
  2. cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"192.168.10.10:2379"}})
  3. if err != nil {
  4. log.Fatal(err)
  5. }
  6. defer cli.Close()
  7. // create two separate sessions for election competition
  8. s1, err := concurrency.NewSession(cli)
  9. if err != nil {
  10. log.Fatal(err)
  11. }
  12. defer s1.Close()
  13. e1 := concurrency.NewElection(s1, "/my-election/")
  14. s2, err := concurrency.NewSession(cli)
  15. if err != nil {
  16. log.Fatal(err)
  17. }
  18. defer s2.Close()
  19. e2 := concurrency.NewElection(s2, "/my-election/")
  20. // create competing candidates, with e1 initially losing to e2
  21. var wg sync.WaitGroup
  22. wg.Add(2)
  23. electc := make(chan *concurrency.Election, 2)
  24. go func() {
  25. defer wg.Done()
  26. // delay candidacy so e2 wins first
  27. time.Sleep(3 * time.Second)
  28. if err := e1.Campaign(context.Background(), "e1"); err != nil {
  29. log.Fatal(err)
  30. }
  31. electc <- e1
  32. }()
  33. go func() {
  34. defer wg.Done()
  35. if err := e2.Campaign(context.Background(), "e2"); err != nil {
  36. log.Fatal(err)
  37. }
  38. electc <- e2
  39. }()
  40. cctx, cancel := context.WithCancel(context.TODO())
  41. defer cancel()
  42. e := <-electc
  43. fmt.Println("completed first election with", string((<-e.Observe(cctx)).Kvs[0].Value))
  44. // resign so next candidate can be elected
  45. if err := e.Resign(context.TODO()); err != nil {
  46. log.Fatal(err)
  47. }
  48. e = <-electc
  49. fmt.Println("completed second election with", string((<-e.Observe(cctx)).Kvs[0].Value))
  50. wg.Wait()
  51. }

6.3 软件事务内存

  1. func stm() {
  2. cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"192.168.10.10:2379"}})
  3. if err != nil {
  4. log.Fatal(err)
  5. }
  6. defer cli.Close()
  7. // set up "accounts"
  8. totalAccounts := 5
  9. for i := 0; i < totalAccounts; i++ {
  10. k := fmt.Sprintf("accts/%d", i)
  11. if _, err = cli.Put(context.TODO(), k, "100"); err != nil {
  12. log.Fatal(err)
  13. }
  14. }
  15. exchange := func(stm concurrency.STM) error {
  16. from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts)
  17. if from == to {
  18. // nothing to do
  19. return nil
  20. }
  21. // read values
  22. fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to)
  23. fromV, toV := stm.Get(fromK), stm.Get(toK)
  24. fromInt, toInt := 0, 0
  25. fmt.Sscanf(fromV, "%d", &fromInt)
  26. fmt.Sscanf(toV, "%d", &toInt)
  27. // transfer amount
  28. xfer := fromInt / 2
  29. fromInt, toInt = fromInt-xfer, toInt+xfer
  30. // write back
  31. stm.Put(fromK, fmt.Sprintf("%d", fromInt))
  32. stm.Put(toK, fmt.Sprintf("%d", toInt))
  33. return nil
  34. }
  35. // concurrently exchange values between accounts
  36. var wg sync.WaitGroup
  37. wg.Add(10)
  38. for i := 0; i < 10; i++ {
  39. go func() {
  40. defer wg.Done()
  41. if _, serr := concurrency.NewSTM(cli, exchange); serr != nil {
  42. log.Fatal(serr)
  43. }
  44. }()
  45. }
  46. wg.Wait()
  47. // confirm account sum matches sum from beginning.
  48. sum := 0
  49. accts, err := cli.Get(context.TODO(), "accts/", clientv3.WithPrefix())
  50. if err != nil {
  51. log.Fatal(err)
  52. }
  53. for _, kv := range accts.Kvs {
  54. v := 0
  55. fmt.Sscanf(string(kv.Value), "%d", &v)
  56. sum += v
  57. }
  58. fmt.Println("account sum is", sum)
  59. }