ETCD简单介绍

etcd是CoreOS团队于2013年6月发起的开源项目,它的目标是构建一个高可用的分布式键值(key-value)数据库。etcd内部采用raft协议作为一致性算法,etcd基于Go语言实现。

etcd raft分布式日志一致算法

动态讲解:http://thesecretlivesofdata.com/raft/
etcd-raft异常处理:https://www.cnblogs.com/mindwind/p/5231986.html
官网:https://etcd.io/docs/v3.4.0/

etcd 应用场景

  1. 配置管理 (golang 结合viper package)
  2. 服务注册于发现
  3. 选主
  4. 应用调度
  5. 分布式队列
  6. 分布式锁
  7. 配置管理

ubuntu 单机版搭建ETCD

  1. 下载
  2. wget https://github.com/etcd-io/etcd/releases/download/v3.3.10/etcd-v3.3.10-linux-amd64.tar.gz
  3. 解压
  4. tar zxvf etcd-v3.3.10-linux-amd64.tar.gz
  5. 启动/暴露IP给外网访问
  6. nohup ./etcd --listen-client-urls 'http://0.0.0.0:2379' --advertise-client-urls 'http://0.0.0.0:2379' > ./etcd.log 2>&1 &

golang client USE ETCD

connect

  1. // 配置连接信息
  2. config = clientv3.Config{
  3. Endpoints: []string{"47.112.154.64:2379"},
  4. DialTimeout: 5 * time.Second,
  5. }
  6. // 声明客户端
  7. if client, err = clientv3.New(config); err!=nil{
  8. log.Fatal(err)
  9. return
  10. }

CRUD

  1. // 配置连接信息
  2. config = clientv3.Config{
  3. Endpoints: []string{"47.112.154.64:2379"},
  4. DialTimeout: 5 * time.Second,
  5. }
  6. // 声明客户端
  7. if client, err = clientv3.New(config); err!=nil{
  8. log.Fatal(err)
  9. return
  10. }
  11. // kv,CRUD
  12. kv = clientv3.NewKV(client)
  13. // put - create update
  14. putResp1, err := kv.Put(context.Background(), "/cron/task/1", "111")
  15. if err!=nil{
  16. fmt.Println(err)
  17. return
  18. }
  19. fmt.Println(putResp1.Header)
  20. putResp2, err := kv.Put(context.Background(), "/cron/task/2", "666")
  21. if err!=nil{
  22. fmt.Println(err)
  23. return
  24. }
  25. fmt.Println(putResp2.Header)
  26. // get
  27. //clientv3.WithPrefix() 获取前缀为key的所以值
  28. getResp, err := kv.Get(context.Background(), "/cron/task/", clientv3.WithPrefix())
  29. if err!=nil{
  30. fmt.Println(err)
  31. return
  32. }
  33. fmt.Println(getResp.Kvs)
  34. // delete
  35. delResp, err := kv.Delete(context.Background(), "/cron/task/1")
  36. if err!=nil{
  37. fmt.Println(err)
  38. return
  39. }
  40. fmt.Println(delResp)

op

  • 使用kv进行etcd的crud操作, 返回的response不是同一种类型的,使用op,返回的都是opResponse
  1. // 配置连接信息
  2. config = clientv3.Config{
  3. Endpoints: []string{"47.112.154.64:2379"},
  4. DialTimeout: 5 * time.Second,
  5. }
  6. // 创建客户端
  7. if client, err = clientv3.New(config); err != nil {
  8. goto ERR
  9. }
  10. kv = clientv3.NewKV(client)
  11. // 创建op: operation
  12. putOp = clientv3.OpPut("cron/op/1", "cron/op/1")
  13. //执行
  14. if opResp, err = kv.Do(context.Background(), putOp); err != nil {
  15. goto ERR
  16. }
  17. fmt.Println(opResp.Put().PrevKv)
  18. getOp = clientv3.OpGet("cron/op/", clientv3.WithPrefix())
  19. //执行
  20. if opResp, err = kv.Do(context.Background(), getOp); err != nil {
  21. goto ERR
  22. }
  23. fmt.Println(opResp.Get().Kvs)
  24. ERR:
  25. if err != nil {
  26. fmt.Println(err)
  27. }

