简介

etcd是CoreOS团队于2013年6月发起的开源项目,它的目标是构建一个高可用的分布式键值(key-value)数据库。etcd内部采用raft协议作为一致性算法,etcd基于Go语言实现。
etcd作为服务发现系统,有以下的特点:

  • 简单:安装配置简单,而且提供了HTTP API进行交互,使用也很简单
  • 安全:支持SSL证书验证
  • 快速:根据官方提供的benchmark数据,单实例支持每秒2k+读操作
  • 可靠:采用raft算法,实现分布式系统数据的可用性和一致性

etcd项目地址:https://github.com/coreos/etcd/

容器化的etcd服务搭建

  # Single-node etcd instance for local development (Docker Compose file).
  version: '2.2'
  services:
    etcd:
      image: quay.io/coreos/etcd:v3.3.12
      container_name: etcd
      ports:
        # Client API: host port 23791 -> container port 2379.
        # Quoted so YAML never interprets the HOST:CONTAINER pair as a number.
        - "23791:2379"
        # Peer port: only the container port is given.
        - 2380
      environment:
        # Selects the v3 API for etcdctl inside the container.
        ETCDCTL_API: 3
      volumes:
        - ./data/etcd/etcd-data:/etcd-data
      command:
        - "/usr/local/bin/etcd"
        - "--name"
        - "s1"
        - "--data-dir"
        - "/etcd-data"
        - "--advertise-client-urls"
        - "http://0.0.0.0:2379"
        - "--listen-client-urls"
        - "http://0.0.0.0:2379"
        - "--initial-advertise-peer-urls"
        - "http://0.0.0.0:2380"
        - "--listen-peer-urls"
        - "http://0.0.0.0:2380"
        - "--initial-cluster-token"
        - "tkn"
        - "--initial-cluster"
        - "s1=http://0.0.0.0:2380"
        - "--initial-cluster-state"
        - "new"

etcd命令行的使用

  • 首先进入到etcd容器中

    docker exec -it etcd bash

  • 执行以下命令进行练习

    1. ## 获取版本信息
    2. etcdctl version
    3. ## 获取所有键值对
    4. etcdctl get --prefix ""
    5. ## 添加键值对
    6. etcdctl put zhangsan hello
    7. ## 删除键值对
    8. etcdctl del zhangsan
    9. ## 添加一个过期时间为20s的租约
    10. etcdctl lease grant 20
    11. ## 获取所有租约
    12. etcdctl lease list
    13. ## 添加键值对,并为该键指定租约
    14. etcdctl put lisi world --lease="3f3574057fe0e61c"
    15. ## 续租(保持租约存活)
    16. etcdctl lease keep-alive 3f3574057fe0e61c
    17. ## 查看某个租约的剩余存活时间及其绑定的键
    18. etcdctl lease timetolive 3f3574057fe0e61c --keys
    19. ## 回收租约
    20. etcdctl lease revoke 3f3574057fe0e61c

    etcd的api接口说明

    1. ## 获取版本信息
    2. curl -L http://127.0.0.1:2379/version
    3. ## 获取健康状态
    4. curl -L http://127.0.0.1:2379/health
    5. ## 添加键值对
    6. curl http://127.0.0.1:2379/v2/keys/zhangsan -XPUT -d value="hello"
    7. ## 查看键值对
    8. curl http://127.0.0.1:2379/v2/keys/zhangsan

    go操作etcd

    场景描述

    我们尝试使用Go的etcd客户端SDK(clientv3):启动两个简单的server端,分别向etcd注册自己的address;再启动一个client端,从etcd中发现服务,并随机抽取一个进行访问请求。

  • one_server端的编写

