• 将数据存储在集群中的高可用KV存储
  • 允许应用实时监听存储中的kv的变化
  • 能够容忍单点故障, 能都应对网络分区

etcd与raft的关系

  • raft是强一致的集群日志同步算法
  • etcd是一个分布式kv存储
  • etcd利用raft算法在集群中同步kv

quorum模型-大多数模型

  • 集群需要2N+1个节点
  1. 日志实时复制到follower

写入性能差: 1000次/秒

image.png

  1. 异步通知follower完成提交

image.png

  • 选举leader需要半数以上的节点参与
  • 节点commit日志最多的允许选举为leader
  • commit日志同样多, 则term, index越大的允许选举为leader
  • 各个节点数据最终一致

底层存储按key有序排列的(字母大小), 可以顺序遍历

支持复杂的事务 提供类似if…then…else..的事务能力

mvcc多版本控制

  • 用于实现watch机制

image.png

监听KV变化
image.png

Lease租约

  • 实现key删除

etcd单机版使用

put

  1. func main() {
  2. config := clientv3.Config{
  3. Endpoints: []string{"127.0.0.1:2379"},
  4. DialTimeout: 5 * time.Second,
  5. }
  6. var client *clientv3.Client
  7. var err error
  8. if client, err = clientv3.New(config); err != nil {
  9. fmt.Println(err)
  10. }
  11. kv := clientv3.NewKV(client)
  12. res, err := kv.Put(context.TODO(), "/jobs/job1", "echo hello1", clientv3.WithPrevKV())
  13. fmt.Println(err)
  14. fmt.Println(res.Header.Revision)
  15. if res.PrevKv != nil {
  16. fmt.Println(string(res.PrevKv.Value))
  17. } else {
  18. fmt.Println(res.PrevKv)
  19. }
  20. }

get

  1. func main() {
  2. config := clientv3.Config{
  3. Endpoints: []string{"127.0.0.1:2379"},
  4. DialTimeout: 5 * time.Second,
  5. }
  6. var client *clientv3.Client
  7. var err error
  8. if client, err = clientv3.New(config); err != nil {
  9. fmt.Println(err)
  10. }
  11. kv := clientv3.NewKV(client)
  12. // /jobs/job1
  13. res, err := kv.Get(context.TODO(), `/jobs/`, clientv3.WithPrefix())
  14. fmt.Println(err)
  15. for i, value := range res.Kvs {
  16. fmt.Println(i, "-> ", value)
  17. }
  18. }

delete

  1. // kv.Delete(context.TODO(), key)
  2. func main() {
  3. config := clientv3.Config{
  4. Endpoints: []string{"127.0.0.1:2379"},
  5. DialTimeout: 5 * time.Second,
  6. }
  7. var client *clientv3.Client
  8. var err error
  9. if client, err = clientv3.New(config); err != nil {
  10. fmt.Println(err)
  11. }
  12. kv := clientv3.NewKV(client)
  13. // /jobs/job1
  14. response, err := kv.Delete(context.TODO(), "/jobs/job2", clientv3.WithPrevKV())
  15. fmt.Println(err)
  16. fmt.Println(response.PrevKvs)
  17. }

租约lease

  1. func main() {
  2. config := clientv3.Config{
  3. Endpoints: []string{"127.0.0.1:2379"},
  4. DialTimeout: 5 * time.Second,
  5. }
  6. var client *clientv3.Client
  7. var err error
  8. if client, err = clientv3.New(config); err != nil {
  9. fmt.Println(err)
  10. }
  11. lease := clientv3.NewLease(client)
  12. res, err := lease.Grant(context.TODO(), 10)
  13. if err != nil {
  14. panic(err)
  15. }
  16. // 自动续租
  17. alive, err := lease.KeepAlive(context.TODO(), res.ID)
  18. if err != nil {
  19. panic(err)
  20. }
  21. go func() {
  22. for {
  23. select {
  24. case keepResp := <-alive:
  25. if keepResp == nil {
  26. fmt.Println("租约失效了")
  27. } else {
  28. fmt.Println("收到租约应答: ", res.ID)
  29. }
  30. }
  31. }
  32. }()
  33. kv := clientv3.NewKV(client)
  34. putResponse, err := kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(res.ID))
  35. if err != nil {
  36. panic(err)
  37. }
  38. fmt.Println("写入成功: ", putResponse.Header.Revision)
  39. // 查看key是否过期
  40. for {
  41. getResponse, err := kv.Get(context.TODO(), "/cron/lock/job1")
  42. if err != nil {
  43. panic(err)
  44. }
  45. if getResponse.Count == 0 {
  46. fmt.Println("kv过期了")
  47. break
  48. }
  49. fmt.Println("kv没过期", getResponse.Kvs)
  50. time.Sleep(1 * time.Second)
  51. }
  52. }