lease 租约

  • 类似与过期时间
  • 可以续租
    ETCD USAGE - 图1
  1. // 配置连接信息
  2. config = clientv3.Config{
  3. Endpoints: []string{"47.112.154.64:2379"},
  4. DialTimeout: 5 * time.Second,
  5. }
  6. // 声明客户端
  7. if client, err = clientv3.New(config); err != nil {
  8. goto ERR
  9. }
  10. // 申请lease (租约)
  11. lease = clientv3.NewLease(client)
  12. // 申请10秒的租约
  13. if leaseGrantResp, err = lease.Grant(context.Background(), 5); err != nil {
  14. goto ERR
  15. }
  16. // 租约ID
  17. leaseId = leaseGrantResp.ID
  18. ctx, _ = context.WithTimeout(context.Background(), 10*time.Second)
  19. // 配置自动续租, 5秒后会取消自动续租
  20. if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
  21. goto ERR
  22. }
  23. // 处理续约应答的协程
  24. go func() {
  25. for {
  26. select {
  27. case keepResp = <-keepRespChan:
  28. if keepRespChan == nil {
  29. fmt.Println("租约已经失效了")
  30. goto END
  31. } else { // 每秒会续租一次, 所以就会受到一次应答
  32. if keepResp != nil {
  33. fmt.Println("收到自动续租应答:", keepResp.ID)
  34. } else {
  35. fmt.Println("租约已经失效了")
  36. goto END
  37. }
  38. }
  39. }
  40. }
  41. END:
  42. }()
  43. // 获得kv API子集
  44. kv = clientv3.NewKV(client)
  45. // Put一个KV, 让它与租约关联起来, 从而实现10秒后自动过期
  46. if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseId)); err != nil {
  47. goto ERR
  48. }
  49. fmt.Println("写入成功:", putResp.Header.Revision)
  50. // 定时的看一下key过期了没有
  51. for {
  52. if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
  53. goto ERR
  54. }
  55. if getResp.Count == 0 {
  56. fmt.Println("kv过期了")
  57. break
  58. }
  59. fmt.Println("还没过期:", getResp.Kvs)
  60. time.Sleep(2 * time.Second)
  61. //_, err = lease.Revoke(context.TODO(), leaseId) // 取消租约
  62. }
  63. ERR:
  64. fmt.Println(err)

监听

  • 监听某个k(或目录)值的变化
    ETCD USAGE - 图2
    ETCD USAGE - 图3
  1. // 客户端配置
  2. config = clientv3.Config{
  3. Endpoints: []string{"47.112.154.64:2379"},
  4. DialTimeout: 5 * time.Second,
  5. }
  6. // 建立连接
  7. if client, err = clientv3.New(config); err != nil {
  8. fmt.Println(err)
  9. return
  10. }
  11. // KV
  12. kv = clientv3.NewKV(client)
  13. // 模拟etcd中KV的变化
  14. go func(kv clientv3.KV) {
  15. for {
  16. kv.Put(context.TODO(), "/cron/jobs/job7", "i am job7")
  17. kv.Delete(context.TODO(), "/cron/jobs/job7")
  18. time.Sleep(1 * time.Second)
  19. }
  20. }(kv)
  21. // 先GET到当前的值,并监听后续变化
  22. if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job7"); err != nil {
  23. fmt.Println(err)
  24. return
  25. }
  26. // 现在key是存在的
  27. if len(getResp.Kvs) != 0 {
  28. fmt.Println("当前值:", string(getResp.Kvs[0].Value))
  29. }
  30. // 当前etcd集群事务ID, 单调递增的
  31. watchStartRevision = getResp.Header.Revision + 1
  32. // 创建一个watcher
  33. watcher = clientv3.NewWatcher(client)
  34. // 启动监听
  35. fmt.Println("从该版本向后监听:", watchStartRevision)
  36. ctx, cancelFunc := context.WithCancel(context.TODO())
  37. time.AfterFunc(5*time.Second, func() {
  38. cancelFunc()
  39. })
  40. // watchRespChan被关闭, for watchRespChan被终止
  41. watchRespChan = watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))
  42. // 处理kv变化事件
  43. for watchResp = range watchRespChan {
  44. // 监听事件(事件类型)
  45. for _, event = range watchResp.Events {
  46. switch event.Type {
  47. case mvccpb.PUT:
  48. fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
  49. case mvccpb.DELETE:
  50. fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
  51. }
  52. }
  53. }

