• 从etcd中把job同步到内存
  • 实现调度模块, 基于cron表达式调度N个job
  • 实现执行模块, 并发执行多个job
  • 对job加分布式锁, 防止集群中多个worker并发执行同一个job
  • 把执行的日志保存到mongodb

监听任务的变化

  1. func InitJobMgr() error {
  2. config := clientv3.Config{
  3. Endpoints: G_config.EtcdEndpoints,
  4. DialTimeout: time.Duration(G_config.EtcdDialTimeout) * time.Millisecond,
  5. }
  6. client, err := clientv3.New(config)
  7. if err != nil {
  8. return err
  9. }
  10. G_jobMgr = &JobMgr{
  11. client: client,
  12. kv: clientv3.NewKV(client),
  13. lease: clientv3.NewLease(client),
  14. watcher: clientv3.NewWatcher(client),
  15. }
  16. // 启动任务监听
  17. G_jobMgr.watchJobs()
  18. return nil
  19. }
  1. // 监听任务变化
  2. func (jobMgr *JobMgr) watchJobs() error {
  3. // 1. get /cron/jobs/目录下的所有任务, 并且获取当前集群的revision
  4. // 2. 从该revision向后监听变化事件
  5. response, err := jobMgr.kv.Get(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithPrefix())
  6. if err != nil {
  7. return err
  8. }
  9. // 当前有哪些任务
  10. for _, kv := range response.Kvs {
  11. job, err := common.UnpackJob(kv.Value)
  12. if err != nil {
  13. return err
  14. }
  15. // job同步到调度协程(scheduler)
  16. log.Println(`当前任务: `, job.Name)
  17. common.BuildJobEvent(common.JOB_EVENT_SAVE, job)
  18. }
  19. // 任务变化后 从该revision向后监听变化事件
  20. go func() {
  21. revision := response.Header.GetRevision() + 1
  22. // 监听/cron/jobs/目录后续变化, 返回chan
  23. watchChan := jobMgr.watcher.Watch(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithRev(revision), clientv3.WithPrefix())
  24. // 处理监听事件
  25. for watchResponse := range watchChan {
  26. for _, event := range watchResponse.Events {
  27. switch event.Type {
  28. case mvccpb.PUT: // 保存任务事件
  29. // 推送更新事件给 scheduler
  30. if job, err := common.UnpackJob(event.Kv.Value); err != nil {
  31. log.Println(`common.UnpackJob err: `, err)
  32. continue
  33. } else {
  34. log.Println(`获取到保存任务事件: `, job.Name)
  35. // 构建一个更新event事件
  36. common.BuildJobEvent(common.JOB_EVENT_SAVE, job)
  37. }
  38. case mvccpb.DELETE: // 任务被删除了
  39. // 推送删除事件给 scheduler
  40. jobName := common.ExtractJobName(string(event.Kv.Key))
  41. log.Println(`获取到删除任务事件: `, jobName)
  42. // 构建一个删除event事件
  43. common.BuildJobEvent(common.JOB_EVENT_DELETE, &common.Job{
  44. Name: jobName,
  45. })
  46. }
  47. }
  48. }
  49. }()
  50. return nil
  51. }
  1. package worker
  2. import (
  3. "context"
  4. "go.etcd.io/etcd/api/v3/mvccpb"
  5. clientv3 "go.etcd.io/etcd/client/v3"
  6. "golang-crontab/common"
  7. "log"
  8. "time"
  9. )
  10. type JobMgr struct {
  11. client *clientv3.Client
  12. kv clientv3.KV
  13. lease clientv3.Lease
  14. watcher clientv3.Watcher
  15. }
  16. var G_jobMgr *JobMgr
  17. func InitJobMgr() error {
  18. config := clientv3.Config{
  19. Endpoints: G_config.EtcdEndpoints,
  20. DialTimeout: time.Duration(G_config.EtcdDialTimeout) * time.Millisecond,
  21. }
  22. client, err := clientv3.New(config)
  23. if err != nil {
  24. return err
  25. }
  26. G_jobMgr = &JobMgr{
  27. client: client,
  28. kv: clientv3.NewKV(client),
  29. lease: clientv3.NewLease(client),
  30. watcher: clientv3.NewWatcher(client),
  31. }
  32. // 启动任务监听
  33. err = G_jobMgr.watchJobs()
  34. // 启动监听killer
  35. G_jobMgr.watchKiller()
  36. return err
  37. }
  38. // 监听强杀任务通知
  39. func (jobMgr *JobMgr) watchKiller() {
  40. var (
  41. watchChan clientv3.WatchChan
  42. watchResp clientv3.WatchResponse
  43. watchEvent *clientv3.Event
  44. jobEvent *common.JobEvent
  45. jobName string
  46. job *common.Job
  47. )
  48. // 监听/cron/killer目录
  49. go func() { // 监听协程
  50. // 监听/cron/killer/目录的变化
  51. watchChan = jobMgr.watcher.Watch(context.TODO(), common.JOB_KILLER_DIR, clientv3.WithPrefix())
  52. // 处理监听事件
  53. for watchResp = range watchChan {
  54. for _, watchEvent = range watchResp.Events {
  55. switch watchEvent.Type {
  56. case mvccpb.PUT: // 杀死任务事件
  57. jobName = common.ExtractKillerName(string(watchEvent.Kv.Key))
  58. job = &common.Job{Name: jobName}
  59. jobEvent = common.BuildJobEvent(common.JOB_EVENT_KILL, job)
  60. // 事件推给scheduler
  61. G_scheduler.PushJobEvent(jobEvent)
  62. case mvccpb.DELETE: // killer标记过期, 被自动删除
  63. }
  64. }
  65. }
  66. }()
  67. }
  68. // 监听任务变化
  69. func (jobMgr *JobMgr) watchJobs() error {
  70. // 1. get /cron/jobs/目录下的所有任务, 并且获取当前集群的revision
  71. // 2. 从该revision向后监听变化事件
  72. response, err := jobMgr.kv.Get(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithPrefix())
  73. if err != nil {
  74. return err
  75. }
  76. // 当前有哪些任务
  77. for _, kv := range response.Kvs {
  78. job, err := common.UnpackJob(kv.Value)
  79. if err != nil {
  80. return err
  81. }
  82. // job同步到调度协程(scheduler)
  83. log.Println(`当前存在的任务: `, job.Name)
  84. jobEvent := common.BuildJobEvent(common.JOB_EVENT_SAVE, job)
  85. G_scheduler.PushJobEvent(jobEvent)
  86. }
  87. // 任务变化后 从该revision向后监听变化事件
  88. go func() {
  89. revision := response.Header.GetRevision() + 1
  90. // 监听/cron/jobs/目录后续变化, 返回chan
  91. watchChan := jobMgr.watcher.Watch(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithRev(revision), clientv3.WithPrefix())
  92. // 处理监听事件
  93. for watchResponse := range watchChan {
  94. for _, event := range watchResponse.Events {
  95. switch event.Type {
  96. case mvccpb.PUT: // 保存任务事件
  97. // 推送更新事件给 scheduler
  98. if job, err := common.UnpackJob(event.Kv.Value); err != nil {
  99. log.Println(`common.UnpackJob err: `, err)
  100. continue
  101. } else {
  102. log.Println(`获取到保存任务事件: `, job.Name)
  103. // 构建一个更新event事件
  104. jobEvent := common.BuildJobEvent(common.JOB_EVENT_SAVE, job)
  105. G_scheduler.PushJobEvent(jobEvent)
  106. }
  107. case mvccpb.DELETE: // 任务被删除了
  108. // 推送删除事件给 scheduler
  109. jobName := common.ExtractJobName(string(event.Kv.Key))
  110. log.Println(`获取到删除任务事件: `, jobName)
  111. // 构建一个删除event事件
  112. jobEvent := common.BuildJobEvent(common.JOB_EVENT_DELETE, &common.Job{
  113. Name: jobName,
  114. })
  115. G_scheduler.PushJobEvent(jobEvent)
  116. }
  117. }
  118. }
  119. }()
  120. return nil
  121. }
  122. // 创建任务执行锁
  123. func (jobMgr *JobMgr) CreateJobLock(jobName string) (jobLock *JobLock) {
  124. jobLock = InitJobLock(jobName, jobMgr.kv, jobMgr.lease)
  125. return
  126. }

