- 从etcd中把job同步到内存
- 实现调度模块, 基于cron表达式调度N个job
- 实现执行模块, 并发执行多个job
- 实现job的分布式锁, 防止集群中多个worker并发执行同一个job
- 把执行的日志保存到mongodb
监听任务的变化
func InitJobMgr() error { config := clientv3.Config{ Endpoints: G_config.EtcdEndpoints, DialTimeout: time.Duration(G_config.EtcdDialTimeout) * time.Millisecond, } client, err := clientv3.New(config) if err != nil { return err } G_jobMgr = &JobMgr{ client: client, kv: clientv3.NewKV(client), lease: clientv3.NewLease(client), watcher: clientv3.NewWatcher(client), } // 启动任务监听 G_jobMgr.watchJobs() return nil}
// 监听任务变化func (jobMgr *JobMgr) watchJobs() error { // 1. get /cron/jobs/目录下的所有任务, 并且获取当前集群的revision // 2. 从该revision向后监听变化事件 response, err := jobMgr.kv.Get(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithPrefix()) if err != nil { return err } // 当前有哪些任务 for _, kv := range response.Kvs { job, err := common.UnpackJob(kv.Value) if err != nil { return err } // job同步到调度协程(scheduler) log.Println(`当前任务: `, job.Name) common.BuildJobEvent(common.JOB_EVENT_SAVE, job) } // 任务变化后 从该revision向后监听变化事件 go func() { revision := response.Header.GetRevision() + 1 // 监听/cron/jobs/目录后续变化, 返回chan watchChan := jobMgr.watcher.Watch(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithRev(revision), clientv3.WithPrefix()) // 处理监听事件 for watchResponse := range watchChan { for _, event := range watchResponse.Events { switch event.Type { case mvccpb.PUT: // 保存任务事件 // 推送更新事件给 scheduler if job, err := common.UnpackJob(event.Kv.Value); err != nil { log.Println(`common.UnpackJob err: `, err) continue } else { log.Println(`获取到保存任务事件: `, job.Name) // 构建一个更新event事件 common.BuildJobEvent(common.JOB_EVENT_SAVE, job) } case mvccpb.DELETE: // 任务被删除了 // 推送删除事件给 scheduler jobName := common.ExtractJobName(string(event.Kv.Key)) log.Println(`获取到删除任务事件: `, jobName) // 构建一个删除event事件 common.BuildJobEvent(common.JOB_EVENT_DELETE, &common.Job{ Name: jobName, }) } } } }() return nil}
package worker

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/api/v3/mvccpb"
	clientv3 "go.etcd.io/etcd/client/v3"
	"golang-crontab/common"
)

// JobMgr bundles the etcd client and the API subsets (KV, Lease, Watcher)
// the worker uses to read jobs, watch for changes and build job locks.
type JobMgr struct {
	client  *clientv3.Client
	kv      clientv3.KV
	lease   clientv3.Lease
	watcher clientv3.Watcher
}

// G_jobMgr is the process-wide JobMgr, set by InitJobMgr.
var G_jobMgr *JobMgr

// InitJobMgr connects to etcd using G_config and builds G_jobMgr. It then
// starts the job watcher and, only if the initial job load succeeded, the
// killer watcher.
func InitJobMgr() error {
	config := clientv3.Config{
		Endpoints:   G_config.EtcdEndpoints,
		DialTimeout: time.Duration(G_config.EtcdDialTimeout) * time.Millisecond,
	}
	client, err := clientv3.New(config)
	if err != nil {
		return err
	}
	G_jobMgr = &JobMgr{
		client:  client,
		kv:      clientv3.NewKV(client),
		lease:   clientv3.NewLease(client),
		watcher: clientv3.NewWatcher(client),
	}
	// Load and watch jobs first. If that fails, report it right away rather
	// than also starting the killer watcher.
	if err := G_jobMgr.watchJobs(); err != nil {
		return err
	}
	G_jobMgr.watchKiller()
	return nil
}

