一 、参考链接

  1. go操作etcd
  2. go.etcd.io/etcd/clientv3
  3. https://pkg.go.dev/go.etcd.io/etcd/client/v3#section-readme
  4. github.com/coreos/etcd
  5. https://etcd.io/docs/v3.3/demo/
  6. etcd:从应用场景到实现原理的全方位解读

    二 、 函数

    ```go func NewLocker(s *Session, pfx string) sync.Locker

func NewSTM(c v3.Client, apply func(STM) error, so …stmOption) (v3.TxnResponse, error)

func NewSTMReadCommitted(ctx context.Context, c v3.Client, apply func(STM) error) (v3.TxnResponse, error)

func NewSTMRepeatable(ctx context.Context, c v3.Client, apply func(STM) error) (v3.TxnResponse, error)

func NewSTMSerializable(ctx context.Context, c v3.Client, apply func(STM) error) (v3.TxnResponse, error)

func WithAbortContext(ctx context.Context) stmOption

func WithIsolation(lvl Isolation) stmOption

func WithPrefetch(keys …string) stmOption

type Election func NewElection(s Session, pfx string) Election func ResumeElection(s Session, pfx string, leaderKey string, leaderRev int64) Election func (e Election) Campaign(ctx context.Context, val string) error func (e Election) Header() pb.ResponseHeader func (e Election) Key() string func (e Election) Leader(ctx context.Context) (v3.GetResponse, error) func (e Election) Observe(ctx context.Context) <-chan v3.GetResponse func (e Election) Proclaim(ctx context.Context, val string) error func (e Election) Resign(ctx context.Context) (err error) func (e Election) Rev() int64

type Isolation

type Mutex func NewMutex(s Session, pfx string) Mutex func (m Mutex) Header() pb.ResponseHeader func (m Mutex) IsOwner() v3.Cmp func (m Mutex) Key() string func (m Mutex) Lock(ctx context.Context) error func (m Mutex) Unlock(ctx context.Context) error

type STM

type Session func NewSession(client v3.Client, opts …SessionOption) (Session, error) func (s Session) Client() v3.Client func (s Session) Close() error func (s Session) Done() <-chan struct{} func (s Session) Lease() v3.LeaseID func (s Session) Orphan()

type SessionOption func WithContext(ctx context.Context) SessionOption func WithLease(leaseID v3.LeaseID) SessionOption func WithTTL(ttl int) SessionOption

  1. <a name="ddziF"></a>
  2. # 三 .示例
  3. ```go
  4. package main
  5. import (
  6. "context"
  7. "fmt"
  8. "time"
  9. "go.etcd.io/etcd/clientv3"
  10. )
  11. // etcd client put/get demo
  12. // use etcd/clientv3
  13. func main() {
  14. cli, err := clientv3.New(clientv3.Config{
  15. Endpoints: []string{"127.0.0.1:2379"},
  16. DialTimeout: 5 * time.Second,
  17. })
  18. if err != nil {
  19. // handle error!
  20. fmt.Printf("connect to etcd failed, err:%v\n", err)
  21. return
  22. }
  23. fmt.Println("connect to etcd success")
  24. defer cli.Close()
  25. // put
  26. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  27. _, err = cli.Put(ctx, "q1mi", "dsb")
  28. cancel()
  29. if err != nil {
  30. fmt.Printf("put to etcd failed, err:%v\n", err)
  31. return
  32. }
  33. // get
  34. ctx, cancel = context.WithTimeout(context.Background(), time.Second)
  35. resp, err := cli.Get(ctx, "q1mi")
  36. cancel()
  37. if err != nil {
  38. fmt.Printf("get from etcd failed, err:%v\n", err)
  39. return
  40. }
  41. for _, ev := range resp.Kvs {
  42. fmt.Printf("%s:%s\n", ev.Key, ev.Value)
  43. }
  44. }