分布式锁

该具备的条件

  1. 在分布式系统环境下,一个方法在同一时间只能被一个机器的的一个线程执行;
  2. 高可用的获取锁与释放锁;
  3. 高性能的获取锁与释放锁;
  4. 具备可重入特性;
  5. 具备锁失效机制,防止死锁;即占有锁的某节点出现故障, 锁会被释放,不会死锁;
  6. 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。

ETCD实现

  1. func syncLock(f func(v ...interface{}), v ...interface{}) {
  2. // 客户端配置
  3. config = clientv3.Config{
  4. Endpoints: []string{"47.112.154.64:2379"},
  5. DialTimeout: 5 * time.Second,
  6. }
  7. // 建立连接
  8. if client, err = clientv3.New(config); err != nil {
  9. fmt.Println(err)
  10. return
  11. }
  12. // lease实现锁自动过期:
  13. // op操作
  14. // txn事务: if else then
  15. // 1, 上锁 (创建租约, 自动续租, 拿着租约去抢占一个key)
  16. lease = clientv3.NewLease(client)
  17. // 申请一个5秒的租约
  18. if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
  19. fmt.Println(err)
  20. return
  21. }
  22. // 拿到租约的ID
  23. leaseId = leaseGrantResp.ID
  24. // 准备一个用于取消自动续租的context
  25. ctx, cancelFunc = context.WithCancel(context.TODO())
  26. // 确保函数退出后, 自动续租会停止
  27. defer cancelFunc()
  28. defer lease.Revoke(context.TODO(), leaseId) //取消租约
  29. // 5秒后会取消自动续租
  30. if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
  31. fmt.Println(err)
  32. return
  33. }
  34. // 处理续约应答的协程
  35. go func() {
  36. for {
  37. select {
  38. case keepResp = <-keepRespChan:
  39. if keepResp == nil {
  40. fmt.Println("租约已经失效了")
  41. goto END
  42. } else { // 每秒会续租一次, 所以就会受到一次应答
  43. fmt.Println("收到自动续租应答:", keepResp.ID)
  44. }
  45. }
  46. }
  47. END:
  48. }()
  49. // if 不存在key, then 设置它, else 抢锁失败
  50. kv = clientv3.NewKV(client)
  51. // 创建事务
  52. txn = kv.Txn(context.TODO())
  53. // 定义事务
  54. // 如果key不存在
  55. txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
  56. Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))).
  57. Else(clientv3.OpGet("/cron/lock/job9")) // 否则抢锁失败
  58. // 提交事务
  59. if txnResp, err = txn.Commit(); err != nil {
  60. fmt.Println(err)
  61. return // 没有问题
  62. }
  63. // 判断是否抢到了锁
  64. if !txnResp.Succeeded {
  65. fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
  66. return
  67. }
  68. // 2, 处理业务
  69. f(v)
  70. // 3, 释放锁(取消自动续租, 释放租约)
  71. // defer 会把租约释放掉, 关联的KV就被删除了
  72. }