简介
etcd是CoreOS团队于2013年6月发起的开源项目,它的目标是构建一个高可用的分布式键值(key-value)数据库。etcd内部采用raft
协议作为一致性算法,etcd基于Go语言实现。
etcd作为服务发现系统,有以下的特点:
- 简单:安装配置简单,而且提供了HTTP API进行交互,使用也很简单
- 安全:支持SSL证书验证
- 快速:根据官方提供的benchmark数据,单实例支持每秒2k+读操作
- 可靠:采用raft算法,实现分布式系统数据的可用性和一致性
etcd项目地址:https://github.com/coreos/etcd/
容器化的etcd服务搭建
# docker-compose definition of a single-node etcd v3.3.12 instance.
version: '2.2'
services:
  etcd:
    image: quay.io/coreos/etcd:v3.3.12
    container_name: etcd
    ports:
      # Client port 2379 is reachable from the host on 23791.
      - "23791:2379"
      # Peer port; only a container port is given, so the host port is chosen by Docker.
      - "2380"
    environment:
      # Make etcdctl inside the container speak the v3 API.
      ETCDCTL_API: 3
    volumes:
      - ./data/etcd/etcd-data:/etcd-data
    command:
      - "/usr/local/bin/etcd"
      - "--name"
      - "s1"
      - "--data-dir"
      - "/etcd-data"
      - "--advertise-client-urls"
      - "http://0.0.0.0:2379"
      - "--listen-client-urls"
      - "http://0.0.0.0:2379"
      - "--initial-advertise-peer-urls"
      - "http://0.0.0.0:2380"
      - "--listen-peer-urls"
      - "http://0.0.0.0:2380"
      - "--initial-cluster-token"
      - "tkn"
      - "--initial-cluster"
      - "s1=http://0.0.0.0:2380"
      - "--initial-cluster-state"
      - "new"