// watchKiller starts a goroutine that watches common.JOB_KILLER_DIR. For
// every key PUT there, it pushes a JOB_EVENT_KILL for the job name encoded
// in the key to the scheduler. DELETE events (the killer marker expiring)
// are ignored.
func (jobMgr *JobMgr) watchKiller() {
	go func() {
		watchChan := jobMgr.watcher.Watch(context.TODO(), common.JOB_KILLER_DIR, clientv3.WithPrefix())
		for watchResp := range watchChan {
			if err := watchResp.Err(); err != nil {
				log.Println(`killer watch err: `, err)
			}
			for _, watchEvent := range watchResp.Events {
				switch watchEvent.Type {
				case mvccpb.PUT: // kill request for a job
					jobName := common.ExtractKillerName(string(watchEvent.Kv.Key))
					jobEvent := common.BuildJobEvent(common.JOB_EVENT_KILL, &common.Job{Name: jobName})
					G_scheduler.PushJobEvent(jobEvent)
				case mvccpb.DELETE:
					// The killer marker expired and was removed automatically; nothing to do.
				}
			}
		}
	}()
}

// watchJobs loads every job under common.JOB_SAVE_DIR and pushes a save
// event for each to the scheduler. It then watches the prefix from the next
// revision so no change between the Get and the Watch is lost.
//
// It returns an error if the initial Get fails or a stored job cannot be
// unpacked. Errors inside the watch goroutine are only logged.
func (jobMgr *JobMgr) watchJobs() error {
	// 1. Get all jobs under /cron/jobs/ together with the current cluster revision.
	response, err := jobMgr.kv.Get(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithPrefix())
	if err != nil {
		return err
	}
	for _, kv := range response.Kvs {
		job, err := common.UnpackJob(kv.Value)
		if err != nil {
			return err
		}
		// Sync the job to the scheduler goroutine.
		log.Println(`当前存在的任务: `, job.Name)
		G_scheduler.PushJobEvent(common.BuildJobEvent(common.JOB_EVENT_SAVE, job))
	}

	// 2. Watch for changes starting right after the revision read above.
	go func() {
		revision := response.Header.GetRevision() + 1
		watchChan := jobMgr.watcher.Watch(context.TODO(), common.JOB_SAVE_DIR,
			clientv3.WithRev(revision), clientv3.WithPrefix())
		for watchResponse := range watchChan {
			// Surface watch failures (cancellation, compaction, ...) instead of
			// letting the loop end silently.
			if err := watchResponse.Err(); err != nil {
				log.Println(`job watch err: `, err)
			}
			for _, event := range watchResponse.Events {
				switch event.Type {
				case mvccpb.PUT: // job saved: push an update event to the scheduler
					job, err := common.UnpackJob(event.Kv.Value)
					if err != nil {
						log.Println(`common.UnpackJob err: `, err)
						continue
					}
					log.Println(`获取到保存任务事件: `, job.Name)
					G_scheduler.PushJobEvent(common.BuildJobEvent(common.JOB_EVENT_SAVE, job))
				case mvccpb.DELETE: // job deleted: push a delete event to the scheduler
					jobName := common.ExtractJobName(string(event.Kv.Key))
					log.Println(`获取到删除任务事件: `, jobName)
					G_scheduler.PushJobEvent(common.BuildJobEvent(common.JOB_EVENT_DELETE, &common.Job{
						Name: jobName,
					}))
				}
			}
		}
	}()
	return nil
}

// CreateJobLock returns a not-yet-acquired distributed lock for jobName,
// backed by this manager's KV and Lease clients.
func (jobMgr *JobMgr) CreateJobLock(jobName string) (jobLock *JobLock) {
	return InitJobLock(jobName, jobMgr.kv, jobMgr.lease)
}
把任务事件发送给scheduler进行调度
package worker

import (
	"errors"
	"fmt"
	"log"
	"time"

	"golang-crontab/common"
)

// Scheduler holds the in-memory schedule plans and the set of jobs that are
// currently executing. Within this file both maps are only touched from the
// scheduleLoop goroutine. Other goroutines reach it through PushJobEvent and
// PushJobResult.
type Scheduler struct {
	jobEventChan      chan *common.JobEvent              // job change events (save/delete/kill)
	jobPlanTable      map[string]*common.JobSchedulePlan // job name -> schedule plan
	jobExecutingTable map[string]*common.JobExecuteInfo  // job name -> running execution
	jobResultChan     chan *common.JobExecuteResult      // execution results from the executor
}

// G_scheduler is the process-wide Scheduler, set by InitScheduler.
var G_scheduler *Scheduler

