ETCD简单介绍
etcd是CoreOS团队于2013年6月发起的开源项目,它的目标是构建一个高可用的分布式键值(key-value)数据库。etcd内部采用raft协议作为一致性算法,etcd基于Go语言实现。
etcd raft分布式日志一致算法
动态讲解:http://thesecretlivesofdata.com/raft/
etcd-raft异常处理:https://www.cnblogs.com/mindwind/p/5231986.html
官网:https://etcd.io/docs/v3.4.0/
etcd 应用场景
- 配置管理 (golang 结合viper package)
- 服务注册于发现
- 选主
- 应用调度
- 分布式队列
- 分布式锁
- 配置管理
ubuntu 单机版搭建ETCD
下载wget https://github.com/etcd-io/etcd/releases/download/v3.3.10/etcd-v3.3.10-linux-amd64.tar.gz解压tar zxvf etcd-v3.3.10-linux-amd64.tar.gz启动/暴露IP给外网访问nohup ./etcd --listen-client-urls 'http://0.0.0.0:2379' --advertise-client-urls 'http://0.0.0.0:2379' > ./etcd.log 2>&1 &
golang client USE ETCD
connect
// 配置连接信息config = clientv3.Config{Endpoints: []string{"47.112.154.64:2379"},DialTimeout: 5 * time.Second,}// 声明客户端if client, err = clientv3.New(config); err!=nil{log.Fatal(err)return}
CRUD
// 配置连接信息config = clientv3.Config{Endpoints: []string{"47.112.154.64:2379"},DialTimeout: 5 * time.Second,}// 声明客户端if client, err = clientv3.New(config); err!=nil{log.Fatal(err)return}// kv,CRUDkv = clientv3.NewKV(client)// put - create updateputResp1, err := kv.Put(context.Background(), "/cron/task/1", "111")if err!=nil{fmt.Println(err)return}fmt.Println(putResp1.Header)putResp2, err := kv.Put(context.Background(), "/cron/task/2", "666")if err!=nil{fmt.Println(err)return}fmt.Println(putResp2.Header)// get//clientv3.WithPrefix() 获取前缀为key的所以值getResp, err := kv.Get(context.Background(), "/cron/task/", clientv3.WithPrefix())if err!=nil{fmt.Println(err)return}fmt.Println(getResp.Kvs)// deletedelResp, err := kv.Delete(context.Background(), "/cron/task/1")if err!=nil{fmt.Println(err)return}fmt.Println(delResp)
op
- 使用kv进行etcd的crud操作, 返回的response不是同一种类型的,使用op,返回的都是opResponse
// 配置连接信息config = clientv3.Config{Endpoints: []string{"47.112.154.64:2379"},DialTimeout: 5 * time.Second,}// 创建客户端if client, err = clientv3.New(config); err != nil {goto ERR}kv = clientv3.NewKV(client)// 创建op: operationputOp = clientv3.OpPut("cron/op/1", "cron/op/1")//执行if opResp, err = kv.Do(context.Background(), putOp); err != nil {goto ERR}fmt.Println(opResp.Put().PrevKv)getOp = clientv3.OpGet("cron/op/", clientv3.WithPrefix())//执行if opResp, err = kv.Do(context.Background(), getOp); err != nil {goto ERR}fmt.Println(opResp.Get().Kvs)ERR:if err != nil {fmt.Println(err)}
lease 租约
- 类似与过期时间
- 可以续租

// 配置连接信息config = clientv3.Config{Endpoints: []string{"47.112.154.64:2379"},DialTimeout: 5 * time.Second,}// 声明客户端if client, err = clientv3.New(config); err != nil {goto ERR}// 申请lease (租约)lease = clientv3.NewLease(client)// 申请10秒的租约if leaseGrantResp, err = lease.Grant(context.Background(), 5); err != nil {goto ERR}// 租约IDleaseId = leaseGrantResp.IDctx, _ = context.WithTimeout(context.Background(), 10*time.Second)// 配置自动续租, 5秒后会取消自动续租if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {goto ERR}// 处理续约应答的协程go func() {for {select {case keepResp = <-keepRespChan:if keepRespChan == nil {fmt.Println("租约已经失效了")goto END} else { // 每秒会续租一次, 所以就会受到一次应答if keepResp != nil {fmt.Println("收到自动续租应答:", keepResp.ID)} else {fmt.Println("租约已经失效了")goto END}}}}END:}()// 获得kv API子集kv = clientv3.NewKV(client)// Put一个KV, 让它与租约关联起来, 从而实现10秒后自动过期if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseId)); err != nil {goto ERR}fmt.Println("写入成功:", putResp.Header.Revision)// 定时的看一下key过期了没有for {if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {goto ERR}if getResp.Count == 0 {fmt.Println("kv过期了")break}fmt.Println("还没过期:", getResp.Kvs)time.Sleep(2 * time.Second)//_, err = lease.Revoke(context.TODO(), leaseId) // 取消租约}ERR:fmt.Println(err)
监听
- 监听某个k(或目录)值的变化