任务发送给scheduler

  1. package worker
  2. import (
  3. "fmt"
  4. "golang-crontab/common"
  5. "log"
  6. "time"
  7. )
  8. // 任务调度
  9. type Scheduler struct {
  10. jobEventChan chan *common.JobEvent
  11. jobPlanTable map[string]*common.JobSchedulePlan
  12. jobExecutingTable map[string]*common.JobExecuteInfo // 任务执行表
  13. jobResultChan chan *common.JobExecuteResult // 任务结果队列
  14. }
  15. var G_scheduler *Scheduler
  16. func InitScheduler() {
  17. G_scheduler = &Scheduler{
  18. jobEventChan: make(chan *common.JobEvent, 1000),
  19. jobPlanTable: make(map[string]*common.JobSchedulePlan),
  20. jobExecutingTable: make(map[string]*common.JobExecuteInfo),
  21. jobResultChan: make(chan *common.JobExecuteResult, 1000),
  22. }
  23. go G_scheduler.scheduleLoop()
  24. }
  25. // 重新计算任务调度状态
  26. func (s *Scheduler) TryScheduler() (schedulerAfter time.Duration) {
  27. // 1. 遍历所有任务
  28. // 2. 过期任务立即执行
  29. // 3. 统计最近要过期的任务
  30. // 如果任务表为空话,随便睡眠多久
  31. if len(s.jobPlanTable) == 0 {
  32. schedulerAfter = 1 * time.Second
  33. return
  34. }
  35. now := time.Now()
  36. var nearTime *time.Time
  37. for _, plan := range s.jobPlanTable {
  38. if plan.NextTime.Before(now) || plan.NextTime.Equal(now) {
  39. // 尝试执行任务
  40. //fmt.Println("执行任务: ", time.Now().Format(`2006-01-02 15:04:05`), plan.Job.Name)
  41. s.TryStartJob(plan)
  42. plan.NextTime = plan.Expr.Next(now) // 更新下次执行时间
  43. }
  44. // 统计最近要过期的任务
  45. if nearTime == nil || plan.NextTime.Before(*nearTime) {
  46. nearTime = &plan.NextTime
  47. }
  48. }
  49. if nearTime != nil {
  50. schedulerAfter = (*nearTime).Sub(time.Now())
  51. }
  52. return
  53. }
  54. // 尝试执行任务
  55. func (scheduler *Scheduler) TryStartJob(jobPlan *common.JobSchedulePlan) {
  56. // 调度 和 执行 是2件事情
  57. var (
  58. jobExecuteInfo *common.JobExecuteInfo
  59. jobExecuting bool
  60. )
  61. // 执行的任务可能运行很久, 1分钟会调度60次,但是只能执行1次, 防止并发!
  62. // 如果任务正在执行,跳过本次调度
  63. if jobExecuteInfo, jobExecuting = scheduler.jobExecutingTable[jobPlan.Job.Name]; jobExecuting {
  64. // fmt.Println("尚未退出,跳过执行:", jobPlan.Job.Name)
  65. return
  66. }
  67. // 构建执行状态信息
  68. jobExecuteInfo = common.BuildJobExecuteInfo(jobPlan)
  69. // 保存执行状态
  70. scheduler.jobExecutingTable[jobPlan.Job.Name] = jobExecuteInfo
  71. // 执行任务
  72. fmt.Println("执行任务:", jobExecuteInfo.Job.Name, jobExecuteInfo.PlanTime, jobExecuteInfo.RealTime)
  73. G_executor.ExecuteJob(jobExecuteInfo)
  74. }
  75. // 调度协程
  76. func (s *Scheduler) scheduleLoop() {
  77. schedulerAfter := s.TryScheduler()
  78. schedulerTimer := time.NewTimer(schedulerAfter)
  79. for {
  80. select {
  81. case jobEvent := <-s.jobEventChan: // 获取到任务变化事件
  82. s.handleJobEvent(jobEvent)
  83. case <-schedulerTimer.C: // 最近的任务到期了
  84. case jobResult := <-s.jobResultChan:
  85. s.handleJobResult(jobResult)
  86. }
  87. // 调度一次任务
  88. schedulerAfter = s.TryScheduler()
  89. // 重置调度间隔
  90. schedulerTimer.Reset(schedulerAfter)
  91. }
  92. }
  93. // 向scheduler推送任务变化事件
  94. func (s *Scheduler) PushJobEvent(jobEvent *common.JobEvent) {
  95. s.jobEventChan <- jobEvent
  96. }
  97. // 处理任务事件
  98. func (s *Scheduler) handleJobEvent(event *common.JobEvent) {
  99. switch event.EventType {
  100. case common.JOB_EVENT_SAVE:
  101. plan, err := common.BuildJobSchedulePlan(event.Job)
  102. if err != nil {
  103. log.Println(`BuildJobSchedulePlanL err: `, err)
  104. }
  105. s.jobPlanTable[event.Job.Name] = plan
  106. case common.JOB_EVENT_DELETE:
  107. if _, ok := s.jobPlanTable[event.Job.Name]; ok {
  108. delete(s.jobPlanTable, event.Job.Name)
  109. }
  110. case common.JOB_EVENT_KILL:
  111. // 取消掉Command执行, 判断任务是否在执行中
  112. if jobExecuteInfo, exist := s.jobExecutingTable[event.Job.Name]; exist {
  113. jobExecuteInfo.CancelFunc() // 触发command杀死shell子进程, 任务得到退出
  114. }
  115. }
  116. }
  117. // 回传任务执行结果
  118. func (s *Scheduler) PushJobResult(jobResult *common.JobExecuteResult) {
  119. s.jobResultChan <- jobResult
  120. }
  121. // 处理任务结果
  122. func (scheduler *Scheduler) handleJobResult(result *common.JobExecuteResult) {
  123. // 删除执行状态
  124. delete(scheduler.jobExecutingTable, result.ExecuteInfo.Job.Name)
  125. fmt.Println("任务执行完成:", result.ExecuteInfo.Job.Name, string(result.Output), result.Err)
  126. var (
  127. jobLog *common.JobLog
  128. )
  129. // 删除执行状态
  130. delete(scheduler.jobExecutingTable, result.ExecuteInfo.Job.Name)
  131. // 生成执行日志
  132. if result.Err != common.ERR_LOCK_ALREADY_REQUIRED {
  133. jobLog = &common.JobLog{
  134. JobName: result.ExecuteInfo.Job.Name,
  135. Command: result.ExecuteInfo.Job.Command,
  136. Output: string(result.Output),
  137. PlanTime: result.ExecuteInfo.PlanTime.UnixNano() / 1000 / 1000,
  138. ScheduleTime: result.ExecuteInfo.RealTime.UnixNano() / 1000 / 1000,
  139. StartTime: result.StartTime.UnixNano() / 1000 / 1000,
  140. EndTime: result.EndTime.UnixNano() / 1000 / 1000,
  141. }
  142. if result.Err != nil {
  143. jobLog.Err = result.Err.Error()
  144. } else {
  145. jobLog.Err = ""
  146. }
  147. G_logSink.Append(jobLog)
  148. }
  149. }

