本文内容主要来自:https://darjun.github.io/2020/06/25/godailylib/cron/ cron 库 Github 地址:https://github.com/robfig/cron

快速使用

本文代码使用 Go Modules。

创建目录并初始化:

  1. $ mkdir cron-test && cd cron-test
  2. $ go mod init cron-test

安装cron,目前最新稳定版本为 v3:

  1. $ go get -u github.com/robfig/cron/v3

使用:

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/robfig/cron/v3"
  6. )
  7. func main() {
  8. c := cron.New() // 创建 cron 对象,这个对象用于管理定时任务
  9. c.AddFunc("@every 1s", func() { // 添加定时任务
  10. fmt.Println("tick every 1 second")
  11. })
  12. c.Start() // 启动定时器(非阻塞,会创建一个新的 goruntine)
  13. time.Sleep(time.Second * 5)
  14. }

AddFunc()接受两个参数,参数 1 以字符串形式指定触发时间规则,参数 2 是一个无参无返回值的函数,每次触发时调用。@every 1s表示每秒触发一次,@every后加一个时间间隔,表示每隔多长时间触发一次。

注意事项

立即运行任务

cron 不会在添加任务时立即将任务运行一遍。比如,如果时间格式写为@every 10m,那么第一次运行是在 10分钟 后。

如果有立即运行任务的需求,目前只能自己手动启动一个 goroutine 运行任务:

  1. aTask := func() {}
  2. go aTask()
  3. c.AddFunc("@every 10m", aTask)

关于协程安全

cron会创建一个新的 goroutine 来执行触发回调。如果这些回调需要并发访问一些资源、数据,我们需要显式地做同步。

时间格式

类似 crontab 命令的时间格式

与 Linux 中crontab命令相似,cron库支持用 5 个空格分隔的域来表示时间。这 5 个域含义依次为:

  • Minutes:分钟,取值范围[0-59],支持特殊字符* / , -
  • Hours:小时,取值范围[0-23],支持特殊字符* / , -
  • Day of month:每月的第几天,取值范围[1-31],支持特殊字符* / , - ?
  • Month:月,取值范围[1-12]或者使用月份名字缩写[JAN-DEC],支持特殊字符* / , -
  • Day of week:周历,取值范围[0-6]或名字缩写[JUN-SAT],支持特殊字符* / , - ?

注意:月份和周历名称都是不区分大小写的,也就是说SUN/Sun/sun表示同样的含义(都是周日)。

特殊字符含义如下:

  • *:使用*的域可以匹配任何值,例如将月份域(第 4 个)设置为*,表示每个月;
  • /:用来指定范围的步长,例如将小时域(第 2 个)设置为3-59/15表示第 3 分钟触发,以后每隔 15 分钟触发一次,因此第 2 次触发为第 18 分钟,第 3 次为 33 分钟。。。直到分钟大于 59;
  • ,:用来列举一些离散的值和多个范围,例如将周历的域(第 5 个)设置为MON,WED,FRI表示周一、三和五;
  • -:用来表示范围,例如将小时的域(第 1 个)设置为9-17表示上午 9 点到下午 17 点(包括 9 和 17);
  • ?:只能用在月历和周历的域中,用来代替*,表示每月 / 周的任意一天。

了解规则之后,我们可以定义任意时间:

  • 30 * * * *:分钟域为 30,其他域都是*表示任意。每小时的 30 分触发;
  • 30 3-6,20-23 * * *:分钟域为 30,小时域的3-6,20-23表示 3 点到 6 点和 20 点到 23 点。3,4,5,6,20,21,22,23 时的 30 分触发;
  • 0 0 1 1 *:1(第 4 个) 月 1(第 3 个) 号的 0(第 2 个) 时 0(第 1 个) 分触发。

crontab只有 5 个域,只能精确到分钟,而很多时候我们会有精确到秒的需求,因此cron库提供了一个选项WithSeconds

  1. c := cron.New(cron.WithSeconds())
  2. c.AddFunc("30 * * * * *", func() {})

使用WithSeconds选项创建 cron 对象后,就可以用 6 个域的时间格式了,其中第 1 个域表示秒。

