- 将数据存储在集群中的高可用KV存储
- 允许应用实时监听存储中的kv的变化
- 能够容忍单点故障, 能都应对网络分区
etcd与raft的关系
- raft是强一致的集群日志同步算法
- etcd是一个分布式kv存储
- etcd利用raft算法在集群中同步kv
quorum模型-大多数模型
- 集群需要2N+1个节点
- 日志实时复制到follower
写入性能差: 1000次/秒

- 异步通知follower完成提交

- 选举leader需要半数以上的节点参与
- 节点commit日志最多的允许选举为leader
- commit日志同样多, 则term, index越大的允许选举为leader
- 各个节点数据最终一致
底层存储按key有序排列的(字母大小), 可以顺序遍历
支持复杂的事务 提供类似if…then…else..的事务能力
mvcc多版本控制
- 用于实现watch机制

监听KV变化
Lease租约
- 实现key删除
etcd单机版使用
put
func main() {config := clientv3.Config{Endpoints: []string{"127.0.0.1:2379"},DialTimeout: 5 * time.Second,}var client *clientv3.Clientvar err errorif client, err = clientv3.New(config); err != nil {fmt.Println(err)}kv := clientv3.NewKV(client)res, err := kv.Put(context.TODO(), "/jobs/job1", "echo hello1", clientv3.WithPrevKV())fmt.Println(err)fmt.Println(res.Header.Revision)if res.PrevKv != nil {fmt.Println(string(res.PrevKv.Value))} else {fmt.Println(res.PrevKv)}}
get
func main() {config := clientv3.Config{Endpoints: []string{"127.0.0.1:2379"},DialTimeout: 5 * time.Second,}var client *clientv3.Clientvar err errorif client, err = clientv3.New(config); err != nil {fmt.Println(err)}kv := clientv3.NewKV(client)// /jobs/job1res, err := kv.Get(context.TODO(), `/jobs/`, clientv3.WithPrefix())fmt.Println(err)for i, value := range res.Kvs {fmt.Println(i, "-> ", value)}}
delete
// kv.Delete(context.TODO(), key)func main() {config := clientv3.Config{Endpoints: []string{"127.0.0.1:2379"},DialTimeout: 5 * time.Second,}var client *clientv3.Clientvar err errorif client, err = clientv3.New(config); err != nil {fmt.Println(err)}kv := clientv3.NewKV(client)// /jobs/job1response, err := kv.Delete(context.TODO(), "/jobs/job2", clientv3.WithPrevKV())fmt.Println(err)fmt.Println(response.PrevKvs)}
租约lease
func main() {config := clientv3.Config{Endpoints: []string{"127.0.0.1:2379"},DialTimeout: 5 * time.Second,}var client *clientv3.Clientvar err errorif client, err = clientv3.New(config); err != nil {fmt.Println(err)}lease := clientv3.NewLease(client)res, err := lease.Grant(context.TODO(), 10)if err != nil {panic(err)}// 自动续租alive, err := lease.KeepAlive(context.TODO(), res.ID)if err != nil {panic(err)}go func() {for {select {case keepResp := <-alive:if keepResp == nil {fmt.Println("租约失效了")} else {fmt.Println("收到租约应答: ", res.ID)}}}}()kv := clientv3.NewKV(client)putResponse, err := kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(res.ID))if err != nil {panic(err)}fmt.Println("写入成功: ", putResponse.Header.Revision)// 查看key是否过期for {getResponse, err := kv.Get(context.TODO(), "/cron/lock/job1")if err != nil {panic(err)}if getResponse.Count == 0 {fmt.Println("kv过期了")break}fmt.Println("kv没过期", getResponse.Kvs)time.Sleep(1 * time.Second)}}
watch
func main() {config := clientv3.Config{Endpoints: []string{"127.0.0.1:2379"},DialTimeout: 5 * time.Second,}var client *clientv3.Clientvar err errorif client, err = clientv3.New(config); err != nil {fmt.Println(err)}kv := clientv3.NewKV(client)go func() {for {kv.Put(context.TODO(), "/cron/jobs/job7", "job7")kv.Delete(context.TODO(), "/cron/jobs/job7")time.Sleep(1 * time.Second)}}()// 获取到当前的值, 并监听后续变化if getRes, err := kv.Get(context.TODO(), "/cron/jobs/job7"); err != nil {panic(err)} else {if len(getRes.Kvs) != 0 { // key存在的fmt.Println(getRes.Kvs[0].Value)}// 单调增的事务IDrevision := getRes.Header.Revision + 1// watcherwatcher := clientv3.Watcher(client)watchChan := watcher.Watch(context.TODO(), "/cron/jobs/job7", clientv3.WithRev(revision))for response := range watchChan {for i, event := range response.Events {fmt.Println(i)fmt.Println("Type: ", event.Type)fmt.Println(string(event.Kv.Value))fmt.Println(event.Kv.ModRevision)}}}}
Op操作
func main() {config := clientv3.Config{Endpoints: []string{"127.0.0.1:2379"},DialTimeout: 5 * time.Second,}var client *clientv3.Clientvar err errorif client, err = clientv3.New(config); err != nil {fmt.Println(err)}kv := clientv3.NewKV(client)// 创建opopPut := clientv3.OpPut("job8", "job8")// 执行opresponse, err := kv.Do(context.TODO(), opPut)fmt.Println(err)fmt.Println(response)}
乐观锁
package mainimport ("context""fmt"clientv3 "go.etcd.io/etcd/client/v3""time")func main() {config := clientv3.Config{Endpoints: []string{"127.0.0.1:2379"},DialTimeout: 5 * time.Second,}var client *clientv3.Clientvar err errorif client, err = clientv3.New(config); err != nil {fmt.Println(err)}lease := clientv3.Lease(client)grant, err := lease.Grant(context.TODO(), 5)if err != nil {panic(err)}leaseID := grant.ID// 取消续租的ctxctx, cancelFunc := context.WithCancel(context.TODO())defer cancelFunc()defer lease.Revoke(context.TODO(), leaseID)keepAliveChan, err := lease.KeepAlive(ctx, leaseID)if err != nil {panic(err)}go func() {for {select {case keepResp := <-keepAliveChan:if keepResp == nil {fmt.Println("租约失效了")} else {fmt.Println("收到租约应答: ", keepResp.ID)}}}}()kv := clientv3.NewKV(client)// 1. 上锁// 2. 处理业务// 3. 释放锁(取消自动续租, 释放租约)// if不存在key, then设置锁, else抢锁失败 CreateRevision == 0txn := kv.Txn(context.TODO())txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/jobs/job9"), "=", 0)).Then(clientv3.OpPut("/cron/jobs/job9", "", clientv3.WithLease(leaseID))).Else(clientv3.OpGet("/cron/jobs/job9")) // 否则抢锁失败commit, err := txn.Commit()if err != nil {panic(err)}// 判断释放抢到锁if !commit.Succeeded {fmt.Println("锁被占用: ", string(commit.Responses[0].GetResponseRange().Kvs[0].Value))return}// 处理业务fmt.Println("处理任务")time.Sleep(5 * time.Second)}