// InitScheduler creates G_scheduler with 1000-slot event and result queues
// and starts its scheduling goroutine.
func InitScheduler() {
	G_scheduler = &Scheduler{
		jobEventChan:      make(chan *common.JobEvent, 1000),
		jobPlanTable:      make(map[string]*common.JobSchedulePlan),
		jobExecutingTable: make(map[string]*common.JobExecuteInfo),
		jobResultChan:     make(chan *common.JobExecuteResult, 1000),
	}
	go G_scheduler.scheduleLoop()
}

// TryScheduler starts every job whose NextTime is due and advances its
// NextTime. It returns how long to wait until the earliest upcoming run, or
// 1s when there are no jobs.
func (s *Scheduler) TryScheduler() (schedulerAfter time.Duration) {
	// With no jobs, any sleep interval works.
	if len(s.jobPlanTable) == 0 {
		return 1 * time.Second
	}

	now := time.Now()
	var nearTime *time.Time
	for _, plan := range s.jobPlanTable {
		// Due (NextTime <= now): try to run it and compute the next fire time.
		if !plan.NextTime.After(now) {
			s.TryStartJob(plan)
			plan.NextTime = plan.Expr.Next(now)
		}
		// Track the soonest upcoming run.
		if nearTime == nil || plan.NextTime.Before(*nearTime) {
			nearTime = &plan.NextTime
		}
	}
	if nearTime != nil {
		schedulerAfter = time.Until(*nearTime)
	}
	return schedulerAfter
}

// TryStartJob hands jobPlan to the executor unless the same job is still
// running. Scheduling and execution are separate: a long-running job may be
// due many times while it runs, but only one execution is allowed at a time.
func (s *Scheduler) TryStartJob(jobPlan *common.JobSchedulePlan) {
	if _, executing := s.jobExecutingTable[jobPlan.Job.Name]; executing {
		return // still running; skip this tick
	}

	jobExecuteInfo := common.BuildJobExecuteInfo(jobPlan)
	s.jobExecutingTable[jobPlan.Job.Name] = jobExecuteInfo

	fmt.Println("执行任务:", jobExecuteInfo.Job.Name, jobExecuteInfo.PlanTime, jobExecuteInfo.RealTime)
	G_executor.ExecuteJob(jobExecuteInfo)
}

// scheduleLoop is the scheduler goroutine. It waits for a job event, the
// timer for the nearest due job, or an execution result. After each one it
// re-runs TryScheduler and re-arms the timer.
func (s *Scheduler) scheduleLoop() {
	schedulerAfter := s.TryScheduler()
	schedulerTimer := time.NewTimer(schedulerAfter)
	for {
		select {
		case jobEvent := <-s.jobEventChan: // job definitions changed
			s.handleJobEvent(jobEvent)
		case <-schedulerTimer.C: // the nearest job is due
		case jobResult := <-s.jobResultChan:
			s.handleJobResult(jobResult)
		}
		schedulerAfter = s.TryScheduler()
		schedulerTimer.Reset(schedulerAfter)
	}
}

// PushJobEvent queues a job change event for the scheduler. It blocks when
// the 1000-slot queue is full.
func (s *Scheduler) PushJobEvent(jobEvent *common.JobEvent) {
	s.jobEventChan <- jobEvent
}

// handleJobEvent applies one job event to the scheduler tables:
//   - SAVE: (re)build the job's schedule plan.
//   - DELETE: drop the plan.
//   - KILL: cancel the running execution, if any.
func (s *Scheduler) handleJobEvent(event *common.JobEvent) {
	switch event.EventType {
	case common.JOB_EVENT_SAVE:
		plan, err := common.BuildJobSchedulePlan(event.Job)
		if err != nil {
			log.Println(`BuildJobSchedulePlanL err: `, err)
			// Never store the nil plan: TryScheduler would dereference it and
			// panic. The job as saved in etcd cannot be scheduled, so also drop
			// any stale plan from an earlier version.
			delete(s.jobPlanTable, event.Job.Name)
			return
		}
		s.jobPlanTable[event.Job.Name] = plan
	case common.JOB_EVENT_DELETE:
		delete(s.jobPlanTable, event.Job.Name) // no-op when absent
	case common.JOB_EVENT_KILL:
		// Cancelling the context makes the command kill its shell subprocess.
		if jobExecuteInfo, exist := s.jobExecutingTable[event.Job.Name]; exist {
			jobExecuteInfo.CancelFunc()
		}
	}
}

// PushJobResult reports a finished execution back to the scheduler. It
// blocks when the 1000-slot queue is full.
func (s *Scheduler) PushJobResult(jobResult *common.JobExecuteResult) {
	s.jobResultChan <- jobResult
}