预定义的时间规则

为了方便使用,cron预定义了一些时间规则:

  • @yearly:也可以写作@annually,表示每年第一天的 0 点。等价于0 0 1 1 *
  • @monthly:表示每月第一天的 0 点。等价于0 0 1 * *
  • @weekly:表示每周第一天的 0 点,注意第一天为周日,即周六结束,周日开始的那个 0 点。等价于0 0 * * 0
  • @daily:也可以写作@midnight,表示每天 0 点。等价于0 0 * * *
  • @hourly:表示每小时的开始。等价于0 * * * *

固定时间间隔

cron支持固定时间间隔,格式为:@every <duration>

含义为每隔duration触发一次。

例如@every 1h表示每小时触发一次,@every 1h30m2s表示每隔 1 小时 30 分 2 秒触发一次。

<duration>会调用time.ParseDuration()函数解析,所以ParseDuration支持的格式都可以。

自定义时间格式

cron支持灵活的时间格式,如果默认的格式不能满足要求,我们可以自己定义时间格式。

时间规则字符串需要用cron.Parser对象来解析。我们先来看看默认的解析器是如何工作的。
首先定义了各个域:

  1. // parser.go
  2. const (
  3. Second ParseOption = 1 << iota
  4. SecondOptional
  5. Minute
  6. Hour
  7. Dom
  8. Month
  9. Dow
  10. DowOptional
  11. Descriptor
  12. )

除了Minute/Hour/Dom(Day of month)/Month/Dow(Day of week)外,还可以支持Second
相对顺序都是固定的:

  1. // parser.go
  2. var places = []ParseOption{
  3. Second,
  4. Minute,
  5. Hour,
  6. Dom,
  7. Month,
  8. Dow,
  9. }
  10. var defaults = []string{
  11. "0",
  12. "0",
  13. "0",
  14. "*",
  15. "*",
  16. "*",
  17. }

默认的时间格式使用 5 个域。

我们可以调用cron.NewParser()创建自己的Parser对象,以位格式传入使用哪些域,例如下面的Parser使用 6 个域,支持Second

  1. parser := cron.NewParser(
  2. cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
  3. )

Descriptor表示对@every/@hour等的支持。
调用cron.WithParser(parser)创建一个选项传入构造函数cron.New(),使用时就可以指定秒了:

  1. c := cron.New(cron.WithParser(parser))
  2. c.AddFunc("1 * * * * *", func () {
  3. fmt.Println("every 1 second")
  4. })
  5. c.Start()

这里时间格式必须使用 6 个域,顺序与上面的const定义一致。

实际上前面提到的WithSeconds就是这样实现的。

  1. // option.go
  2. func WithSeconds() Option {
  3. return WithParser(NewParser(
  4. Second | Minute | Hour | Dom | Month | Dow | Descriptor,
  5. ))
  6. }

定义任务

实现 Job 接口

如果一个结构体实现了Job接口,就可以作为任务传给 cron 对象。

Job接口的定义:

  1. // cron.go
  2. type Job interface {
  3. Run()
  4. }

我们定义一个实现Job接口的结构:

  1. type GreetingJob struct {
  2. Name string
  3. }
  4. func (g GreetingJob) Run() {
  5. fmt.Println("Hello ", g.Name)
  6. }

调用cron对象的AddJob()方法将GreetingJob对象添加到定时管理器中:

  1. func main() {
  2. c := cron.New()
  3. c.AddJob("@every 1s", GreetingJob{"dj"})
  4. c.Start()
  5. time.Sleep(5 * time.Second)
  6. }

使用自定义的结构可以让任务携带状态(Name字段)。

无参函数

另一种定义任务的方式就是使用无参函数。快速使用部分已经提到过。

调用 cron 对象的 AddFunc() 方法可以将无参函数添加到定时管理器中:

  1. job := func() {
  2. fmt.Println("hello")
  3. }
  4. c.AddFunc("@every 1s", job)

实际上AddFunc()方法内部也调用了AddJob()方法。
首先,cron基于func()类型定义一个新的类型FuncJob

  1. // cron.go
  2. type FuncJob func()