任务执行器

  1. package worker
  2. import (
  3. "golang-crontab/common"
  4. "log"
  5. "math/rand"
  6. "os/exec"
  7. "time"
  8. )
  // Executor runs job commands. It has no state; every ExecuteJob call runs
  // in its own goroutine.
  type Executor struct{}

  // G_executor is the process-wide Executor, assigned by InitExecutor.
  var G_executor *Executor
  15. // 初始化执行器
  16. func InitExecutor() (err error) {
  17. G_executor = &Executor{}
  18. return
  19. }
  20. // 执行一个任务
  21. func (executor *Executor) ExecuteJob(info *common.JobExecuteInfo) {
  22. //log.Println(`ExecuteJob: `, info.Job.Command)
  23. go func() {
  24. var (
  25. cmd *exec.Cmd
  26. err error
  27. output []byte
  28. result *common.JobExecuteResult
  29. jobLock *JobLock
  30. )
  31. // 任务结果
  32. result = &common.JobExecuteResult{
  33. ExecuteInfo: info,
  34. Output: make([]byte, 0),
  35. }
  36. // 初始化分布式锁
  37. jobLock = G_jobMgr.CreateJobLock(info.Job.Name)
  38. // 记录任务开始时间
  39. result.StartTime = time.Now()
  40. // 上锁
  41. // 随机睡眠(0~1s)
  42. time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
  43. err = jobLock.TryLock()
  44. defer jobLock.Unlock()
  45. if err != nil { // 上锁失败
  46. log.Println(`上锁失败, err: `, err)
  47. result.Err = err
  48. result.EndTime = time.Now()
  49. } else {
  50. // 上锁成功后,重置任务启动时间
  51. result.StartTime = time.Now()
  52. // 执行shell命令
  53. cmd = exec.CommandContext(info.CancelCtx, "/bin/bash", "-c", info.Job.Command)
  54. // 执行并捕获输出
  55. output, err = cmd.CombinedOutput()
  56. // 记录任务结束时间
  57. result.EndTime = time.Now()
  58. result.Output = output
  59. result.Err = err
  60. }
  61. // 任务执行完成后,把执行的结果返回给Scheduler,Scheduler会从executingTable中删除掉执行记录
  62. G_scheduler.PushJobResult(result)
  63. }()
  64. }