// handleJobResult marks the job as no longer executing. For every outcome
// except lock contention, it also sends a JobLog (times in milliseconds
// since the Unix epoch) to G_logSink.
func (s *Scheduler) handleJobResult(result *common.JobExecuteResult) {
	delete(s.jobExecutingTable, result.ExecuteInfo.Job.Name)
	fmt.Println("任务执行完成:", result.ExecuteInfo.Job.Name, string(result.Output), result.Err)

	// Losing the lock means this worker did not run the job; don't log it.
	if errors.Is(result.Err, common.ERR_LOCK_ALREADY_REQUIRED) {
		return
	}

	jobLog := &common.JobLog{
		JobName:      result.ExecuteInfo.Job.Name,
		Command:      result.ExecuteInfo.Job.Command,
		Output:       string(result.Output),
		PlanTime:     result.ExecuteInfo.PlanTime.UnixNano() / 1000 / 1000,
		ScheduleTime: result.ExecuteInfo.RealTime.UnixNano() / 1000 / 1000,
		StartTime:    result.StartTime.UnixNano() / 1000 / 1000,
		EndTime:      result.EndTime.UnixNano() / 1000 / 1000,
	}
	if result.Err != nil {
		jobLog.Err = result.Err.Error()
	} else {
		jobLog.Err = ""
	}
	G_logSink.Append(jobLog)
}
任务执行器
package worker

import (
	"log"
	"math/rand"
	"os/exec"
	"time"

	"golang-crontab/common"
)

// Executor runs jobs handed over by the scheduler.
type Executor struct{}

// G_executor is the process-wide Executor, set by InitExecutor.
var G_executor *Executor

// InitExecutor creates G_executor. It currently never fails.
func InitExecutor() (err error) {
	G_executor = &Executor{}
	return
}

// ExecuteJob runs info.Job.Command via `/bin/bash -c` in a new goroutine.
// The command only runs while holding the job's distributed lock, and it is
// bound to info.CancelCtx so a kill request can stop it. The outcome
// (output, error, timing) is always reported back with
// G_scheduler.PushJobResult.
func (executor *Executor) ExecuteJob(info *common.JobExecuteInfo) {
	go func() {
		result := &common.JobExecuteResult{
			ExecuteInfo: info,
			Output:      make([]byte, 0),
		}
		jobLock := G_jobMgr.CreateJobLock(info.Job.Name)
		result.StartTime = time.Now()

		// Random 0-1s delay before competing for the lock. Presumably this
		// spreads lock acquisition across workers — confirm intent.
		time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)

		// Run under the lock. The deferred Unlock fires when this closure
		// returns, i.e. BEFORE the result is pushed. Otherwise the scheduler
		// could re-dispatch the job while this worker still held the lock.
		runLocked := func() {
			if err := jobLock.TryLock(); err != nil {
				log.Println(`上锁失败, err: `, err)
				result.Err = err
				result.EndTime = time.Now()
				return
			}
			defer jobLock.Unlock()

			// Measure run time from lock acquisition, not from the random wait.
			result.StartTime = time.Now()
			cmd := exec.CommandContext(info.CancelCtx, "/bin/bash", "-c", info.Job.Command)
			output, err := cmd.CombinedOutput()
			result.EndTime = time.Now()
			result.Output = output
			result.Err = err
		}
		runLocked()

		// The scheduler removes the job from its executing table on receipt.
		G_scheduler.PushJobResult(result)
	}()
}
分布式锁
package worker

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
	"golang-crontab/common"
)

// JobLock is an etcd-backed distributed lock for one job. It is acquired by
// a transaction that creates common.JOB_LOCK_DIR+jobName, attached to a
// lease, only if that key does not exist yet.
type JobLock struct {
	// etcd API subsets
	kv    clientv3.KV
	lease clientv3.Lease

	jobName    string             // job this lock protects
	cancelFunc context.CancelFunc // stops the lease keep-alive
	leaseId    clientv3.LeaseID   // lease the lock key is attached to
	isLocked   bool               // true while the lock is held
}

// InitJobLock returns an unacquired lock for jobName.
func InitJobLock(jobName string, kv clientv3.KV, lease clientv3.Lease) (jobLock *JobLock) {
	return &JobLock{
		kv:      kv,
		lease:   lease,
		jobName: jobName,
	}
}