然后让FuncJob实现Job接口:

  1. // cron.go
  2. func (f FuncJob) Run() {
  3. f()
  4. }

AddFunc()方法中,将传入的回调转为FuncJob类型,然后调用AddJob()方法:

  1. func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
  2. return c.AddJob(spec, FuncJob(cmd))
  3. }

设置时区

默认情况下,所有时间都是基于当前时区的。

我们可以指定时区,有两种方式:

  1. 在时间字符串前面添加一个CRON_TZ= + 具体时区,具体时区的格式在[carbon](https://darjun.github.io/2020/02/14/godailylib/carbon/)的文章中有详细介绍。东京时区为Asia/Tokyo,纽约时区为America/New_York
  2. 创建cron对象时增加一个时区选项cron.WithLocation(location)locationtime.LoadLocation(zone)加载的时区对象,zone为具体的时区格式。或者调用已创建好的cron对象的SetLocation()方法设置时区。

示例:

  1. func main() {
  2. nyc, _ := time.LoadLocation("America/New_York")
  3. c := cron.New(cron.WithLocation(nyc))
  4. c.AddFunc("0 6 * * ?", func() {
  5. fmt.Println("Every 6 o'clock at New York")
  6. })
  7. c.AddFunc("CRON_TZ=Asia/Tokyo 0 6 * * ?", func() {
  8. fmt.Println("Every 6 o'clock at Tokyo")
  9. })
  10. c.Start()
  11. for {
  12. time.Sleep(time.Second)
  13. }
  14. }

自定义 LoggerWithLogger

WithLogger可以设置cron内部使用我们自定义的Logger

  1. func main() {
  2. c := cron.New(
  3. cron.WithLogger(
  4. cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))))
  5. c.AddFunc("@every 1s", func() {
  6. fmt.Println("hello world")
  7. })
  8. c.Start()
  9. time.Sleep(5 * time.Second)
  10. }

上面调用cron.VerbosPrintfLogger()包装log.Logger,这个logger会详细记录cron内部的调度过程:

  1. $ go run main.go
  2. cron: 2020/06/26 07:09:14 start
  3. cron: 2020/06/26 07:09:14 schedule, now=2020-06-26T07:09:14+08:00, entry=1, next=2020-06-26T07:09:15+08:00
  4. cron: 2020/06/26 07:09:15 wake, now=2020-06-26T07:09:15+08:00
  5. cron: 2020/06/26 07:09:15 run, now=2020-06-26T07:09:15+08:00, entry=1, next=2020-06-26T07:09:16+08:00
  6. hello world
  7. cron: 2020/06/26 07:09:16 wake, now=2020-06-26T07:09:16+08:00
  8. cron: 2020/06/26 07:09:16 run, now=2020-06-26T07:09:16+08:00, entry=1, next=2020-06-26T07:09:17+08:00
  9. hello world
  10. cron: 2020/06/26 07:09:17 wake, now=2020-06-26T07:09:17+08:00
  11. cron: 2020/06/26 07:09:17 run, now=2020-06-26T07:09:17+08:00, entry=1, next=2020-06-26T07:09:18+08:00
  12. hello world
  13. cron: 2020/06/26 07:09:18 wake, now=2020-06-26T07:09:18+08:00
  14. hello world
  15. cron: 2020/06/26 07:09:18 run, now=2020-06-26T07:09:18+08:00, entry=1, next=2020-06-26T07:09:19+08:00
  16. cron: 2020/06/26 07:09:19 wake, now=2020-06-26T07:09:19+08:00
  17. hello world
  18. cron: 2020/06/26 07:09:19 run, now=2020-06-26T07:09:19+08:00, entry=1, next=2020-06-26T07:09:20+08:0

我们看看默认的Logger是什么样的:

  1. // logger.go
  2. var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))
  3. func PrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
  4. return printfLogger{l, false}
  5. }
  6. func VerbosePrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
  7. return printfLogger{l, true}
  8. }
  9. type printfLogger struct {
  10. logger interface{ Printf(string, ...interface{}) }
  11. logInfo bool
  12. }