分布式锁

  1. package worker
  2. import (
  3. "context"
  4. clientv3 "go.etcd.io/etcd/client/v3"
  5. "golang-crontab/common"
  6. )
  7. // 分布式锁(TXN事务)
  8. type JobLock struct {
  9. // etcd客户端
  10. kv clientv3.KV
  11. lease clientv3.Lease
  12. jobName string // 任务名
  13. cancelFunc context.CancelFunc // 用于终止自动续租
  14. leaseId clientv3.LeaseID // 租约ID
  15. isLocked bool // 是否上锁成功
  16. }
  17. // 初始化一把锁
  18. func InitJobLock(jobName string, kv clientv3.KV, lease clientv3.Lease) (jobLock *JobLock) {
  19. jobLock = &JobLock{
  20. kv: kv,
  21. lease: lease,
  22. jobName: jobName,
  23. }
  24. return
  25. }
  26. // 尝试上锁
  27. func (jobLock *JobLock) TryLock() (err error) {
  28. var (
  29. leaseGrantResp *clientv3.LeaseGrantResponse
  30. cancelCtx context.Context
  31. cancelFunc context.CancelFunc
  32. leaseId clientv3.LeaseID
  33. keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
  34. txn clientv3.Txn
  35. lockKey string
  36. txnResp *clientv3.TxnResponse
  37. )
  38. // 1, 创建租约(5秒)
  39. if leaseGrantResp, err = jobLock.lease.Grant(context.TODO(), 5); err != nil {
  40. return
  41. }
  42. // context用于取消自动续租
  43. cancelCtx, cancelFunc = context.WithCancel(context.TODO())
  44. // 租约ID
  45. leaseId = leaseGrantResp.ID
  46. // 2, 自动续租
  47. if keepRespChan, err = jobLock.lease.KeepAlive(cancelCtx, leaseId); err != nil {
  48. goto FAIL
  49. }
  50. // 3, 处理续租应答的协程
  51. go func() {
  52. var (
  53. keepResp *clientv3.LeaseKeepAliveResponse
  54. )
  55. for {
  56. select {
  57. case keepResp = <-keepRespChan: // 自动续租的应答
  58. if keepResp == nil {
  59. goto END
  60. }
  61. }
  62. }
  63. END:
  64. }()
  65. // 4, 创建事务txn
  66. txn = jobLock.kv.Txn(context.TODO())
  67. // 锁路径
  68. lockKey = common.JOB_LOCK_DIR + jobLock.jobName
  69. // 5, 事务抢锁
  70. txn.If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
  71. Then(clientv3.OpPut(lockKey, "", clientv3.WithLease(leaseId))).
  72. Else(clientv3.OpGet(lockKey))
  73. // 提交事务
  74. if txnResp, err = txn.Commit(); err != nil {
  75. goto FAIL
  76. }
  77. // 6, 成功返回, 失败释放租约
  78. if !txnResp.Succeeded { // 锁被占用
  79. err = common.ERR_LOCK_ALREADY_REQUIRED
  80. goto FAIL
  81. }
  82. // 抢锁成功
  83. jobLock.leaseId = leaseId
  84. jobLock.cancelFunc = cancelFunc
  85. jobLock.isLocked = true
  86. return
  87. FAIL:
  88. cancelFunc() // 取消自动续租
  89. jobLock.lease.Revoke(context.TODO(), leaseId) // 释放租约
  90. return
  91. }
  92. // 释放锁
  93. func (jobLock *JobLock) Unlock() {
  94. if jobLock.isLocked {
  95. jobLock.cancelFunc() // 取消我们程序自动续租的协程
  96. jobLock.lease.Revoke(context.TODO(), jobLock.leaseId) // 释放租约
  97. }
  98. }