watch

  1. func main() {
  2. config := clientv3.Config{
  3. Endpoints: []string{"127.0.0.1:2379"},
  4. DialTimeout: 5 * time.Second,
  5. }
  6. var client *clientv3.Client
  7. var err error
  8. if client, err = clientv3.New(config); err != nil {
  9. fmt.Println(err)
  10. }
  11. kv := clientv3.NewKV(client)
  12. go func() {
  13. for {
  14. kv.Put(context.TODO(), "/cron/jobs/job7", "job7")
  15. kv.Delete(context.TODO(), "/cron/jobs/job7")
  16. time.Sleep(1 * time.Second)
  17. }
  18. }()
  19. // 获取到当前的值, 并监听后续变化
  20. if getRes, err := kv.Get(context.TODO(), "/cron/jobs/job7"); err != nil {
  21. panic(err)
  22. } else {
  23. if len(getRes.Kvs) != 0 { // key存在的
  24. fmt.Println(getRes.Kvs[0].Value)
  25. }
  26. // 单调增的事务ID
  27. revision := getRes.Header.Revision + 1
  28. // watcher
  29. watcher := clientv3.Watcher(client)
  30. watchChan := watcher.Watch(context.TODO(), "/cron/jobs/job7", clientv3.WithRev(revision))
  31. for response := range watchChan {
  32. for i, event := range response.Events {
  33. fmt.Println(i)
  34. fmt.Println("Type: ", event.Type)
  35. fmt.Println(string(event.Kv.Value))
  36. fmt.Println(event.Kv.ModRevision)
  37. }
  38. }
  39. }
  40. }

Op操作

  1. func main() {
  2. config := clientv3.Config{
  3. Endpoints: []string{"127.0.0.1:2379"},
  4. DialTimeout: 5 * time.Second,
  5. }
  6. var client *clientv3.Client
  7. var err error
  8. if client, err = clientv3.New(config); err != nil {
  9. fmt.Println(err)
  10. }
  11. kv := clientv3.NewKV(client)
  12. // 创建op
  13. opPut := clientv3.OpPut("job8", "job8")
  14. // 执行op
  15. response, err := kv.Do(context.TODO(), opPut)
  16. fmt.Println(err)
  17. fmt.Println(response)
  18. }

乐观锁

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. clientv3 "go.etcd.io/etcd/client/v3"
  6. "time"
  7. )
  8. func main() {
  9. config := clientv3.Config{
  10. Endpoints: []string{"127.0.0.1:2379"},
  11. DialTimeout: 5 * time.Second,
  12. }
  13. var client *clientv3.Client
  14. var err error
  15. if client, err = clientv3.New(config); err != nil {
  16. fmt.Println(err)
  17. }
  18. lease := clientv3.Lease(client)
  19. grant, err := lease.Grant(context.TODO(), 5)
  20. if err != nil {
  21. panic(err)
  22. }
  23. leaseID := grant.ID
  24. // 取消续租的ctx
  25. ctx, cancelFunc := context.WithCancel(context.TODO())
  26. defer cancelFunc()
  27. defer lease.Revoke(context.TODO(), leaseID)
  28. keepAliveChan, err := lease.KeepAlive(ctx, leaseID)
  29. if err != nil {
  30. panic(err)
  31. }
  32. go func() {
  33. for {
  34. select {
  35. case keepResp := <-keepAliveChan:
  36. if keepResp == nil {
  37. fmt.Println("租约失效了")
  38. } else {
  39. fmt.Println("收到租约应答: ", keepResp.ID)
  40. }
  41. }
  42. }
  43. }()
  44. kv := clientv3.NewKV(client)
  45. // 1. 上锁
  46. // 2. 处理业务
  47. // 3. 释放锁(取消自动续租, 释放租约)
  48. // if不存在key, then设置锁, else抢锁失败 CreateRevision == 0
  49. txn := kv.Txn(context.TODO())
  50. txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/jobs/job9"), "=", 0)).
  51. Then(clientv3.OpPut("/cron/jobs/job9", "", clientv3.WithLease(leaseID))).
  52. Else(clientv3.OpGet("/cron/jobs/job9")) // 否则抢锁失败
  53. commit, err := txn.Commit()
  54. if err != nil {
  55. panic(err)
  56. }
  57. // 判断释放抢到锁
  58. if !commit.Succeeded {
  59. fmt.Println("锁被占用: ", string(commit.Responses[0].GetResponseRange().Kvs[0].Value))
  60. return
  61. }
  62. // 处理业务
  63. fmt.Println("处理任务")
  64. time.Sleep(5 * time.Second)
  65. }