ETCD简单介绍
etcd是CoreOS团队于2013年6月发起的开源项目,它的目标是构建一个高可用的分布式键值(key-value)数据库。etcd内部采用raft协议作为一致性算法,etcd基于Go语言实现。
etcd raft分布式日志一致算法
动态讲解:http://thesecretlivesofdata.com/raft/
etcd-raft异常处理:https://www.cnblogs.com/mindwind/p/5231986.html
官网:https://etcd.io/docs/v3.4.0/
etcd 应用场景
- 配置管理 (golang 结合viper package)
- 服务注册于发现
- 选主
- 应用调度
- 分布式队列
- 分布式锁
- 配置管理
ubuntu 单机版搭建ETCD
下载
wget https://github.com/etcd-io/etcd/releases/download/v3.3.10/etcd-v3.3.10-linux-amd64.tar.gz
解压
tar zxvf etcd-v3.3.10-linux-amd64.tar.gz
启动/暴露IP给外网访问
nohup ./etcd --listen-client-urls 'http://0.0.0.0:2379' --advertise-client-urls 'http://0.0.0.0:2379' > ./etcd.log 2>&1 &
golang client USE ETCD
connect
// 配置连接信息
config = clientv3.Config{
Endpoints: []string{"47.112.154.64:2379"},
DialTimeout: 5 * time.Second,
}
// 声明客户端
if client, err = clientv3.New(config); err!=nil{
log.Fatal(err)
return
}
CRUD
// 配置连接信息
config = clientv3.Config{
Endpoints: []string{"47.112.154.64:2379"},
DialTimeout: 5 * time.Second,
}
// 声明客户端
if client, err = clientv3.New(config); err!=nil{
log.Fatal(err)
return
}
// kv,CRUD
kv = clientv3.NewKV(client)
// put - create update
putResp1, err := kv.Put(context.Background(), "/cron/task/1", "111")
if err!=nil{
fmt.Println(err)
return
}
fmt.Println(putResp1.Header)
putResp2, err := kv.Put(context.Background(), "/cron/task/2", "666")
if err!=nil{
fmt.Println(err)
return
}
fmt.Println(putResp2.Header)
// get
//clientv3.WithPrefix() 获取前缀为key的所以值
getResp, err := kv.Get(context.Background(), "/cron/task/", clientv3.WithPrefix())
if err!=nil{
fmt.Println(err)
return
}
fmt.Println(getResp.Kvs)
// delete
delResp, err := kv.Delete(context.Background(), "/cron/task/1")
if err!=nil{
fmt.Println(err)
return
}
fmt.Println(delResp)
op
- 使用kv进行etcd的crud操作, 返回的response不是同一种类型的,使用op,返回的都是opResponse
// 配置连接信息
config = clientv3.Config{
Endpoints: []string{"47.112.154.64:2379"},
DialTimeout: 5 * time.Second,
}
// 创建客户端
if client, err = clientv3.New(config); err != nil {
goto ERR
}
kv = clientv3.NewKV(client)
// 创建op: operation
putOp = clientv3.OpPut("cron/op/1", "cron/op/1")
//执行
if opResp, err = kv.Do(context.Background(), putOp); err != nil {
goto ERR
}
fmt.Println(opResp.Put().PrevKv)
getOp = clientv3.OpGet("cron/op/", clientv3.WithPrefix())
//执行
if opResp, err = kv.Do(context.Background(), getOp); err != nil {
goto ERR
}
fmt.Println(opResp.Get().Kvs)
ERR:
if err != nil {
fmt.Println(err)
}
lease 租约
- 类似与过期时间
- 可以续租
// 配置连接信息
config = clientv3.Config{
Endpoints: []string{"47.112.154.64:2379"},
DialTimeout: 5 * time.Second,
}
// 声明客户端
if client, err = clientv3.New(config); err != nil {
goto ERR
}
// 申请lease (租约)
lease = clientv3.NewLease(client)
// 申请10秒的租约
if leaseGrantResp, err = lease.Grant(context.Background(), 5); err != nil {
goto ERR
}
// 租约ID
leaseId = leaseGrantResp.ID
ctx, _ = context.WithTimeout(context.Background(), 10*time.Second)
// 配置自动续租, 5秒后会取消自动续租
if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
goto ERR
}
// 处理续约应答的协程
go func() {
for {
select {
case keepResp = <-keepRespChan:
if keepRespChan == nil {
fmt.Println("租约已经失效了")
goto END
} else { // 每秒会续租一次, 所以就会受到一次应答
if keepResp != nil {
fmt.Println("收到自动续租应答:", keepResp.ID)
} else {
fmt.Println("租约已经失效了")
goto END
}
}
}
}
END:
}()
// 获得kv API子集
kv = clientv3.NewKV(client)
// Put一个KV, 让它与租约关联起来, 从而实现10秒后自动过期
if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseId)); err != nil {
goto ERR
}
fmt.Println("写入成功:", putResp.Header.Revision)
// 定时的看一下key过期了没有
for {
if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
goto ERR
}
if getResp.Count == 0 {
fmt.Println("kv过期了")
break
}
fmt.Println("还没过期:", getResp.Kvs)
time.Sleep(2 * time.Second)
//_, err = lease.Revoke(context.TODO(), leaseId) // 取消租约
}
ERR:
fmt.Println(err)
监听
- 监听某个k(或目录)值的变化
// 客户端配置
config = clientv3.Config{
Endpoints: []string{"47.112.154.64:2379"},
DialTimeout: 5 * time.Second,
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
// KV
kv = clientv3.NewKV(client)
// 模拟etcd中KV的变化
go func(kv clientv3.KV) {
for {
kv.Put(context.TODO(), "/cron/jobs/job7", "i am job7")
kv.Delete(context.TODO(), "/cron/jobs/job7")
time.Sleep(1 * time.Second)
}
}(kv)
// 先GET到当前的值,并监听后续变化
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job7"); err != nil {
fmt.Println(err)
return
}
// 现在key是存在的
if len(getResp.Kvs) != 0 {
fmt.Println("当前值:", string(getResp.Kvs[0].Value))
}
// 当前etcd集群事务ID, 单调递增的
watchStartRevision = getResp.Header.Revision + 1
// 创建一个watcher
watcher = clientv3.NewWatcher(client)
// 启动监听
fmt.Println("从该版本向后监听:", watchStartRevision)
ctx, cancelFunc := context.WithCancel(context.TODO())
time.AfterFunc(5*time.Second, func() {
cancelFunc()
})
// watchRespChan被关闭, for watchRespChan被终止
watchRespChan = watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))
// 处理kv变化事件
for watchResp = range watchRespChan {
// 监听事件(事件类型)
for _, event = range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
case mvccpb.DELETE:
fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
}
}
}
分布式锁
该具备的条件
- 在分布式系统环境下,一个方法在同一时间只能被一个机器的的一个线程执行;
- 高可用的获取锁与释放锁;
- 高性能的获取锁与释放锁;
- 具备可重入特性;
- 具备锁失效机制,防止死锁;即占有锁的某节点出现故障, 锁会被释放,不会死锁;
- 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。
ETCD实现
func syncLock(f func(v ...interface{}), v ...interface{}) {
// 客户端配置
config = clientv3.Config{
Endpoints: []string{"47.112.154.64:2379"},
DialTimeout: 5 * time.Second,
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
// lease实现锁自动过期:
// op操作
// txn事务: if else then
// 1, 上锁 (创建租约, 自动续租, 拿着租约去抢占一个key)
lease = clientv3.NewLease(client)
// 申请一个5秒的租约
if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
fmt.Println(err)
return
}
// 拿到租约的ID
leaseId = leaseGrantResp.ID
// 准备一个用于取消自动续租的context
ctx, cancelFunc = context.WithCancel(context.TODO())
// 确保函数退出后, 自动续租会停止
defer cancelFunc()
defer lease.Revoke(context.TODO(), leaseId) //取消租约
// 5秒后会取消自动续租
if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
fmt.Println(err)
return
}
// 处理续约应答的协程
go func() {
for {
select {
case keepResp = <-keepRespChan:
if keepResp == nil {
fmt.Println("租约已经失效了")
goto END
} else { // 每秒会续租一次, 所以就会受到一次应答
fmt.Println("收到自动续租应答:", keepResp.ID)
}
}
}
END:
}()
// if 不存在key, then 设置它, else 抢锁失败
kv = clientv3.NewKV(client)
// 创建事务
txn = kv.Txn(context.TODO())
// 定义事务
// 如果key不存在
txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet("/cron/lock/job9")) // 否则抢锁失败
// 提交事务
if txnResp, err = txn.Commit(); err != nil {
fmt.Println(err)
return // 没有问题
}
// 判断是否抢到了锁
if !txnResp.Succeeded {
fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}
// 2, 处理业务
f(v)
// 3, 释放锁(取消自动续租, 释放租约)
// defer 会把租约释放掉, 关联的KV就被删除了
}