etcd命令行的使用
首先进入到etcd容器中
docker exec -it etcd bash
执行以下命令进行练习
## 获取版本信息
etcdctl version
## 获取所有键值对
etcdctl get --prefix ""
## 添加键值对
etcdctl put zhangsan hello
## 删除键值对
etcdctl del zhangsan
## 添加一个过期时间为20s的租约
etcdctl lease grant 20
## 获取所有租约
etcdctl lease list
## 添加键值对,并为该键指定租约
etcdctl put lisi world --lease="3f3574057fe0e61c"
## 续租(保持租约存活)
etcdctl lease keep-alive 3f3574057fe0e61c
## 查看某个租约的剩余存活时间及其绑定的key
etcdctl lease timetolive 3f3574057fe0e61c --keys
## 回收租约
etcdctl lease revoke 3f3574057fe0e61c
etcd的api接口说明
## 获取版本信息
curl -L http://127.0.0.1:2379/version
## 获取健康状态
curl -L http://127.0.0.1:2379/health
## 添加键值对
curl http://127.0.0.1:2379/v2/keys/zhangsan -XPUT -d value="hello"
## 查看键值对
curl http://127.0.0.1:2379/v2/keys/zhangsan
go操作etcd
场景描述
我们试图使用go开发etcd sdk,启动两个简单的server端,向etcd分别注册自己的address,再启动一个client端,从etcd中发现服务,随机抽取一个进行访问请求。
- one_server端的编写
```go
package main
import (
	"context"
	"fmt"
	"github.com/gin-gonic/gin"
	"go.etcd.io/etcd/clientv3"
	"net/http"
	"sync"
	"time"
)
// Registration settings of this server instance.
const (
	// EtcdPrefix is the etcd key prefix shared by the registered servers.
	EtcdPrefix = "/test/server/"
	// ServerSerial is appended to EtcdPrefix to form this instance's key.
	ServerSerial = "1"
	// Address is the base URL stored in etcd as this instance's value.
	Address = "http://127.0.0.1:18081/"
)
var (
	// EtcdAddress lists the etcd client endpoints to dial.
	EtcdAddress = []string{"http://127.0.0.1:23791"}
	// leaseTTL is the time-to-live, in seconds, requested for the
	// registration lease.
	leaseTTL = 5
)
// HealthProvider holds the etcd client through which this server registers
// itself and renews its registration.
type HealthProvider struct {
	etcdClient *EtcdClient
}
var ( healthProvider *HealthProvider healthProviderOnce sync.Once )
func GetHealthProvider() *HealthProvider { healthProviderOnce.Do(func() { healthProvider = &HealthProvider{ etcdClient: NewEtcdClient(), } }) return healthProvider }
type EtcdClient struct { address []string username string password string kv clientv3.KV client *clientv3.Client ctx context.Context lease clientv3.Lease leaseID clientv3.LeaseID leaseTTL int64 }
func NewEtcdClient() *EtcdClient { var client = &EtcdClient{ ctx: context.Background(), address: EtcdAddress, leaseTTL: int64(leaseTTL), } err := client.connect() if err != nil { panic(err) } return client }
func (etcdClient EtcdClient) connect() (err error) { etcdClient.client, err = clientv3.New(clientv3.Config{ Endpoints: etcdClient.address, DialTimeout: 5 time.Second, TLS: nil, Username: etcdClient.username, Password: etcdClient.password, }) if err != nil { return } etcdClient.kv = clientv3.NewKV(etcdClient.client) etcdClient.ctx = context.Background() return }
// Close closes the underlying etcd client and returns the error it reports.
func (etcdClient *EtcdClient) Close() (err error) {
	err = etcdClient.client.Close()
	return err
}
func (etcdClient EtcdClient) register(address string) (clientv3.PutResponse, error) { etcdClient.lease = clientv3.NewLease(etcdClient.client) leaseResp, err := etcdClient.lease.Grant(etcdClient.ctx, etcdClient.leaseTTL) if err != nil { return nil, err } etcdClient.leaseID = leaseResp.ID return etcdClient.kv.Put(etcdClient.ctx, EtcdPrefix+ServerSerial, address, clientv3.WithLease(leaseResp.ID)) }
func (etcdClient *EtcdClient) LeaseKeepAlive() error { if etcdClient.lease == nil { , err := etcdClient.register(Address) if err != nil { return err } } , err := etcdClient.lease.KeepAlive(etcdClient.ctx, etcdClient.leaseID) if err != nil { return err } return nil }
func healthCheck(provider *HealthProvider) { var tick = time.NewTicker(time.Second) for { select { case <-tick.C: err := provider.etcdClient.LeaseKeepAlive() if err != nil { fmt.Println(err.Error()) return } } } }
// main registers this server in etcd, keeps the registration alive in a
// background goroutine and serves GET /ping on port 18081.
func main() {
	provider := GetHealthProvider()
	go healthCheck(provider)
	defer provider.etcdClient.Close()

	engine := gin.Default()
	engine.GET("/ping", func(c *gin.Context) {
		c.JSON(http.StatusOK, "one")
	})
	// Run blocks until the HTTP server stops; print why it stopped instead of
	// silently discarding the error.
	if err := engine.Run(":18081"); err != nil {
		fmt.Println(err.Error())
	}
}
- two_server端的编写
```go
package main
import (
"context"
"fmt"
"github.com/gin-gonic/gin"
"go.etcd.io/etcd/clientv3"
"net/http"
"sync"
"time"
)
// EtcdPrefix is the etcd key prefix shared by the registered servers.
const EtcdPrefix = "/test/server/"

// ServerSerial is appended to EtcdPrefix to form this instance's key.
const ServerSerial = "2"

// Address is the base URL stored in etcd as this instance's value.
const Address = "http://127.0.0.1:18082/"
// EtcdAddress lists the etcd client endpoints to dial.
var EtcdAddress = []string{"http://127.0.0.1:23791"}

// leaseTTL is the time-to-live, in seconds, requested for the registration
// lease.
var leaseTTL = 5
// HealthProvider holds the etcd client through which this server registers
// itself and renews its registration.
type HealthProvider struct {
	etcdClient *EtcdClient // client used by healthCheck for lease renewal
}
// healthProvider is the process-wide HealthProvider instance; it is assigned
// inside GetHealthProvider.
var healthProvider *HealthProvider