// 客户端配置config = clientv3.Config{Endpoints: []string{"47.112.154.64:2379"},DialTimeout: 5 * time.Second,}// 建立连接if client, err = clientv3.New(config); err != nil {fmt.Println(err)return}// KVkv = clientv3.NewKV(client)// 模拟etcd中KV的变化go func(kv clientv3.KV) {for {kv.Put(context.TODO(), "/cron/jobs/job7", "i am job7")kv.Delete(context.TODO(), "/cron/jobs/job7")time.Sleep(1 * time.Second)}}(kv)// 先GET到当前的值,并监听后续变化if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job7"); err != nil {fmt.Println(err)return}// 现在key是存在的if len(getResp.Kvs) != 0 {fmt.Println("当前值:", string(getResp.Kvs[0].Value))}// 当前etcd集群事务ID, 单调递增的watchStartRevision = getResp.Header.Revision + 1// 创建一个watcherwatcher = clientv3.NewWatcher(client)// 启动监听fmt.Println("从该版本向后监听:", watchStartRevision)ctx, cancelFunc := context.WithCancel(context.TODO())time.AfterFunc(5*time.Second, func() {cancelFunc()})// watchRespChan被关闭, for watchRespChan被终止watchRespChan = watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))// 处理kv变化事件for watchResp = range watchRespChan {// 监听事件(事件类型)for _, event = range watchResp.Events {switch event.Type {case mvccpb.PUT:fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)case mvccpb.DELETE:fmt.Println("删除了", "Revision:", event.Kv.ModRevision)}}}
分布式锁
该具备的条件
- 在分布式系统环境下,一个方法在同一时间只能被一个机器的的一个线程执行;
- 高可用的获取锁与释放锁;
- 高性能的获取锁与释放锁;
- 具备可重入特性;
- 具备锁失效机制,防止死锁;即占有锁的某节点出现故障, 锁会被释放,不会死锁;
- 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。
ETCD实现
func syncLock(f func(v ...interface{}), v ...interface{}) {// 客户端配置config = clientv3.Config{Endpoints: []string{"47.112.154.64:2379"},DialTimeout: 5 * time.Second,}// 建立连接if client, err = clientv3.New(config); err != nil {fmt.Println(err)return}// lease实现锁自动过期:// op操作// txn事务: if else then// 1, 上锁 (创建租约, 自动续租, 拿着租约去抢占一个key)lease = clientv3.NewLease(client)// 申请一个5秒的租约if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {fmt.Println(err)return}// 拿到租约的IDleaseId = leaseGrantResp.ID// 准备一个用于取消自动续租的contextctx, cancelFunc = context.WithCancel(context.TODO())// 确保函数退出后, 自动续租会停止defer cancelFunc()defer lease.Revoke(context.TODO(), leaseId) //取消租约// 5秒后会取消自动续租if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {fmt.Println(err)return}// 处理续约应答的协程go func() {for {select {case keepResp = <-keepRespChan:if keepResp == nil {fmt.Println("租约已经失效了")goto END} else { // 每秒会续租一次, 所以就会受到一次应答fmt.Println("收到自动续租应答:", keepResp.ID)}}}END:}()// if 不存在key, then 设置它, else 抢锁失败kv = clientv3.NewKV(client)// 创建事务txn = kv.Txn(context.TODO())// 定义事务// 如果key不存在txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))).Else(clientv3.OpGet("/cron/lock/job9")) // 否则抢锁失败// 提交事务if txnResp, err = txn.Commit(); err != nil {fmt.Println(err)return // 没有问题}// 判断是否抢到了锁if !txnResp.Succeeded {fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))return}// 2, 处理业务f(v)// 3, 释放锁(取消自动续租, 释放租约)// defer 会把租约释放掉, 关联的KV就被删除了}