执行日志保存到mongodb

  1. package worker
  2. import (
  3. "context"
  4. "go.mongodb.org/mongo-driver/mongo"
  5. "go.mongodb.org/mongo-driver/mongo/options"
  6. "golang-crontab/common"
  7. "time"
  8. )
  9. // mongodb存储日志
  10. type LogSink struct {
  11. client *mongo.Client
  12. logCollection *mongo.Collection
  13. logChan chan *common.JobLog
  14. autoCommitChan chan *common.LogBatch
  15. }
  16. var (
  17. // 单例
  18. G_logSink *LogSink
  19. )
  20. // 批量写入日志
  21. func (logSink *LogSink) saveLogs(batch *common.LogBatch) {
  22. logSink.logCollection.InsertMany(context.TODO(), batch.Logs)
  23. }
  24. // 日志存储协程
  25. func (logSink *LogSink) writeLoop() {
  26. var (
  27. log *common.JobLog
  28. logBatch *common.LogBatch // 当前的批次
  29. commitTimer *time.Timer
  30. timeoutBatch *common.LogBatch // 超时批次
  31. )
  32. for {
  33. select {
  34. case log = <-logSink.logChan:
  35. if logBatch == nil {
  36. logBatch = &common.LogBatch{}
  37. // 让这个批次超时自动提交(给1秒的时间)
  38. commitTimer = time.AfterFunc(
  39. time.Duration(G_config.JobLogCommitTimeout)*time.Millisecond,
  40. func(batch *common.LogBatch) func() {
  41. return func() {
  42. logSink.autoCommitChan <- batch
  43. }
  44. }(logBatch),
  45. )
  46. }
  47. // 把新日志追加到批次中
  48. logBatch.Logs = append(logBatch.Logs, log)
  49. // 如果批次满了, 就立即发送
  50. if len(logBatch.Logs) >= G_config.JobLogBatchSize {
  51. // 发送日志
  52. logSink.saveLogs(logBatch)
  53. // 清空logBatch
  54. logBatch = nil
  55. // 取消定时器
  56. commitTimer.Stop()
  57. }
  58. case timeoutBatch = <-logSink.autoCommitChan: // 过期的批次
  59. // 判断过期批次是否仍旧是当前的批次
  60. if timeoutBatch != logBatch {
  61. continue // 跳过已经被提交的批次
  62. }
  63. // 把批次写入到mongo中
  64. logSink.saveLogs(timeoutBatch)
  65. // 清空logBatch
  66. logBatch = nil
  67. }
  68. }
  69. }
  70. func InitLogSink() (err error) {
  71. // 建立mongodb连接
  72. client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI("mongodb://localhost:27017"))
  73. if err != nil {
  74. return err
  75. }
  76. // 选择db和collection
  77. G_logSink = &LogSink{
  78. client: client,
  79. logCollection: client.Database("cron").Collection("log"),
  80. logChan: make(chan *common.JobLog, 1000),
  81. autoCommitChan: make(chan *common.LogBatch, 1000),
  82. }
  83. // 启动一个mongodb处理协程
  84. go G_logSink.writeLoop()
  85. return
  86. }
  87. // 发送日志
  88. func (logSink *LogSink) Append(jobLog *common.JobLog) {
  89. select {
  90. case logSink.logChan <- jobLog:
  91. default:
  92. // 队列满了就丢弃
  93. }
  94. }

