简介
etcd是CoreOS团队于2013年6月发起的开源项目,它的目标是构建一个高可用的分布式键值(key-value)数据库。etcd内部采用raft协议作为一致性算法,etcd基于Go语言实现。
etcd作为服务发现系统,有以下的特点:
- 简单:安装配置简单,而且提供了HTTP API进行交互,使用也很简单
- 安全:支持SSL证书验证
- 快速:根据官方提供的benchmark数据,单实例支持每秒2k+读操作
- 可靠:采用raft算法,实现分布式系统数据的可用性和一致性
etcd项目地址:https://github.com/coreos/etcd/
容器化的etcd服务搭建
version: '2.2'
services:
  etcd:
    image: quay.io/coreos/etcd:v3.3.12
    container_name: etcd
    ports:
      # Client API: host port 23791 -> container port 2379.
      - "23791:2379"
      # Peer port: only the container port is given, so Docker picks the host port.
      - "2380"
    environment:
      ETCDCTL_API: 3
    volumes:
      - ./data/etcd/etcd-data:/etcd-data
    command:
      - "/usr/local/bin/etcd"
      - "--name"
      - "s1"
      - "--data-dir"
      - "/etcd-data"
      - "--advertise-client-urls"
      - "http://0.0.0.0:2379"
      - "--listen-client-urls"
      - "http://0.0.0.0:2379"
      - "--initial-advertise-peer-urls"
      - "http://0.0.0.0:2380"
      - "--listen-peer-urls"
      - "http://0.0.0.0:2380"
      - "--initial-cluster-token"
      - "tkn"
      # Single-member cluster: s1 is the only entry.
      - "--initial-cluster"
      - "s1=http://0.0.0.0:2380"
      - "--initial-cluster-state"
      - "new"