// healthProviderOnce guards the one-time construction of healthProvider.
var healthProviderOnce sync.Once
// GetHealthProvider returns the shared HealthProvider, building it (together
// with its etcd client) on the first call only.
func GetHealthProvider() *HealthProvider {
	build := func() {
		provider := new(HealthProvider)
		provider.etcdClient = NewEtcdClient()
		healthProvider = provider
	}
	healthProviderOnce.Do(build)
	return healthProvider
}
// EtcdClient bundles the etcd connection settings with the client handles and
// the lease state used for service registration.
type EtcdClient struct {
	// Connection settings passed to clientv3.New by connect.
	address  []string
	username string
	password string

	// Handles created by connect.
	client *clientv3.Client
	kv     clientv3.KV
	ctx    context.Context

	// Lease state filled in by register; leaseTTL is the TTL handed to Grant.
	lease    clientv3.Lease
	leaseID  clientv3.LeaseID
	leaseTTL int64
}
// NewEtcdClient builds an EtcdClient from the package-level EtcdAddress and
// leaseTTL settings and connects it. It panics when connect returns an error.
func NewEtcdClient() *EtcdClient {
	c := new(EtcdClient)
	c.ctx = context.Background()
	c.address = EtcdAddress
	c.leaseTTL = int64(leaseTTL)

	if err := c.connect(); err != nil {
		panic(err)
	}
	return c
}
// connect dials etcd with the configured endpoints and credentials and stores
// the resulting client, a KV handle and a fresh background context on the
// receiver. On failure the dial error is returned and kv/ctx are not touched.
func (etcdClient *EtcdClient) connect() (err error) {
	// TLS is left at its zero value (nil), i.e. no TLS configuration.
	cfg := clientv3.Config{
		Endpoints:   etcdClient.address,
		DialTimeout: 5 * time.Second,
		Username:    etcdClient.username,
		Password:    etcdClient.password,
	}

	etcdClient.client, err = clientv3.New(cfg)
	if err != nil {
		return err
	}

	etcdClient.ctx = context.Background()
	etcdClient.kv = clientv3.NewKV(etcdClient.client)
	return nil
}
// Close closes the underlying etcd client and returns the error it reports.
func (etcdClient *EtcdClient) Close() (err error) {
	err = etcdClient.client.Close()
	return err
}
// register grants a lease with a TTL of leaseTTL seconds and stores address
// under the key EtcdPrefix+ServerSerial attached to that lease.
//
// The lease handle and ID are recorded on the receiver only after both the
// grant and the put have succeeded. A failed attempt leaves the receiver
// untouched, so a later call starts again from a clean state instead of
// finding a non-nil lease with a zero lease ID.
func (etcdClient *EtcdClient) register(address string) (*clientv3.PutResponse, error) {
	lease := clientv3.NewLease(etcdClient.client)

	leaseResp, err := lease.Grant(etcdClient.ctx, etcdClient.leaseTTL)
	if err != nil {
		lease.Close() // best effort: the grant error is the one worth reporting
		return nil, err
	}

	key := EtcdPrefix + ServerSerial
	putResp, err := etcdClient.kv.Put(etcdClient.ctx, key, address, clientv3.WithLease(leaseResp.ID))
	if err != nil {
		lease.Close() // best effort; the granted lease is left to expire on its own
		return nil, err
	}

	etcdClient.lease = lease
	etcdClient.leaseID = leaseResp.ID
	return putResp, nil
}
// LeaseKeepAlive renews the registration lease once, registering Address
// first when no lease is held yet. It is meant to be called periodically.
//
// KeepAliveOnce is used rather than KeepAlive: KeepAlive starts a background
// renewal loop and hands back a response channel that must be drained, so
// calling it on every tick would pile up undrained channels.
//
// When the renewal fails the lease is dropped, so that the next call
// registers again with a fresh lease.
func (etcdClient *EtcdClient) LeaseKeepAlive() error {
	if etcdClient.lease == nil {
		if _, err := etcdClient.register(Address); err != nil {
			return err
		}
	}

	if _, err := etcdClient.lease.KeepAliveOnce(etcdClient.ctx, etcdClient.leaseID); err != nil {
		etcdClient.lease.Close() // best effort: the renewal error is reported instead
		etcdClient.lease = nil
		return err
	}
	return nil
}
// healthCheck renews the provider's etcd registration once per second. It
// blocks, so it is meant to run in its own goroutine.
//
// A failed renewal is printed and retried on the next tick rather than ending
// the loop: returning on the first error would stop every further heartbeat.
func healthCheck(provider *HealthProvider) {
	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	for range tick.C {
		if err := provider.etcdClient.LeaseKeepAlive(); err != nil {
			fmt.Println(err.Error())
		}
	}
}
// main registers this server in etcd, keeps the registration alive in a
// background goroutine and serves GET /ping on port 18082.
func main() {
	provider := GetHealthProvider()
	go healthCheck(provider)
	defer provider.etcdClient.Close()

	engine := gin.Default()
	engine.GET("/ping", func(c *gin.Context) {
		c.JSON(http.StatusOK, "two")
	})
	// Run blocks until the HTTP server stops; print why it stopped instead of
	// silently discarding the error.
	if err := engine.Run(":18082"); err != nil {
		fmt.Println(err.Error())
	}
}
- client端的编写
```go
package main
import (
	"context"
	"fmt"
	"github.com/coreos/etcd/clientv3"
	"io/ioutil"
	"math/rand"
	"net/http"
	"time"
)
var (
	// EtcdAddress lists the etcd client endpoints to dial.
	EtcdAddress = []string{"http://127.0.0.1:23791"}
	// ServerPrefix is the etcd key prefix whose entries are treated as
	// server addresses.
	ServerPrefix = "/test/server/"
)
type EtcdClient struct { address []string username string password string kv clientv3.KV client *clientv3.Client ctx context.Context lease clientv3.Lease leaseID clientv3.LeaseID }
func newEtcdClient() *EtcdClient { var client = &EtcdClient{ ctx: context.Background(), address: EtcdAddress, } err := client.connect() if err != nil { panic(err) } return client }
func (etcdClient EtcdClient) connect() (err error) { etcdClient.client, err = clientv3.New(clientv3.Config{ Endpoints: etcdClient.address, DialTimeout: 5 time.Second, TLS: nil, Username: etcdClient.username, Password: etcdClient.password, }) if err != nil { return } etcdClient.kv = clientv3.NewKV(etcdClient.client) etcdClient.ctx = context.Background() return }
func (etcdClient *EtcdClient) list(prefix string) ([]string, error) { resp, err := etcdClient.kv.Get(etcdClient.ctx, prefix, clientv3.WithPrefix()) if err != nil { return nil, err } servers := make([]string, 0) for _, value := range resp.Kvs { if value != nil { servers = append(servers, string(value.Value)) } } return servers, nil }
// close closes the underlying etcd client and returns the error it reports.
func (etcdClient *EtcdClient) close() (err error) {
	err = etcdClient.client.Close()
	return err
}
// genRand returns a pseudo-random number in [0, num), drawn with
// rand.Int31n after converting num to int32.
func genRand(num int) int {
	bound := int32(num)
	pick := rand.Int31n(bound)
	return int(pick)
}
func getServer(client *EtcdClient) (string, error) { servers, err := client.list(ServerPrefix) if err != nil { return “”, err } return servers[genRand(len(servers))], nil }
// Get issues an HTTP GET request to url and returns the response body.
//
// The body is returned whatever the response status code is; an error is
// returned only when the request cannot be built or sent, or when the body
// cannot be read. The request is bounded by a 5-second timeout so that an
// unresponsive server cannot block the caller indefinitely.
func Get(url string) ([]byte, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}

	client := &http.Client{Timeout: 5 * time.Second}
	res, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}
	return body, nil
}
func main() { client := newEtcdClient() err := client.connect() if err != nil { panic(err) } defer client.close()
for i := 0; i < 10; i++ {
address, err := getServer(client)
if err != nil {
fmt.Println(err.Error())
return
}
data, err := Get(address + "ping")
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Println(string(data))
time.Sleep(2 * time.Second)
}
测试
- 分别启动one_server和two_server服务,向etcd注册;
- 启动client服务,循环请求10次,查看结果;
结果
会发现请求one_server和two_server的频率慢慢趋于平均