Job 包装器(WithChain

介绍及原理

Job 包装器可以在执行实际的Job前后添加一些逻辑,比如:

  • 捕获panic
  • 如果Job上次运行还未结束,推迟本次执行;
  • 如果Job上次运行还未介绍,跳过本次执行;
  • 记录每个Job的执行情况。

我们可以将Chain类比为 Web 处理器的中间件。实际上就是在Job的执行逻辑外再封装一层逻辑。
我们的封装逻辑需要写成一个函数,传入一个Job类型,返回封装后的Job

cron为这种函数定义了一个类型JobWrapper

  1. // chain.go
  2. type JobWrapper func(Job) Job

然后使用一个Chain对象将这些JobWrapper组合到一起:

  1. type Chain struct {
  2. wrappers []JobWrapper
  3. }
  4. func NewChain(c ...JobWrapper) Chain {
  5. return Chain{c}
  6. }

调用Chain对象的Then(job)方法应用这些JobWrapper,返回最终的Job

  1. func (c Chain) Then(j Job) Job {
  2. for i := range c.wrappers {
  3. j = c.wrappers[len(c.wrappers)-i-1](j)
  4. }
  5. return j
  6. }

注意应用JobWrapper的顺序。

内置 JobWrapper

cron内置了 3 个用得比较多的JobWrapper

  • Recover:捕获内部Job产生的 panic
  • DelayIfStillRunning:触发时,如果上一次任务还未执行完成(耗时太长),则等待上一次任务完成之后再执行;
  • SkipIfStillRunning:触发时,如果上一次任务还未完成,则跳过此次执行。

Recover

先看看如何使用:

  1. type panicJob struct {
  2. count int
  3. }
  4. func (p *panicJob) Run() {
  5. p.count++
  6. if p.count == 1 {
  7. panic("oooooooooooooops!!!")
  8. }
  9. fmt.Println("hello world")
  10. }
  11. func main() {
  12. c := cron.New()
  13. c.AddJob("@every 1s", cron.NewChain(cron.Recover(cron.DefaultLogger)).Then(&panicJob{}))
  14. c.Start()
  15. time.Sleep(5 * time.Second)
  16. }

panicJob在第一次触发时,触发了panic。因为有cron.Recover()保护,后续任务还能执行:

  1. $ go run main.go
  2. cron: 2020/06/27 14:02:00 panic, error=oooooooooooooops!!!, stack=...
  3. goroutine 18 [running]:
  4. github.com/robfig/cron/v3.Recover.func1.1.1(0x514ee0, 0xc0000044a0)
  5. D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/chain.go:45 +0xbc
  6. panic(0x4cf380, 0x513280)
  7. C:/Go/src/runtime/panic.go:969 +0x174
  8. main.(*panicJob).Run(0xc0000140e8)
  9. D:/code/golang/src/github.com/darjun/go-daily-lib/cron/recover/main.go:17 +0xba
  10. github.com/robfig/cron/v3.Recover.func1.1()
  11. D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/chain.go:53 +0x6f
  12. github.com/robfig/cron/v3.FuncJob.Run(0xc000070390)
  13. D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:136 +0x2c
  14. github.com/robfig/cron/v3.(*Cron).startJob.func1(0xc00005c0a0, 0x514d20, 0xc000070390)
  15. D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:312 +0x68
  16. created by github.com/robfig/cron/v3.(*Cron).startJob
  17. D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:310 +0x7a
  18. hello world
  19. hello world
  20. hello world
  21. hello world

我们看看cron.Recover()的实现,很简单:

  1. // cron.go
  2. func Recover(logger Logger) JobWrapper {
  3. return func(j Job) Job {
  4. return FuncJob(func() {
  5. defer func() {
  6. if r := recover(); r != nil {
  7. const size = 64 << 10
  8. buf := make([]byte, size)
  9. buf = buf[:runtime.Stack(buf, false)]
  10. err, ok := r.(error)
  11. if !ok {
  12. err = fmt.Errorf("%v", r)
  13. }
  14. logger.Error(err, "panic", "stack", "...\n"+string(buf))
  15. }
  16. }()
  17. j.Run()
  18. })
  19. }
  20. }

就是在执行内层的Job逻辑前,添加recover()调用。如果Job.Run()执行过程中有panic。这里的recover()会捕获到,输出调用堆栈。

DelayIfStillRunning

先看如何使用:

  1. type delayJob struct {
  2. count int
  3. }
  4. func (d *delayJob) Run() {
  5. time.Sleep(2 * time.Second)
  6. d.count++
  7. log.Printf("%d: hello world\n", d.count)
  8. }
  9. func main() {
  10. c := cron.New()
  11. c.AddJob("@every 1s", cron.NewChain(cron.DelayIfStillRunning(cron.DefaultLogger)).Then(&delayJob{}))
  12. c.Start()
  13. time.Sleep(10 * time.Second)
  14. }

上面我们在Run()中增加了一个 2s 的延迟,输出中间隔变为 2s,而不是定时的 1s:

  1. $ go run main.go
  2. 2020/06/27 14:11:16 1: hello world
  3. 2020/06/27 14:11:18 2: hello world
  4. 2020/06/27 14:11:20 3: hello world
  5. 2020/06/27 14:11:22 4: hello world

看看源码:

  1. // chain.go
  2. func DelayIfStillRunning(logger Logger) JobWrapper {
  3. return func(j Job) Job {
  4. var mu sync.Mutex
  5. return FuncJob(func() {
  6. start := time.Now()
  7. mu.Lock()
  8. defer mu.Unlock()
  9. if dur := time.Since(start); dur > time.Minute {
  10. logger.Info("delay", "duration", dur)
  11. }
  12. j.Run()
  13. })
  14. }
  15. }

首先定义一个该任务共用的互斥锁sync.Mutex,每次执行任务前获取锁,执行结束之后释放锁。所以在上一个任务结束前,下一个任务获取锁是无法成功的,从而保证的任务的串行执行。

SkipIfStillRunning

先看如何使用:

  1. type skipJob struct {
  2. count int32
  3. }
  4. func (d *skipJob) Run() {
  5. atomic.AddInt32(&d.count, 1)
  6. log.Printf("%d: hello world\n", d.count)
  7. if atomic.LoadInt32(&d.count) == 1 {
  8. time.Sleep(2 * time.Second)
  9. }
  10. }
  11. func main() {
  12. c := cron.New()
  13. c.AddJob("@every 1s", cron.NewChain(cron.SkipIfStillRunning(cron.DefaultLogger)).Then(&skipJob{}))
  14. c.Start()
  15. time.Sleep(10 * time.Second)
  16. }

输出:

  1. $ go run main.go
  2. 2020/06/27 14:22:07 1: hello world
  3. 2020/06/27 14:22:10 2: hello world
  4. 2020/06/27 14:22:11 3: hello world
  5. 2020/06/27 14:22:12 4: hello world
  6. 2020/06/27 14:22:13 5: hello world
  7. 2020/06/27 14:22:14 6: hello world
  8. 2020/06/27 14:22:15 7: hello world
  9. 2020/06/27 14:22:16 8: hello world

注意观察时间,第一个与第二个输出之间相差 3s,因为跳过了两次执行。

注意DelayIfStillRunningSkipIfStillRunning是有本质上的区别的,前者DelayIfStillRunning只要时间足够长,所有的任务都会按部就班地完成,只是可能前一个任务耗时过长,导致后一个任务的执行时间推迟了一点。而SkipIfStillRunning会跳过一些执行。

看看源码:

  1. func SkipIfStillRunning(logger Logger) JobWrapper {
  2. return func(j Job) Job {
  3. var ch = make(chan struct{}, 1)
  4. ch <- struct{}{}
  5. return FuncJob(func() {
  6. select {
  7. case v := <-ch:
  8. j.Run()
  9. ch <- v
  10. default:
  11. logger.Info("skip")
  12. }
  13. })
  14. }
  15. }

定义一个该任务共用的缓存大小为 1 的通道chan struct{}。执行任务时,从通道中取值,如果成功,执行,否则跳过。执行完成之后再向通道中发送一个值,确保下一个任务能执行。初始发送一个值到通道中,保证第一个任务的执行。