etcd命令行的使用
首先进入到etcd容器中
docker exec -it etcd bash
执行以下命令进行练习
## Show version information
etcdctl version
## List every key-value pair (the empty prefix matches all keys)
etcdctl get --prefix ""
## Put a key-value pair
etcdctl put zhangsan hello
## Delete a key
etcdctl del zhangsan
## Grant a lease with a TTL of 20s
etcdctl lease grant 20
## List all leases
etcdctl lease list
## Put a key-value pair attached to a lease (use the ID printed by `lease grant`)
etcdctl put lisi world --lease="3f3574057fe0e61c"
## Renew the lease (keeps refreshing it until interrupted)
etcdctl lease keep-alive 3f3574057fe0e61c
## Show the remaining TTL of the lease and the keys attached to it
etcdctl lease timetolive 3f3574057fe0e61c --keys
## Revoke the lease
etcdctl lease revoke 3f3574057fe0e61c
etcd的api接口说明
## Get version information
curl -L http://127.0.0.1:2379/version
## Get health status
curl -L http://127.0.0.1:2379/health
## Put a key-value pair (v2 keys API)
curl http://127.0.0.1:2379/v2/keys/zhangsan -XPUT -d value="hello"
## Read the key-value pair back (v2 keys API)
curl http://127.0.0.1:2379/v2/keys/zhangsan
go操作etcd
场景描述
我们使用Go语言的etcd客户端SDK(clientv3)实现一个简单的服务注册与发现示例:启动两个简单的server端,分别向etcd注册自己的address;再启动一个client端,从etcd中发现服务,并随机抽取一个发起请求。
one_server端的编写 ```go package main
import ( “context” “fmt” “github.com/gin-gonic/gin” “go.etcd.io/etcd/clientv3” “net/http” “sync” “time” )
const ( EtcdPrefix = “/test/server/“ ServerSerial = “1” Address = “http://127.0.0.1:18081/“ )
var ( EtcdAddress = []string{“http://127.0.0.1:23791"} leaseTTL = 5 )
type HealthProvider struct { etcdClient *EtcdClient }
var ( healthProvider *HealthProvider healthProviderOnce sync.Once )
func GetHealthProvider() *HealthProvider { healthProviderOnce.Do(func() { healthProvider = &HealthProvider{ etcdClient: NewEtcdClient(), } }) return healthProvider }
type EtcdClient struct { address []string username string password string kv clientv3.KV client *clientv3.Client ctx context.Context lease clientv3.Lease leaseID clientv3.LeaseID leaseTTL int64 }
func NewEtcdClient() *EtcdClient { var client = &EtcdClient{ ctx: context.Background(), address: EtcdAddress, leaseTTL: int64(leaseTTL), } err := client.connect() if err != nil { panic(err) } return client }
func (etcdClient EtcdClient) connect() (err error) { etcdClient.client, err = clientv3.New(clientv3.Config{ Endpoints: etcdClient.address, DialTimeout: 5 time.Second, TLS: nil, Username: etcdClient.username, Password: etcdClient.password, }) if err != nil { return } etcdClient.kv = clientv3.NewKV(etcdClient.client) etcdClient.ctx = context.Background() return }
// Close shuts down the underlying etcd client and reports its error, if any.
func (etcdClient *EtcdClient) Close() (err error) {
	err = etcdClient.client.Close()
	return err
}
func (etcdClient EtcdClient) register(address string) (clientv3.PutResponse, error) { etcdClient.lease = clientv3.NewLease(etcdClient.client) leaseResp, err := etcdClient.lease.Grant(etcdClient.ctx, etcdClient.leaseTTL) if err != nil { return nil, err } etcdClient.leaseID = leaseResp.ID return etcdClient.kv.Put(etcdClient.ctx, EtcdPrefix+ServerSerial, address, clientv3.WithLease(leaseResp.ID)) }
func (etcdClient *EtcdClient) LeaseKeepAlive() error { if etcdClient.lease == nil { , err := etcdClient.register(Address) if err != nil { return err } } , err := etcdClient.lease.KeepAlive(etcdClient.ctx, etcdClient.leaseID) if err != nil { return err } return nil }
func healthCheck(provider *HealthProvider) { var tick = time.NewTicker(time.Second) for { select { case <-tick.C: err := provider.etcdClient.LeaseKeepAlive() if err != nil { fmt.Println(err.Error()) return } } } }
func main() {
provider := GetHealthProvider()go healthCheck(provider)defer provider.etcdClient.Close()engine := gin.Default()engine.GET("/ping", func(c *gin.Context) {c.JSON(http.StatusOK, "one")})engine.Run(":18081")
}
- two_server端的编写```gopackage mainimport ("context""fmt""github.com/gin-gonic/gin""go.etcd.io/etcd/clientv3""net/http""sync""time")const (EtcdPrefix = "/test/server/"ServerSerial = "2"Address = "http://127.0.0.1:18082/")var (EtcdAddress = []string{"http://127.0.0.1:23791"}leaseTTL = 5)type HealthProvider struct {etcdClient *EtcdClient}var (healthProvider *HealthProviderhealthProviderOnce sync.Once)func GetHealthProvider() *HealthProvider {healthProviderOnce.Do(func() {healthProvider = &HealthProvider{etcdClient: NewEtcdClient(),}})return healthProvider}type EtcdClient struct {address []stringusername stringpassword stringkv clientv3.KVclient *clientv3.Clientctx context.Contextlease clientv3.LeaseleaseID clientv3.LeaseIDleaseTTL int64}func NewEtcdClient() *EtcdClient {var client = &EtcdClient{ctx: context.Background(),address: EtcdAddress,leaseTTL: int64(leaseTTL),}err := client.connect()if err != nil {panic(err)}return client}func (etcdClient *EtcdClient) connect() (err error) {etcdClient.client, err = clientv3.New(clientv3.Config{Endpoints: etcdClient.address,DialTimeout: 5 * time.Second,TLS: nil,Username: etcdClient.username,Password: etcdClient.password,})if err != nil {return}etcdClient.kv = clientv3.NewKV(etcdClient.client)etcdClient.ctx = context.Background()return}func (etcdClient *EtcdClient) Close() (err error) {return etcdClient.client.Close()}func (etcdClient *EtcdClient) register(address string) (*clientv3.PutResponse, error) {etcdClient.lease = clientv3.NewLease(etcdClient.client)leaseResp, err := etcdClient.lease.Grant(etcdClient.ctx, etcdClient.leaseTTL)if err != nil {return nil, err}etcdClient.leaseID = leaseResp.IDreturn etcdClient.kv.Put(etcdClient.ctx, EtcdPrefix+ServerSerial, address, clientv3.WithLease(leaseResp.ID))}func (etcdClient *EtcdClient) LeaseKeepAlive() error {if etcdClient.lease == nil {_, err := etcdClient.register(Address)if err != nil {return err}}_, err := etcdClient.lease.KeepAlive(etcdClient.ctx, etcdClient.leaseID)if err != nil 
{return err}return nil}func healthCheck(provider *HealthProvider) {var tick = time.NewTicker(time.Second)for {select {case <-tick.C:err := provider.etcdClient.LeaseKeepAlive()if err != nil {fmt.Println(err.Error())return}}}}func main() {provider := GetHealthProvider()go healthCheck(provider)defer provider.etcdClient.Close()engine := gin.Default()engine.GET("/ping", func(c *gin.Context) {c.JSON(http.StatusOK, "two")})engine.Run(":18082")}
- client端的编写 ```go package main
import ( “context” “fmt” “github.com/coreos/etcd/clientv3” “io/ioutil” “math/rand” “net/http” “time” )
var ( EtcdAddress = []string{“http://127.0.0.1:23791"} ServerPrefix = “/test/server/“ )
type EtcdClient struct { address []string username string password string kv clientv3.KV client *clientv3.Client ctx context.Context lease clientv3.Lease leaseID clientv3.LeaseID }
func newEtcdClient() *EtcdClient { var client = &EtcdClient{ ctx: context.Background(), address: EtcdAddress, } err := client.connect() if err != nil { panic(err) } return client }
func (etcdClient EtcdClient) connect() (err error) { etcdClient.client, err = clientv3.New(clientv3.Config{ Endpoints: etcdClient.address, DialTimeout: 5 time.Second, TLS: nil, Username: etcdClient.username, Password: etcdClient.password, }) if err != nil { return } etcdClient.kv = clientv3.NewKV(etcdClient.client) etcdClient.ctx = context.Background() return }
func (etcdClient *EtcdClient) list(prefix string) ([]string, error) { resp, err := etcdClient.kv.Get(etcdClient.ctx, prefix, clientv3.WithPrefix()) if err != nil { return nil, err } servers := make([]string, 0) for _, value := range resp.Kvs { if value != nil { servers = append(servers, string(value.Value)) } } return servers, nil }
// close shuts down the underlying etcd client and reports its error, if any.
func (etcdClient *EtcdClient) close() (err error) {
	err = etcdClient.client.Close()
	return err
}
// genRand returns a pseudo-random integer in [0, num).
//
// It panics when num <= 0 (the contract of rand.Intn), so callers must check
// for an empty candidate set first. rand.Intn is used instead of converting
// num to int32, which silently truncated values above math.MaxInt32.
func genRand(num int) int {
	return rand.Intn(num)
}
func getServer(client *EtcdClient) (string, error) { servers, err := client.list(ServerPrefix) if err != nil { return “”, err } return servers[genRand(len(servers))], nil }
// Get issues an HTTP GET request to url and returns the full response body.
//
// The body is returned for any status code; only transport-level failures and
// read errors are reported. The client carries a timeout so a hung server
// cannot block the caller forever.
func Get(url string) ([]byte, error) {
	client := &http.Client{Timeout: 5 * time.Second}

	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}

	res, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	return ioutil.ReadAll(res.Body)
}
func main() { client := newEtcdClient() err := client.connect() if err != nil { panic(err) } defer client.close()
for i := 0; i < 10; i++ {address, err := getServer(client)if err != nil {fmt.Println(err.Error())return}data, err := Get(address + "ping")if err != nil {fmt.Println(err.Error())return}fmt.Println(string(data))time.Sleep(2 * time.Second)}
测试
- 分别启动one_server和two_server服务,向etcd注册;
- 启动client服务,循环请求10次,查看结果;
结果
会发现请求one_server和two_server的频率慢慢趋于平均
