- 将数据存储在集群中的高可用KV存储
- 允许应用实时监听存储中的kv的变化
- 能够容忍单点故障, 能够应对网络分区
etcd与raft的关系
- raft是强一致的集群日志同步算法
- etcd是一个分布式kv存储
- etcd利用raft算法在集群中同步kv
quorum模型-大多数模型
- 集群需要2N+1个节点
- 日志实时复制到follower
写入性能差: 1000次/秒
- 异步通知follower完成提交
- 选举leader需要半数以上的节点参与
- 节点commit日志最多的允许选举为leader
- commit日志同样多, 则term, index越大的允许选举为leader
- 各个节点数据最终一致
底层存储按key的字典序有序排列, 可以顺序遍历
支持复杂的事务 提供类似if…then…else..的事务能力
mvcc多版本控制
- 用于实现watch机制
监听KV变化
Lease租约
- 实现key的过期自动删除
etcd单机版使用
put
// main connects to the etcd endpoint at 127.0.0.1:2379, writes the key
// "/jobs/job1" and prints the revision of the write followed by the value the
// key held before it was overwritten (or <nil> when there was none).
func main() {
	config := clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	}

	client, err := clientv3.New(config)
	if err != nil {
		// Nothing below can work without a client; stop here instead of
		// continuing with a nil client.
		fmt.Println(err)
		return
	}
	defer client.Close()

	kv := clientv3.NewKV(client)

	// WithPrevKV requests the previous key-value pair in the response.
	res, err := kv.Put(context.TODO(), "/jobs/job1", "echo hello1", clientv3.WithPrevKV())
	if err != nil {
		// res is not usable when the request failed.
		fmt.Println(err)
		return
	}

	fmt.Println(res.Header.Revision)
	if res.PrevKv != nil {
		fmt.Println(string(res.PrevKv.Value))
	} else {
		fmt.Println(res.PrevKv)
	}
}
get
// main connects to the etcd endpoint at 127.0.0.1:2379 and prints every
// key-value pair returned by a prefix query on "/jobs/".
func main() {
	config := clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	}

	client, err := clientv3.New(config)
	if err != nil {
		// Nothing below can work without a client; stop here instead of
		// continuing with a nil client.
		fmt.Println(err)
		return
	}
	defer client.Close()

	kv := clientv3.NewKV(client)

	// WithPrefix makes "/jobs/" a prefix query, so keys such as "/jobs/job1"
	// are included.
	res, err := kv.Get(context.TODO(), `/jobs/`, clientv3.WithPrefix())
	if err != nil {
		// res is not usable when the request failed.
		fmt.Println(err)
		return
	}

	for i, value := range res.Kvs {
		fmt.Println(i, "-> ", value)
	}
}
delete
// kv.Delete(context.TODO(), key)
// main connects to the etcd endpoint at 127.0.0.1:2379, deletes the key
// "/jobs/job2" and prints the key-value pairs that were removed.
func main() {
	config := clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	}

	client, err := clientv3.New(config)
	if err != nil {
		// Nothing below can work without a client; stop here instead of
		// continuing with a nil client.
		fmt.Println(err)
		return
	}
	defer client.Close()

	kv := clientv3.NewKV(client)

	// WithPrevKV requests the deleted key-value pairs in the response.
	response, err := kv.Delete(context.TODO(), "/jobs/job2", clientv3.WithPrevKV())
	if err != nil {
		// response is not usable when the request failed.
		fmt.Println(err)
		return
	}

	fmt.Println(response.PrevKvs)
}
租约lease
// main demonstrates etcd leases: it grants a 10-second lease, keeps it alive
// in the background, attaches the key "/cron/lock/job1" to the lease and then
// polls the key once per second until it no longer exists.
func main() {
	config := clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	}

	client, err := clientv3.New(config)
	if err != nil {
		// Nothing below can work without a client; stop here instead of
		// continuing with a nil client.
		fmt.Println(err)
		return
	}
	defer client.Close()

	lease := clientv3.NewLease(client)
	res, err := lease.Grant(context.TODO(), 10)
	if err != nil {
		panic(err)
	}

	// Automatic lease renewal.
	alive, err := lease.KeepAlive(context.TODO(), res.ID)
	if err != nil {
		panic(err)
	}

	// Consume keep-alive responses. A closed channel yields nil values
	// forever, so the goroutine must exit on the first nil rather than loop.
	go func() {
		for {
			keepResp, ok := <-alive
			if !ok || keepResp == nil {
				fmt.Println("租约失效了")
				return
			}
			fmt.Println("收到租约应答: ", keepResp.ID)
		}
	}()

	kv := clientv3.NewKV(client)

	// Bind the key to the lease.
	putResponse, err := kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(res.ID))
	if err != nil {
		panic(err)
	}
	fmt.Println("写入成功: ", putResponse.Header.Revision)

	// Poll until the key has expired.
	for {
		getResponse, err := kv.Get(context.TODO(), "/cron/lock/job1")
		if err != nil {
			panic(err)
		}
		if getResponse.Count == 0 {
			fmt.Println("kv过期了")
			break
		}
		fmt.Println("kv没过期", getResponse.Kvs)
		time.Sleep(1 * time.Second)
	}
}
watch
// main demonstrates etcd watches: a background goroutine repeatedly puts and
// deletes "/cron/jobs/job7" while the main goroutine reads the current value
// and then watches the key starting from the revision after that read.
func main() {
	config := clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	}

	client, err := clientv3.New(config)
	if err != nil {
		// Nothing below can work without a client; stop here instead of
		// continuing with a nil client.
		fmt.Println(err)
		return
	}
	defer client.Close()

	kv := clientv3.NewKV(client)

	// Generate a stream of changes for the watcher to observe.
	go func() {
		for {
			if _, err := kv.Put(context.TODO(), "/cron/jobs/job7", "job7"); err != nil {
				fmt.Println(err)
			}
			if _, err := kv.Delete(context.TODO(), "/cron/jobs/job7"); err != nil {
				fmt.Println(err)
			}
			time.Sleep(1 * time.Second)
		}
	}()

	// Read the current value first, then watch for subsequent changes.
	getRes, err := kv.Get(context.TODO(), "/cron/jobs/job7")
	if err != nil {
		panic(err)
	}
	if len(getRes.Kvs) != 0 { // the key currently exists
		// Value is a []byte; convert it so it prints as text, matching how
		// event values are printed below.
		fmt.Println(string(getRes.Kvs[0].Value))
	}

	// Start watching at the revision right after the one the Get observed.
	revision := getRes.Header.Revision + 1

	watcher := clientv3.Watcher(client)
	watchChan := watcher.Watch(context.TODO(), "/cron/jobs/job7", clientv3.WithRev(revision))
	for response := range watchChan {
		if err := response.Err(); err != nil {
			// The watch failed or was canceled; report it instead of
			// silently iterating over an empty event list.
			fmt.Println(err)
			return
		}
		for i, event := range response.Events {
			fmt.Println(i)
			fmt.Println("Type: ", event.Type)
			fmt.Println(string(event.Kv.Value))
			fmt.Println(event.Kv.ModRevision)
		}
	}
}
Op操作
// main demonstrates the Op API: it builds a put operation for the key "job8"
// and executes it through KV.Do, printing the response.
func main() {
	config := clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	}

	client, err := clientv3.New(config)
	if err != nil {
		// Nothing below can work without a client; stop here instead of
		// continuing with a nil client.
		fmt.Println(err)
		return
	}
	defer client.Close()

	kv := clientv3.NewKV(client)

	// Build the operation.
	opPut := clientv3.OpPut("job8", "job8")

	// Execute the operation.
	response, err := kv.Do(context.TODO(), opPut)
	if err != nil {
		// The response carries no result when the operation failed.
		fmt.Println(err)
		return
	}
	fmt.Println(response)
}
乐观锁
package main
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"time"
)
// main demonstrates a lock built on an etcd transaction:
//
//  1. acquire the lock (create "/cron/jobs/job9" bound to a lease, only if
//     the key does not exist yet),
//  2. run the business logic,
//  3. release the lock (stop the automatic renewal and revoke the lease).
func main() {
	config := clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	}

	client, err := clientv3.New(config)
	if err != nil {
		// Nothing below can work without a client; stop here instead of
		// continuing with a nil client.
		fmt.Println(err)
		return
	}
	// Registered first so it runs last, after the lease has been revoked.
	defer client.Close()

	lease := clientv3.Lease(client)
	grant, err := lease.Grant(context.TODO(), 5)
	if err != nil {
		panic(err)
	}
	leaseID := grant.ID

	// ctx is used to cancel the automatic renewal.
	ctx, cancelFunc := context.WithCancel(context.TODO())
	defer cancelFunc()
	defer lease.Revoke(context.TODO(), leaseID)

	keepAliveChan, err := lease.KeepAlive(ctx, leaseID)
	if err != nil {
		panic(err)
	}

	// Consume keep-alive responses. A closed channel yields nil values
	// forever, so the goroutine must exit on the first nil rather than loop.
	go func() {
		for {
			keepResp, ok := <-keepAliveChan
			if !ok || keepResp == nil {
				fmt.Println("租约失效了")
				return
			}
			fmt.Println("收到租约应答: ", keepResp.ID)
		}
	}()

	kv := clientv3.NewKV(client)

	// If the key does not exist (CreateRevision == 0), create it bound to the
	// lease; otherwise fetch the current holder so it can be reported.
	commit, err := kv.Txn(context.TODO()).
		If(clientv3.Compare(clientv3.CreateRevision("/cron/jobs/job9"), "=", 0)).
		Then(clientv3.OpPut("/cron/jobs/job9", "", clientv3.WithLease(leaseID))).
		Else(clientv3.OpGet("/cron/jobs/job9")). // lock acquisition failed
		Commit()
	if err != nil {
		panic(err)
	}

	// Check whether the lock was acquired.
	if !commit.Succeeded {
		fmt.Println("锁被占用: ", string(commit.Responses[0].GetResponseRange().Kvs[0].Value))
		return
	}

	// Business logic.
	fmt.Println("处理任务")
	time.Sleep(5 * time.Second)
}