一 、参考链接
- go操作etcd
- go.etcd.io/etcd/clientv3
- https://pkg.go.dev/go.etcd.io/etcd/client/v3#section-readme
- github.com/coreos/etcd
- https://etcd.io/docs/v3.3/demo/
- etcd:从应用场景到实现原理的全方位解读
二 、 函数
```go func NewLocker(s *Session, pfx string) sync.Locker
func NewSTM(c v3.Client, apply func(STM) error, so …stmOption) (v3.TxnResponse, error)
func NewSTMReadCommitted(ctx context.Context, c v3.Client, apply func(STM) error) (v3.TxnResponse, error)
func NewSTMRepeatable(ctx context.Context, c v3.Client, apply func(STM) error) (v3.TxnResponse, error)
func NewSTMSerializable(ctx context.Context, c v3.Client, apply func(STM) error) (v3.TxnResponse, error)
func WithAbortContext(ctx context.Context) stmOption
func WithIsolation(lvl Isolation) stmOption
func WithPrefetch(keys …string) stmOption
type Election func NewElection(s Session, pfx string) Election func ResumeElection(s Session, pfx string, leaderKey string, leaderRev int64) Election func (e Election) Campaign(ctx context.Context, val string) error func (e Election) Header() pb.ResponseHeader func (e Election) Key() string func (e Election) Leader(ctx context.Context) (v3.GetResponse, error) func (e Election) Observe(ctx context.Context) <-chan v3.GetResponse func (e Election) Proclaim(ctx context.Context, val string) error func (e Election) Resign(ctx context.Context) (err error) func (e Election) Rev() int64
type Isolation
type Mutex func NewMutex(s Session, pfx string) Mutex func (m Mutex) Header() pb.ResponseHeader func (m Mutex) IsOwner() v3.Cmp func (m Mutex) Key() string func (m Mutex) Lock(ctx context.Context) error func (m Mutex) Unlock(ctx context.Context) error
type STM
type Session func NewSession(client v3.Client, opts …SessionOption) (Session, error) func (s Session) Client() v3.Client func (s Session) Close() error func (s Session) Done() <-chan struct{} func (s Session) Lease() v3.LeaseID func (s Session) Orphan()
type SessionOption func WithContext(ctx context.Context) SessionOption func WithLease(leaseID v3.LeaseID) SessionOption func WithTTL(ttl int) SessionOption
<a name="ddziF"></a>
# 三 .示例
```go
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
// etcd client put/get demo
// use etcd/clientv3
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
// put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "q1mi", "dsb")
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v\n", err)
return
}
// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "q1mi")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return
}
for _, ev := range resp.Kvs {
fmt.Printf("%s:%s\n", ev.Key, ev.Value)
}
}
官方示例
//先连接,config配置里可以加很多东西,比如自己的证书
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 搞两个会话,第一个
// create two separate sessions for election competition
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
e1 := concurrency.NewElection(s1, "/my-election/")
// 搞两个会话,第二个
s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()
e2 := concurrency.NewElection(s2, "/my-election/")
// create competing candidates, with e1 initially losing to e2
var wg sync.WaitGroup
wg.Add(2)
electc := make(chan *concurrency.Election, 2)
go func() {
defer wg.Done()
// delay candidacy so e2 wins first
time.Sleep(3 * time.Second)
if err := e1.Campaign(context.Background(), "e1"); err != nil {
log.Fatal(err)
}
electc <- e1
}()
go func() {
defer wg.Done()
if err := e2.Campaign(context.Background(), "e2"); err != nil {
log.Fatal(err)
}
electc <- e2
}()
cctx, cancel := context.WithCancel(context.TODO())
defer cancel()
e := <-electc
fmt.Println("completed first election with", string((<-e.Observe(cctx)).Kvs[0].Value))
// resign so next candidate can be elected
if err := e.Resign(context.TODO()); err != nil {
log.Fatal(err)
}
e = <-electc
fmt.Println("completed second election with", string((<-e.Observe(cctx)).Kvs[0].Value))
wg.Wait()
输出:
Output:
completed first election with e2
completed second election with e1
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// create two separate sessions for lock competition
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock/")
s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/my-lock/")
// acquire lock for s1
if err := m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1")
m2Locked := make(chan struct{})
go func() {
defer close(m2Locked)
// wait until s1 is locks /my-lock/
if err := m2.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
}()
if err := m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("released lock for s1")
<-m2Locked
fmt.Println("acquired lock for s2")
Output:
acquired lock for s1
released lock for s1
acquired lock for s2
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// set up "accounts"
totalAccounts := 5
for i := 0; i < totalAccounts; i++ {
k := fmt.Sprintf("accts/%d", i)
if _, err = cli.Put(context.TODO(), k, "100"); err != nil {
log.Fatal(err)
}
}
exchange := func(stm concurrency.STM) error {
from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts)
if from == to {
// nothing to do
return nil
}
// read values
fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to)
fromV, toV := stm.Get(fromK), stm.Get(toK)
fromInt, toInt := 0, 0
fmt.Sscanf(fromV, "%d", &fromInt)
fmt.Sscanf(toV, "%d", &toInt)
// transfer amount
xfer := fromInt / 2
fromInt, toInt = fromInt-xfer, toInt+xfer
// write back
stm.Put(fromK, fmt.Sprintf("%d", fromInt))
stm.Put(toK, fmt.Sprintf("%d", toInt))
return nil
}
// concurrently exchange values between accounts
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
if _, serr := concurrency.NewSTM(cli, exchange); serr != nil {
log.Fatal(serr)
}
}()
}
wg.Wait()
// confirm account sum matches sum from beginning.
sum := 0
accts, err := cli.Get(context.TODO(), "accts/", clientv3.WithPrefix())
if err != nil {
log.Fatal(err)
}
for _, kv := range accts.Kvs {
v := 0
fmt.Sscanf(string(kv.Value), "%d", &v)
sum += v
}
fmt.Println("account sum is", sum)
Output:
account sum is 500
四、注意
- 计算保护状态:
分配集群:删除保护状态,按照灾备端 —— 重新计算 保护状态
收回集群:修改保护状态,修改为已删除
集群收回之后 如果不是未保护,状态更改为已删除
获取全部 — 直接全部删除