使用 timer 定时
执行命令怎么做到计时执行这个事情呢?Golang 有个标准库 time,里面提供一个计时器 timer 的结构,是否可以使用这个 timer 来执行呢?我们先来看看 timer 是怎么使用的:
func main() {timer := time.NewTimer(3 * time.Second) // 定一个计时器,3s后触发select {now <-timer.C: // 监听计时器中的事件fmt.Println("3秒执行任务, 现在时间", now) //3s后执行}}
如果有一堆定时任务,要怎么定时执行呢?
// Entity代表每个定时任务entries := []Entry// 计算每个定时任务时间for _, entry := range entries {entry.Next = next(entry)}for {// 根据Next时间排序sortByTime(entries)// 创建计时器timer = time.NewTimer(entries.Early.Sub(time.Now()))select {case now = <-timer.C:for _, entry := entries {// 对已经到了时间的任务,执行if entry.Next.Ok() {go startJob(entry)}// 所有任务重新计算下个timerentry.Next = next(entry)}}}
使用 cron 包定时执行命令
// 创建一个cron实例c := cron.New()// 每整点30分钟执行一次c.AddFunc("30 * * * *", func() {fmt.Println("Every hour on the half hour")})// 上午3-6点,下午8-11点的30分钟执行c.AddFunc("30 3-6,20-23 * * *", func() {fmt.Println(".. in the range 3-6am, 8-11pm")})// 东京时间4:30执行一次c.AddFunc("CRON_TZ=Asia/Tokyo 30 04 * * *", func() {fmt.Println("Runs at 04:30 Tokyo time every day")})// 从现在开始每小时执行一次c.AddFunc("@hourly", func() {fmt.Println("Every hour, starting an hour from now")})// 从现在开始,每一个半小时执行一次c.AddFunc("@every 1h30m", func() {fmt.Println("Every hour thirty, starting an hour thirty from now")})// 启动cronc.Start()...// 在cron运行过程中增加任务c.AddFunc("@daily", func() { fmt.Println("Every day") })..// 查看运行中的任务inspect(c.Entries())..// 停止cron的运行,优雅停止,所有正在运行中的任务不会停止。c.Stop()
SpecSchedule包秒级别的定时
// 创建一个cron实例c := cron.New(cron.WithParser(cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)))// 每秒执行一次c.AddFunc("* * * * * *", func() {fmt.Println("Every hour on the half hour")})
在 cron 库中,增加一个定时任务的 AddFunc 方法,有两个参数:时间描述语言、匿名函数。那么明显,AddCronCommand 函数中核心要做的,就是将 Command 结构的执行封装成一个匿名函数,再调用 cron 的 AddFunc 方法就可以了。
因为在 Golang 中,fork 可以启动一个子进程,但是这个子进程是无法继承父进程的调度模型的。Golang 的调度模型是在用户态的 runtime 中自己进行调度的,而系统调用 fork 出来的子进程默认只会有单线程。所以在 Golang 中尽量不要使用 fork 的方式来复制启动当前进程。
另一个办法是使用 os.StartProcess 来启动一个进程,执行当前进程相同的二进制文件以及当前进程相同的参数。
https://github.com/sevlyar/go-daemon
https://github.com/sevlyar/retag
如何实现分布式定时器
先定义接口,这个接口的功能是一个分布式的选择器,当有很多节点要执行某个服务的时候,只选择出其中一个节点。这样不管底层是否用 Redis 实现分布式选择器,在业务层我们都可以不用关心。
package contractimport "time"// DistributedKey 定义字符串凭证const DistributedKey = "hade:distributed"// Distributed 分布式服务type Distributed interface {// Select 分布式选择器, 所有节点对某个服务进行抢占,只选择其中一个节点// ServiceName 服务名字// appID 当前的AppID// holdTime 分布式选择器hold住的时间// 返回值// selectAppID 分布式选择器最终选择的App// err 异常才返回,如果没有被选择,不返回errSelect(serviceName string, appID string, holdTime time.Duration) (selectAppID string, err error)}
这个具体实现就有很多方式了,Redis 只是其中一种而已,而且 Redis 要到后面章节才能引入。这里就实现一个本地文件锁。当一个服务器上有多个进程需要进行抢锁操作,文件锁是一种单机多进程抢占的很简易的实现方式。在 Golang 中,其使用方法也比较简单。
多个进程同时使用 os.OpenFile 打开一个文件,并使用 syscall.Flock 带上 syscall.LOCK_EX 参数来对这个文件加文件锁,这里只会有一个进程抢占到文件锁,而其他抢占不到的进程从 syscall.Flock 函数中获取到的就是 error。根据这个 error 是否为空,我们就能判断是否抢占到了文件锁。
释放文件锁有两种方式,一种方式是调用 syscall.Flock 带上 syscall.LOCK_UN 的参数,另外一种方式是抢占到锁的进程结束,也会自动释放文件锁。
本地文件锁的具体实现
// Select 为分布式选择器func (s LocalDistributedService) Select(serviceName string, appID string, holdTime time.Duration) (selectAppID string, err error) {appService := s.container.MustMake(contract.AppKey).(contract.App)runtimeFolder := appService.RuntimeFolder()lockFile := filepath.Join(runtimeFolder, "disribute_"+serviceName)// 打开文件锁lock, err := os.OpenFile(lockFile, os.O_RDWR|os.O_CREATE, 0666)if err != nil {return "", err}// 尝试独占文件锁err = syscall.Flock(int(lock.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)// 抢不到文件锁if err != nil {// 读取被选择的appidselectAppIDByt, err := ioutil.ReadAll(lock)if err != nil {return "", err}return string(selectAppIDByt), err}// 在一段时间内,选举有效,其他节点在这段时间不能再进行抢占go func() {defer func() {// 释放文件锁syscall.Flock(int(lock.Fd()), syscall.LOCK_UN)// 释放文件lock.Close()// 删除文件锁对应的文件os.Remove(lockFile)}()// 创建选举结果有效的计时器timer := time.NewTimer(holdTime)// 等待计时器结束<-timer.C}()// 这里已经是抢占到了,将抢占到的appID写入文件if _, err := lock.WriteString(appID); err != nil {return "", err}return appID, nil}
分布式选举
// AddDistributedCronCommand 实现一个分布式定时器// serviceName 这个服务的唯一名字,不允许带有空格// spec 具体的执行时间// cmd 具体的执行命令// holdTime 表示如果我选择上了,这次选择持续的时间,也就是锁释放的时间func (c *Command) AddDistributedCronCommand(serviceName string, spec string, cmd *Command, holdTime time.Duration) {root := c.Root()// 初始化cronif root.Cron == nil {root.Cron = cron.New(cron.WithParser(cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)))root.CronSpecs = []CronSpec{}}// cron命令的注释,这里注意Type为distributed-cron,ServiceName需要填写root.CronSpecs = append(root.CronSpecs, CronSpec{Type: "distributed-cron",Cmd: cmd,Spec: spec,ServiceName: serviceName,})appService := root.GetContainer().MustMake(contract.AppKey).(contract.App)distributeServce := root.GetContainer().MustMake(contract.DistributedKey).(contract.Distributed)appID := appService.AppID()// 复制要执行的command为cronCmd,并且设置为rootCmdvar cronCmd Commandctx := root.Context()cronCmd = *cmdcronCmd.args = []string{}cronCmd.SetParentNull()// cron增加匿名函数root.Cron.AddFunc(spec, func() {// 防止panicdefer func() {if err := recover(); err != nil {log.Println(err)}}()// 节点进行选举,返回选举结果selectedAppID, err := distributeServce.Select(serviceName, appID, holdTime)if err != nil {return}// 如果自己没有被选择到,直接返回if selectedAppID != appID {return}// 如果自己被选择到了,执行这个定时任务err = cronCmd.ExecuteContext(ctx)if err != nil {log.Println(err)}})}
问答:
一个节点是一台服务器还是运行一次程序就是一个节点,多个节点怎么?
多个节点即可以是一个服务器一个,也可以是多个进程。不过一般现在容器化这么普遍的情况,基本上一个节点在实际上上一个pod的概念
问答:
linux cron 命令会把任务写入 crontab 文件,而这里的代码似乎没有存储cron任务,所以cron任务其实是写死在app里并从命令行启动的?
这样似乎会带来一些额外的开发和运维工作,比如需要新增和修改cron任务时就需要升级app并重新部署。 在分布式调度cron任务时还会有个问题,在同一个环境中启动多个app实例去抢占执行cron任务的话就只会记下最后一个pid,那么那些管理cron任务的子命令就都失效了。所以分布式调度必然意味着需要分布式部署?
第一个问题,是的,cron是写在app里面运行的,每次升级cron需要升级app。第二个问题是存在这样问题,这个之前没考虑到,同一个目录启动多个实例。标准办法应该是每个实例启动一个runtimefolder,把runtimefolder从环境变量中传递进去才行