worker 服务注册

  1. package worker
  2. import (
  3. "context"
  4. clientv3 "go.etcd.io/etcd/client/v3"
  5. "golang-crontab/common"
  6. "net"
  7. "time"
  8. )
  9. // 注册节点到etcd: /cron/workers/IP地址
  10. type Register struct {
  11. client *clientv3.Client
  12. kv clientv3.KV
  13. lease clientv3.Lease
  14. localIP string // 本机IP
  15. }
  16. var (
  17. G_register *Register
  18. )
  19. // 获取本机网卡IP
  20. func getLocalIP() (ipv4 string, err error) {
  21. var (
  22. addrs []net.Addr
  23. addr net.Addr
  24. ipNet *net.IPNet // IP地址
  25. isIpNet bool
  26. )
  27. // 获取所有网卡
  28. if addrs, err = net.InterfaceAddrs(); err != nil {
  29. return
  30. }
  31. // 取第一个非lo的网卡IP
  32. for _, addr = range addrs {
  33. // 这个网络地址是IP地址: ipv4, ipv6
  34. if ipNet, isIpNet = addr.(*net.IPNet); isIpNet && !ipNet.IP.IsLoopback() {
  35. // 跳过IPV6
  36. if ipNet.IP.To4() != nil {
  37. ipv4 = ipNet.IP.String() // 192.168.1.1
  38. return
  39. }
  40. }
  41. }
  42. err = common.ERR_NO_LOCAL_IP_FOUND
  43. return
  44. }
  45. // 注册到/cron/workers/IP, 并自动续租
  46. func (register *Register) keepOnline() {
  47. var (
  48. regKey string
  49. leaseGrantResp *clientv3.LeaseGrantResponse
  50. err error
  51. keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
  52. keepAliveResp *clientv3.LeaseKeepAliveResponse
  53. cancelCtx context.Context
  54. cancelFunc context.CancelFunc
  55. )
  56. for {
  57. // 注册路径
  58. regKey = common.JOB_WORKER_DIR + register.localIP
  59. cancelFunc = nil
  60. // 创建租约
  61. if leaseGrantResp, err = register.lease.Grant(context.TODO(), 10); err != nil {
  62. goto RETRY
  63. }
  64. // 自动续租
  65. if keepAliveChan, err = register.lease.KeepAlive(context.TODO(), leaseGrantResp.ID); err != nil {
  66. goto RETRY
  67. }
  68. cancelCtx, cancelFunc = context.WithCancel(context.TODO())
  69. // 注册到etcd
  70. if _, err = register.kv.Put(cancelCtx, regKey, "", clientv3.WithLease(leaseGrantResp.ID)); err != nil {
  71. goto RETRY
  72. }
  73. // 处理续租应答
  74. for {
  75. select {
  76. case keepAliveResp = <-keepAliveChan:
  77. if keepAliveResp == nil { // 续租失败
  78. goto RETRY
  79. }
  80. }
  81. }
  82. RETRY:
  83. time.Sleep(1 * time.Second)
  84. if cancelFunc != nil {
  85. cancelFunc()
  86. }
  87. }
  88. }
  89. func InitRegister() (err error) {
  90. var (
  91. config clientv3.Config
  92. client *clientv3.Client
  93. kv clientv3.KV
  94. lease clientv3.Lease
  95. localIp string
  96. )
  97. // 初始化配置
  98. config = clientv3.Config{
  99. Endpoints: G_config.EtcdEndpoints, // 集群地址
  100. DialTimeout: time.Duration(G_config.EtcdDialTimeout) * time.Millisecond, // 连接超时
  101. }
  102. // 建立连接
  103. if client, err = clientv3.New(config); err != nil {
  104. return
  105. }
  106. // 本机IP
  107. if localIp, err = getLocalIP(); err != nil {
  108. return
  109. }
  110. // 得到KV和Lease的API子集
  111. kv = clientv3.NewKV(client)
  112. lease = clientv3.NewLease(client)
  113. G_register = &Register{
  114. client: client,
  115. kv: kv,
  116. lease: lease,
  117. localIP: localIp,
  118. }
  119. // 服务注册
  120. go G_register.keepOnline()
  121. return
  122. }