// TryLock attempts to acquire the lock once, without waiting.
//
// It returns nil on success. If the key already exists it returns
// common.ERR_LOCK_ALREADY_REQUIRED. Otherwise it returns the etcd error.
// On any failure after the lease was granted, the keep-alive is stopped and
// the lease revoked.
func (jobLock *JobLock) TryLock() error {
	// 1. Grant a 5-second lease; the lock key lives only as long as the lease.
	leaseGrantResp, err := jobLock.lease.Grant(context.TODO(), 5)
	if err != nil {
		return err
	}
	leaseId := leaseGrantResp.ID
	cancelCtx, cancelFunc := context.WithCancel(context.TODO())

	// release undoes the lease setup when the lock is not acquired.
	release := func() {
		cancelFunc() // stop auto-renewal
		// Best effort: if Revoke fails the lease still expires after its TTL.
		jobLock.lease.Revoke(context.TODO(), leaseId)
	}

	// 2. Keep the lease alive automatically.
	keepRespChan, err := jobLock.lease.KeepAlive(cancelCtx, leaseId)
	if err != nil {
		release()
		return err
	}

	// 3. Drain keep-alive responses until the channel is closed
	//    (cancellation or keep-alive failure).
	go func() {
		for range keepRespChan {
		}
	}()

	// 4-5. Create the lock key only if it has never been created (CreateRevision == 0).
	lockKey := common.JOB_LOCK_DIR + jobLock.jobName
	txnResp, err := jobLock.kv.Txn(context.TODO()).
		If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
		Then(clientv3.OpPut(lockKey, "", clientv3.WithLease(leaseId))).
		Else(clientv3.OpGet(lockKey)).
		Commit()
	if err != nil {
		release()
		return err
	}

	// 6. Lock held by someone else.
	if !txnResp.Succeeded {
		release()
		return common.ERR_LOCK_ALREADY_REQUIRED
	}

	jobLock.leaseId = leaseId
	jobLock.cancelFunc = cancelFunc
	jobLock.isLocked = true
	return nil
}

// Unlock releases the lock if it is held. It stops the keep-alive and
// revokes the lease the lock key is attached to (etcd removes keys bound to
// a revoked lease). It is a no-op when TryLock failed or the lock was
// already released, so calling it more than once is safe.
func (jobLock *JobLock) Unlock() {
	if !jobLock.isLocked {
		return
	}
	jobLock.isLocked = false
	jobLock.cancelFunc()
	jobLock.lease.Revoke(context.TODO(), jobLock.leaseId)
}
执行日志保存到mongodb
package worker

import (
	"context"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"golang-crontab/common"
)

// LogSink batches job execution logs and writes them to MongoDB
// (database "cron", collection "log").
type LogSink struct {
	client         *mongo.Client
	logCollection  *mongo.Collection
	logChan        chan *common.JobLog   // incoming logs, filled by Append
	autoCommitChan chan *common.LogBatch // batches whose commit timeout fired
}

// G_logSink is the process-wide LogSink, set by InitLogSink.
var G_logSink *LogSink

// saveLogs writes one batch with InsertMany. On failure the error is logged
// and the batch is dropped.
func (logSink *LogSink) saveLogs(batch *common.LogBatch) {
	if _, err := logSink.logCollection.InsertMany(context.TODO(), batch.Logs); err != nil {
		log.Println(`saveLogs InsertMany err: `, err)
	}
}

// writeLoop collects logs into batches. A batch is written when it reaches
// G_config.JobLogBatchSize entries, or G_config.JobLogCommitTimeout
// milliseconds after its first entry, whichever comes first.
func (logSink *LogSink) writeLoop() {
	var (
		logBatch    *common.LogBatch // batch being filled; nil when none is open
		commitTimer *time.Timer      // auto-commit timer for logBatch
	)
	for {
		select {
		case jobLog := <-logSink.logChan:
			if logBatch == nil {
				logBatch = &common.LogBatch{}
				// Capture this specific batch so the timer reports it by
				// identity, even after logBatch moves on.
				batch := logBatch
				commitTimer = time.AfterFunc(
					time.Duration(G_config.JobLogCommitTimeout)*time.Millisecond,
					func() { logSink.autoCommitChan <- batch },
				)
			}
			logBatch.Logs = append(logBatch.Logs, jobLog)

			// Full batch: write now and cancel the pending auto-commit.
			if len(logBatch.Logs) >= G_config.JobLogBatchSize {
				logSink.saveLogs(logBatch)
				logBatch = nil
				commitTimer.Stop()
			}
		case timeoutBatch := <-logSink.autoCommitChan:
			// The timer may fire for a batch already written because it
			// filled up; only commit if it is still the open batch.
			if timeoutBatch != logBatch {
				continue
			}
			logSink.saveLogs(timeoutBatch)
			logBatch = nil
		}
	}
}