官方示例

  1. //先连接,config配置里可以加很多东西,比如自己的证书
  2. cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
  3. if err != nil {
  4. log.Fatal(err)
  5. }
  6. defer cli.Close()
  7. // 搞两个会话,第一个
  8. // create two separate sessions for election competition
  9. s1, err := concurrency.NewSession(cli)
  10. if err != nil {
  11. log.Fatal(err)
  12. }
  13. defer s1.Close()
  14. e1 := concurrency.NewElection(s1, "/my-election/")
  15. // 搞两个会话,第二个
  16. s2, err := concurrency.NewSession(cli)
  17. if err != nil {
  18. log.Fatal(err)
  19. }
  20. defer s2.Close()
  21. e2 := concurrency.NewElection(s2, "/my-election/")
  22. // create competing candidates, with e1 initially losing to e2
  23. var wg sync.WaitGroup
  24. wg.Add(2)
  25. electc := make(chan *concurrency.Election, 2)
  26. go func() {
  27. defer wg.Done()
  28. // delay candidacy so e2 wins first
  29. time.Sleep(3 * time.Second)
  30. if err := e1.Campaign(context.Background(), "e1"); err != nil {
  31. log.Fatal(err)
  32. }
  33. electc <- e1
  34. }()
  35. go func() {
  36. defer wg.Done()
  37. if err := e2.Campaign(context.Background(), "e2"); err != nil {
  38. log.Fatal(err)
  39. }
  40. electc <- e2
  41. }()
  42. cctx, cancel := context.WithCancel(context.TODO())
  43. defer cancel()
  44. e := <-electc
  45. fmt.Println("completed first election with", string((<-e.Observe(cctx)).Kvs[0].Value))
  46. // resign so next candidate can be elected
  47. if err := e.Resign(context.TODO()); err != nil {
  48. log.Fatal(err)
  49. }
  50. e = <-electc
  51. fmt.Println("completed second election with", string((<-e.Observe(cctx)).Kvs[0].Value))
  52. wg.Wait()

输出:

  1. Output:
  2. completed first election with e2
  3. completed second election with e1
  1. cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. defer cli.Close()
  6. // create two separate sessions for lock competition
  7. s1, err := concurrency.NewSession(cli)
  8. if err != nil {
  9. log.Fatal(err)
  10. }
  11. defer s1.Close()
  12. m1 := concurrency.NewMutex(s1, "/my-lock/")
  13. s2, err := concurrency.NewSession(cli)
  14. if err != nil {
  15. log.Fatal(err)
  16. }
  17. defer s2.Close()
  18. m2 := concurrency.NewMutex(s2, "/my-lock/")
  19. // acquire lock for s1
  20. if err := m1.Lock(context.TODO()); err != nil {
  21. log.Fatal(err)
  22. }
  23. fmt.Println("acquired lock for s1")
  24. m2Locked := make(chan struct{})
  25. go func() {
  26. defer close(m2Locked)
  27. // wait until s1 is locks /my-lock/
  28. if err := m2.Lock(context.TODO()); err != nil {
  29. log.Fatal(err)
  30. }
  31. }()
  32. if err := m1.Unlock(context.TODO()); err != nil {
  33. log.Fatal(err)
  34. }
  35. fmt.Println("released lock for s1")
  36. <-m2Locked
  37. fmt.Println("acquired lock for s2")
  38. Output:
  39. acquired lock for s1
  40. released lock for s1
  41. acquired lock for s2
  1. cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. defer cli.Close()
  6. // set up "accounts"
  7. totalAccounts := 5
  8. for i := 0; i < totalAccounts; i++ {
  9. k := fmt.Sprintf("accts/%d", i)
  10. if _, err = cli.Put(context.TODO(), k, "100"); err != nil {
  11. log.Fatal(err)
  12. }
  13. }
  14. exchange := func(stm concurrency.STM) error {
  15. from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts)
  16. if from == to {
  17. // nothing to do
  18. return nil
  19. }
  20. // read values
  21. fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to)
  22. fromV, toV := stm.Get(fromK), stm.Get(toK)
  23. fromInt, toInt := 0, 0
  24. fmt.Sscanf(fromV, "%d", &fromInt)
  25. fmt.Sscanf(toV, "%d", &toInt)
  26. // transfer amount
  27. xfer := fromInt / 2
  28. fromInt, toInt = fromInt-xfer, toInt+xfer
  29. // write back
  30. stm.Put(fromK, fmt.Sprintf("%d", fromInt))
  31. stm.Put(toK, fmt.Sprintf("%d", toInt))
  32. return nil
  33. }
  34. // concurrently exchange values between accounts
  35. var wg sync.WaitGroup
  36. wg.Add(10)
  37. for i := 0; i < 10; i++ {
  38. go func() {
  39. defer wg.Done()
  40. if _, serr := concurrency.NewSTM(cli, exchange); serr != nil {
  41. log.Fatal(serr)
  42. }
  43. }()
  44. }
  45. wg.Wait()
  46. // confirm account sum matches sum from beginning.
  47. sum := 0
  48. accts, err := cli.Get(context.TODO(), "accts/", clientv3.WithPrefix())
  49. if err != nil {
  50. log.Fatal(err)
  51. }
  52. for _, kv := range accts.Kvs {
  53. v := 0
  54. fmt.Sscanf(string(kv.Value), "%d", &v)
  55. sum += v
  56. }
  57. fmt.Println("account sum is", sum)
  58. Output:
  59. account sum is 500

四、注意

  1. 计算保护状态:

分配集群:删除保护状态,按照灾备端 —— 重新计算 保护状态

收回集群:修改保护状态,修改为已删除
集群收回之后 如果不是未保护,状态更改为已删除
获取全部 — 直接全部删除
image.png