```go
package main

import ( “context” “fmt” “github.com/gin-gonic/gin” “go.etcd.io/etcd/clientv3” “net/http” “sync” “time” )

// Registration settings for this server instance.
const (
	// EtcdPrefix is the key prefix under which servers register themselves.
	EtcdPrefix = "/test/server/"
	// ServerSerial is appended to EtcdPrefix to form this server's key.
	ServerSerial = "1"
	// Address is the value stored under the key: the base URL of this server.
	Address = "http://127.0.0.1:18081/"
)

var (
	// EtcdAddress lists the etcd endpoints the client connects to.
	EtcdAddress = []string{"http://127.0.0.1:23791"}
	// leaseTTL is the TTL passed to Lease.Grant (seconds).
	leaseTTL = 5
)

type HealthProvider struct { etcdClient *EtcdClient }

var ( healthProvider *HealthProvider healthProviderOnce sync.Once )

func GetHealthProvider() *HealthProvider { healthProviderOnce.Do(func() { healthProvider = &HealthProvider{ etcdClient: NewEtcdClient(), } }) return healthProvider }

type EtcdClient struct { address []string username string password string kv clientv3.KV client *clientv3.Client ctx context.Context lease clientv3.Lease leaseID clientv3.LeaseID leaseTTL int64 }

func NewEtcdClient() *EtcdClient { var client = &EtcdClient{ ctx: context.Background(), address: EtcdAddress, leaseTTL: int64(leaseTTL), } err := client.connect() if err != nil { panic(err) } return client }

func (etcdClient EtcdClient) connect() (err error) { etcdClient.client, err = clientv3.New(clientv3.Config{ Endpoints: etcdClient.address, DialTimeout: 5 time.Second, TLS: nil, Username: etcdClient.username, Password: etcdClient.password, }) if err != nil { return } etcdClient.kv = clientv3.NewKV(etcdClient.client) etcdClient.ctx = context.Background() return }

// Close closes the underlying etcd client and returns its error, if any.
func (etcdClient *EtcdClient) Close() (err error) {
	err = etcdClient.client.Close()
	return err
}

func (etcdClient EtcdClient) register(address string) (clientv3.PutResponse, error) { etcdClient.lease = clientv3.NewLease(etcdClient.client) leaseResp, err := etcdClient.lease.Grant(etcdClient.ctx, etcdClient.leaseTTL) if err != nil { return nil, err } etcdClient.leaseID = leaseResp.ID return etcdClient.kv.Put(etcdClient.ctx, EtcdPrefix+ServerSerial, address, clientv3.WithLease(leaseResp.ID)) }

func (etcdClient *EtcdClient) LeaseKeepAlive() error { if etcdClient.lease == nil { , err := etcdClient.register(Address) if err != nil { return err } } , err := etcdClient.lease.KeepAlive(etcdClient.ctx, etcdClient.leaseID) if err != nil { return err } return nil }

func healthCheck(provider *HealthProvider) { var tick = time.NewTicker(time.Second) for { select { case <-tick.C: err := provider.etcdClient.LeaseKeepAlive() if err != nil { fmt.Println(err.Error()) return } } } }

func main() {

  1. provider := GetHealthProvider()
  2. go healthCheck(provider)
  3. defer provider.etcdClient.Close()
  4. engine := gin.Default()
  5. engine.GET("/ping", func(c *gin.Context) {
  6. c.JSON(http.StatusOK, "one")
  7. })
  8. engine.Run(":18081")

}

```

  • two_server端的编写

```go
  3. package main
  4. import (
  5. "context"
  6. "fmt"
  7. "github.com/gin-gonic/gin"
  8. "go.etcd.io/etcd/clientv3"
  9. "net/http"
  10. "sync"
  11. "time"
  12. )
  13. const (
  14. EtcdPrefix = "/test/server/"
  15. ServerSerial = "2"
  16. Address = "http://127.0.0.1:18082/"
  17. )
  18. var (
  19. EtcdAddress = []string{"http://127.0.0.1:23791"}
  20. leaseTTL = 5
  21. )
  22. type HealthProvider struct {
  23. etcdClient *EtcdClient
  24. }
  25. var (
  26. healthProvider *HealthProvider
  27. healthProviderOnce sync.Once
  28. )
  29. func GetHealthProvider() *HealthProvider {
  30. healthProviderOnce.Do(func() {
  31. healthProvider = &HealthProvider{
  32. etcdClient: NewEtcdClient(),
  33. }
  34. })
  35. return healthProvider
  36. }
  37. type EtcdClient struct {
  38. address []string
  39. username string
  40. password string
  41. kv clientv3.KV
  42. client *clientv3.Client
  43. ctx context.Context
  44. lease clientv3.Lease
  45. leaseID clientv3.LeaseID
  46. leaseTTL int64
  47. }
  48. func NewEtcdClient() *EtcdClient {
  49. var client = &EtcdClient{
  50. ctx: context.Background(),
  51. address: EtcdAddress,
  52. leaseTTL: int64(leaseTTL),
  53. }
  54. err := client.connect()
  55. if err != nil {
  56. panic(err)
  57. }
  58. return client
  59. }
  60. func (etcdClient *EtcdClient) connect() (err error) {
  61. etcdClient.client, err = clientv3.New(clientv3.Config{
  62. Endpoints: etcdClient.address,
  63. DialTimeout: 5 * time.Second,
  64. TLS: nil,
  65. Username: etcdClient.username,
  66. Password: etcdClient.password,
  67. })
  68. if err != nil {
  69. return
  70. }
  71. etcdClient.kv = clientv3.NewKV(etcdClient.client)
  72. etcdClient.ctx = context.Background()
  73. return
  74. }
  75. func (etcdClient *EtcdClient) Close() (err error) {
  76. return etcdClient.client.Close()
  77. }
  78. func (etcdClient *EtcdClient) register(address string) (*clientv3.PutResponse, error) {
  79. etcdClient.lease = clientv3.NewLease(etcdClient.client)
  80. leaseResp, err := etcdClient.lease.Grant(etcdClient.ctx, etcdClient.leaseTTL)
  81. if err != nil {
  82. return nil, err
  83. }
  84. etcdClient.leaseID = leaseResp.ID
  85. return etcdClient.kv.Put(etcdClient.ctx, EtcdPrefix+ServerSerial, address, clientv3.WithLease(leaseResp.ID))
  86. }
  87. func (etcdClient *EtcdClient) LeaseKeepAlive() error {
  88. if etcdClient.lease == nil {
  89. _, err := etcdClient.register(Address)
  90. if err != nil {
  91. return err
  92. }
  93. }
  94. _, err := etcdClient.lease.KeepAlive(etcdClient.ctx, etcdClient.leaseID)
  95. if err != nil {
  96. return err
  97. }
  98. return nil
  99. }
  100. func healthCheck(provider *HealthProvider) {
  101. var tick = time.NewTicker(time.Second)
  102. for {
  103. select {
  104. case <-tick.C:
  105. err := provider.etcdClient.LeaseKeepAlive()
  106. if err != nil {
  107. fmt.Println(err.Error())
  108. return
  109. }
  110. }
  111. }
  112. }
  113. func main() {
  114. provider := GetHealthProvider()
  115. go healthCheck(provider)
  116. defer provider.etcdClient.Close()
  117. engine := gin.Default()
  118. engine.GET("/ping", func(c *gin.Context) {
  119. c.JSON(http.StatusOK, "two")
  120. })
  121. engine.Run(":18082")
  122. }
```

  • client端的编写