// InitLogSink connects to MongoDB, creates G_logSink and starts its writer
// goroutine.
func InitLogSink() (err error) {
	// NOTE(review): the MongoDB URI is hard-coded, unlike the etcd endpoints
	// which come from G_config — consider moving it into the config.
	client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		return err
	}
	G_logSink = &LogSink{
		client:         client,
		logCollection:  client.Database("cron").Collection("log"),
		logChan:        make(chan *common.JobLog, 1000),
		autoCommitChan: make(chan *common.LogBatch, 1000),
	}
	go G_logSink.writeLoop()
	return
}

// Append queues jobLog for writing without blocking. The log is dropped if
// the 1000-slot queue is full.
func (logSink *LogSink) Append(jobLog *common.JobLog) {
	select {
	case logSink.logChan <- jobLog:
	default:
		// Queue full: drop the log rather than stall the scheduler.
	}
}
worker 服务注册
package worker

import (
	"context"
	"net"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"golang-crontab/common"
)

// Register keeps this worker registered in etcd at
// common.JOB_WORKER_DIR + local IP (e.g. /cron/workers/<IP>).
type Register struct {
	client  *clientv3.Client
	kv      clientv3.KV
	lease   clientv3.Lease
	localIP string // this host's IPv4 address
}

// G_register is the process-wide Register, set by InitRegister.
var G_register *Register

// getLocalIP returns the first non-loopback IPv4 address of this host. It
// returns common.ERR_NO_LOCAL_IP_FOUND if there is none.
func getLocalIP() (ipv4 string, err error) {
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return "", err
	}
	for _, addr := range addrs {
		// Only IP network addresses are considered; loopback is skipped.
		ipNet, isIPNet := addr.(*net.IPNet)
		if !isIPNet || ipNet.IP.IsLoopback() {
			continue
		}
		// Skip IPv6 addresses.
		if ipNet.IP.To4() != nil {
			return ipNet.IP.String(), nil
		}
	}
	return "", common.ERR_NO_LOCAL_IP_FOUND
}

// keepOnline registers this worker and keeps the registration alive. After
// each failed or ended attempt it waits one second and retries with a new
// lease. It never returns.
func (register *Register) keepOnline() {
	regKey := common.JOB_WORKER_DIR + register.localIP
	for {
		register.registerOnce(regKey)
		time.Sleep(1 * time.Second)
	}
}

// registerOnce grants a 10-second lease, keeps it alive, and puts regKey
// bound to it. It then blocks until the keep-alive stops.
//
// The same cancellable context is passed to KeepAlive, and it is always
// cancelled on return. Previously KeepAlive ran on context.TODO(), so
// every failed attempt left a keep-alive running for an orphaned lease.
func (register *Register) registerOnce(regKey string) {
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	leaseGrantResp, err := register.lease.Grant(ctx, 10)
	if err != nil {
		return
	}
	keepAliveChan, err := register.lease.KeepAlive(ctx, leaseGrantResp.ID)
	if err != nil {
		return
	}
	if _, err = register.kv.Put(ctx, regKey, "", clientv3.WithLease(leaseGrantResp.ID)); err != nil {
		return
	}
	// The channel is closed when the keep-alive fails; then re-register.
	for range keepAliveChan {
	}
}

// InitRegister resolves the local IP, connects to etcd using G_config,
// creates G_register and starts the registration goroutine.
func InitRegister() error {
	// Resolve the IP first so a failure here doesn't leak an etcd client.
	localIP, err := getLocalIP()
	if err != nil {
		return err
	}

	client, err := clientv3.New(clientv3.Config{
		Endpoints:   G_config.EtcdEndpoints,                                         // cluster addresses
		DialTimeout: time.Duration(G_config.EtcdDialTimeout) * time.Millisecond, // connect timeout
	})
	if err != nil {
		return err
	}

	G_register = &Register{
		client:  client,
		kv:      clientv3.NewKV(client),
		lease:   clientv3.NewLease(client),
		localIP: localIP,
	}
	go G_register.keepOnline()
	return nil
}