```go
package main

import ( “context” “fmt” “github.com/coreos/etcd/clientv3” “io/ioutil” “math/rand” “net/http” “time” )

var (
	// EtcdAddress lists the etcd endpoints the client connects to.
	EtcdAddress = []string{"http://127.0.0.1:23791"}
	// ServerPrefix is the key prefix queried to discover registered servers.
	ServerPrefix = "/test/server/"
)

type EtcdClient struct { address []string username string password string kv clientv3.KV client *clientv3.Client ctx context.Context lease clientv3.Lease leaseID clientv3.LeaseID }

func newEtcdClient() *EtcdClient { var client = &EtcdClient{ ctx: context.Background(), address: EtcdAddress, } err := client.connect() if err != nil { panic(err) } return client }

func (etcdClient EtcdClient) connect() (err error) { etcdClient.client, err = clientv3.New(clientv3.Config{ Endpoints: etcdClient.address, DialTimeout: 5 time.Second, TLS: nil, Username: etcdClient.username, Password: etcdClient.password, }) if err != nil { return } etcdClient.kv = clientv3.NewKV(etcdClient.client) etcdClient.ctx = context.Background() return }

func (etcdClient *EtcdClient) list(prefix string) ([]string, error) { resp, err := etcdClient.kv.Get(etcdClient.ctx, prefix, clientv3.WithPrefix()) if err != nil { return nil, err } servers := make([]string, 0) for _, value := range resp.Kvs { if value != nil { servers = append(servers, string(value.Value)) } } return servers, nil }

// close closes the underlying etcd client and returns its error, if any.
func (etcdClient *EtcdClient) close() (err error) {
	err = etcdClient.client.Close()
	return err
}

// genRand returns a pseudo-random integer in [0, num). As with rand.Int31n,
// it panics when num is not positive.
func genRand(num int) int {
	bound := int32(num)
	pick := rand.Int31n(bound)
	return int(pick)
}

func getServer(client *EtcdClient) (string, error) { servers, err := client.list(ServerPrefix) if err != nil { return “”, err } return servers[genRand(len(servers))], nil }

// Get issues an HTTP GET request to url and returns the full response body.
//
// The client carries a timeout so that an unresponsive server cannot block
// the caller forever. The response status code is not inspected: the body is
// returned for any status.
func Get(url string) ([]byte, error) {
	client := &http.Client{Timeout: 5 * time.Second}
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	res, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	return ioutil.ReadAll(res.Body)
}

func main() { client := newEtcdClient() err := client.connect() if err != nil { panic(err) } defer client.close()

  1. for i := 0; i < 10; i++ {
  2. address, err := getServer(client)
  3. if err != nil {
  4. fmt.Println(err.Error())
  5. return
  6. }
  7. data, err := Get(address + "ping")
  8. if err != nil {
  9. fmt.Println(err.Error())
  10. return
  11. }
  12. fmt.Println(string(data))
  13. time.Sleep(2 * time.Second)
  14. }

} ```

测试
  • 分别启动one_server和two_server服务,向etcd注册;
  • 启动client服务,循环请求10次,查看结果;

结果

会发现请求one_server和two_server的频率慢慢趋于平均