简介

gron是一个比较小巧、灵活的定时任务库,可以执行定时的、周期性的任务。gron提供简洁的、并发安全的接口。我们先介绍gron库的使用,然后简单分析一下源码。

快速使用

先安装:

  1. $ go get github.com/roylee0704/gron

后使用:

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/roylee0704/gron"
  7. )
  8. func main() {
  9. var wg sync.WaitGroup
  10. wg.Add(1)
  11. c := gron.New()
  12. c.AddFunc(gron.Every(5*time.Second), func() {
  13. fmt.Println("runs every 5 seconds.")
  14. })
  15. c.Start()
  16. wg.Wait()
  17. }

gron的使用比较简单:

  • 首先调用gron.New()创建一个管理器,这是一个定时任务的管理器;
  • 然后调用管理器的AddFunc()Add()方法向它添加任务,在启动时添加也是可以的,见下文分析;
  • 最后调用管理器的Start()方法启动它。

gron支持两种添加任务的方式,一种是使用无参数的函数,另一种是实现任务接口。上面例子中使用的是前一种方式,实现接口的方式我们后面会介绍。添加任务时通过gron.Every()指定周期任务的间隔,上面添加了一个 5s 的周期任务,每隔 5s 输出一行文字。

需要注意的是,我们使用sync.WaitGroup保证主 goroutine 不退出。因为c.Start()中只是启动了一个 goroutine,如果主 goroutine 退出了,整个程序就停止了。

运行程序,每隔 5s 输出:

  1. runs every 5 seconds.
  2. runs every 5 seconds.
  3. runs every 5 seconds.

该程序需要按下ctrl + c停止!

时间格式

gron接受time.Duration类型的时间间隔,除了time包中定义的基础Second/Minute/Hourgron中的xtime子包还提供了Day/Week单位的时间。有一点需要注意,gron支持的时间精度为 1s,小于 1s 的间隔是不支持的。除了单位时间间隔,我们还可以使用4m10s这样的时间:

  1. func main() {
  2. var wg sync.WaitGroup
  3. wg.Add(1)
  4. c := gron.New()
  5. c.AddFunc(gron.Every(1*time.Second), func() {
  6. fmt.Println("runs every second.")
  7. })
  8. c.AddFunc(gron.Every(1*time.Minute), func() {
  9. fmt.Println("runs every minute.")
  10. })
  11. c.AddFunc(gron.Every(1*time.Hour), func() {
  12. fmt.Println("runs every hour.")
  13. })
  14. c.AddFunc(gron.Every(1*xtime.Day), func() {
  15. fmt.Println("runs every day.")
  16. })
  17. c.AddFunc(gron.Every(1*xtime.Week), func() {
  18. fmt.Println("runs every week.")
  19. })
  20. t, _ := time.ParseDuration("4m10s")
  21. c.AddFunc(gron.Every(t), func() {
  22. fmt.Println("runs every 4 minutes 10 seconds.")
  23. })
  24. c.Start()
  25. wg.Wait()
  26. }

通过gron.Every()设置每隔多长时间执行一次任务。对于大于 1 天的时间间隔,我们还可以使用gron.Every().At()指定其在某个时间点执行。例如下面的程序,从第二天的22:00开始,每隔一天触发一次,即每天的22:00触发:

  1. func main() {
  2. var wg sync.WaitGroup
  3. wg.Add(1)
  4. c := gron.New()
  5. c.AddFunc(gron.Every(1*xtime.Day).At("22:00"), func() {
  6. fmt.Println("runs every second.")
  7. })
  8. c.Start()
  9. wg.Wait()
  10. }

自定义任务

实现自定义任务也很简单,只需要实现gron.Job接口即可:

  1. // src/github.com/roylee0704/gron/cron.go
  2. type Job interface {
  3. Run()
  4. }

我们需要调用调度器的Add()方法向管理器添加自定义任务:

  1. type GreetingJob struct {
  2. Name string
  3. }
  4. func (g GreetingJob) Run() {
  5. fmt.Println("Hello ", g.Name)
  6. }
  7. func main() {
  8. var wg sync.WaitGroup
  9. wg.Add(1)
  10. g1 := GreetingJob{Name: "dj"}
  11. g2 := GreetingJob{Name: "dajun"}
  12. c := gron.New()
  13. c.Add(gron.Every(5*time.Second), g1)
  14. c.Add(gron.Every(10*time.Second), g2)
  15. c.Start()
  16. wg.Wait()
  17. }

上面我们编写了一个GreetingJob结构,实现gron.Job接口,然后创建两个对象g1/g2,一个 5s 触发一次,一个 10s 触发一次。使用自定义任务的方式可以比较好地处理携带状态的任务,如上面的Name字段。

实际上,AddFunc()方法内部也是通过Add()实现的:

  1. // src/github.com/roylee0704/gron/cron.go
  2. func (c *Cron) AddFunc(s Schedule, j func()) {
  3. c.Add(s, JobFunc(j))
  4. }
  5. type JobFunc func()
  6. func (j JobFunc) Run() {
  7. j()
  8. }

AddFunc()内部,将传入的函数转为JobFunc类型,而gronJobFunc实现了gron.Job接口。是不是与net/http包中的HandleFuncHandle很像。如果注意观察的话,在很多 Go 语言的代码中都有此类模式。

一点源码

gron的源码只有两个文件cron.goschedule.gocron.go中实现添加任务和调度的方法,schedule.go中是时间策略相关的代码。两个文件算上注释一共才 260 行!我们添加的任务在gron内部都是以Entry结构表示的:

  1. type Entry struct {
  2. Schedule Schedule
  3. Job Job
  4. Next time.Time
  5. Prev time.Time
  6. }

Next为下次执行时间,Prev为上次执行时间,Job是要执行的任务,Schedulegron.Schedule接口类型,调用其Next()可计算出下次执行的时间点。

管理器使用gron.Cron结构表示:

  1. type Cron struct {
  2. entries []*Entry
  3. running bool
  4. add chan *Entry
  5. stop chan struct{}
  6. }

任务的调度在另外一个 goroutine 中。如果调度未开始,添加任务可直接appendentries切片中;如果调度已开始(Start()方法已调用),需要向通道add发送待添加的任务。任务调度的核心逻辑在Run()方法中:

  1. func (c *Cron) run() {
  2. var effective time.Time
  3. now := time.Now().Local()
  4. // to figure next trig time for entries, referenced from now
  5. for _, e := range c.entries {
  6. e.Next = e.Schedule.Next(now)
  7. }
  8. for {
  9. sort.Sort(byTime(c.entries))
  10. if len(c.entries) > 0 {
  11. effective = c.entries[0].Next
  12. } else {
  13. effective = now.AddDate(15, 0, 0) // to prevent phantom jobs.
  14. }
  15. select {
  16. case now = <-after(effective.Sub(now)):
  17. // entries with same time gets run.
  18. for _, entry := range c.entries {
  19. if entry.Next != effective {
  20. break
  21. }
  22. entry.Prev = now
  23. entry.Next = entry.Schedule.Next(now)
  24. go entry.Job.Run()
  25. }
  26. case e := <-c.add:
  27. e.Next = e.Schedule.Next(time.Now())
  28. c.entries = append(c.entries, e)
  29. case <-c.stop:
  30. return // terminate go-routine.
  31. }
  32. }
  33. }

执行流程如下:

  1. 调度器刚启动时,先计算所有任务的下次执行时间;
  2. 然后在一个for循环中,按照执行时间从早到晚排序,取出最近需要执行任务的时间点;
  3. select语句中等待到这个时间点,启动新的 goroutine 执行到期的任务,每个任务一个新的 goroutine;
  4. 如果在等待的过程中,又添加了新的任务(通过通道c.add),计算这个新任务的首次执行时间。跳到步骤 2,因为新添加的任务可能最早执行。

有几个细节需要注意一下:

  1. 任务到期判断使用的是本地时间:time.Now().Local()
  2. 如果没有任务,等待时间设置为now.AddDate(15, 0, 0),即 15 年,防止 CPU 空转;
  3. 任务都是在独立的 goroutine 中执行的;
  4. 通过实现sort.Interface接口可以实现自定义排序(代码中的byTime)。

最后,我们来看一下时间策略的代码。我们知道在Entry结构中存储了一个gron.Schedule类型的对象,调用该对象的Next()方法返回下次执行的时间点:

  1. // src/github.com/roylee0704/gron/schedule.go
  2. type Schedule interface {
  3. Next(t time.Time) time.Time
  4. }

gron内置实现了两种Schedule,一种是periodicSchedule,即周期触发gron.Every()函数返回的就是这个对象:

  1. // src/github.com/roylee0704/gron/schedule.go
  2. type periodicSchedule struct {
  3. period time.Duration
  4. }

一种是固定时刻的周期触发,它实际上也是周期触发,只是固定了时间点:

  1. type atSchedule struct {
  2. period time.Duration
  3. hh int
  4. mm int
  5. }

他们的核心逻辑在Next()方法中,periodicSchedule只需要用当前时间加上周期即可得到下次触发时间。这里Truncate()方法截掉了当前时间中小于 1s 的部分:

  1. func (ps periodicSchedule) Next(t time.Time) time.Time {
  2. return t.Truncate(time.Second).Add(ps.period)
  3. }

atScheduleNext()方法先计算当天该时间点,再加上周期就是下次触发的时间:

  1. func (as atSchedule) reset(t time.Time) time.Time {
  2. return time.Date(t.Year(), t.Month(), t.Day(), as.hh, as.mm, 0, 0, time.UTC)
  3. }
  4. func (as atSchedule) Next(t time.Time) time.Time {
  5. next := as.reset(t)
  6. if t.After(next) {
  7. return next.Add(as.period)
  8. }
  9. return next
  10. }

periodicSchedule提供了At()方法可以转为atSchedule

  1. func (ps periodicSchedule) At(t string) Schedule {
  2. if ps.period < xtime.Day {
  3. panic("period must be at least in days")
  4. }
  5. // parse t naively
  6. h, m, err := parse(t)
  7. if err != nil {
  8. panic(err.Error())
  9. }
  10. return &atSchedule{
  11. period: ps.period,
  12. hh: h,
  13. mm: m,
  14. }
  15. }

自定义时间策略

我们可以很轻松的实现一个自定义的时间策略。例如,我们要实现一个“指数退避”的时间序列,先等待 1s,然后 2s、4s…

  1. type ExponentialBackOffSchedule struct {
  2. last int
  3. }
  4. func (e *ExponentialBackOffSchedule) Next(t time.Time) time.Time {
  5. interval := time.Duration(math.Pow(2.0, float64(e.last))) * time.Second
  6. e.last += 1
  7. return t.Truncate(time.Second).Add(interval)
  8. }
  9. func main() {
  10. var wg sync.WaitGroup
  11. wg.Add(1)
  12. c := gron.New()
  13. c.AddFunc(&ExponentialBackOffSchedule{}, func() {
  14. fmt.Println(time.Now().Local().Format("2006-01-02 15:04:05"), "hello")
  15. })
  16. c.Start()
  17. wg.Wait()
  18. }

运行结果如下:

  1. 2020-04-20 23:47:11 hello
  2. 2020-04-20 23:47:13 hello
  3. 2020-04-20 23:47:17 hello
  4. 2020-04-20 23:47:25 hello

第二次输出与第一次相差 2s,第三次与第二次相差 4s,第4次与第三次相差 8s,完美!

总结

本文介绍了gron这个小巧的定时任务库,如何使用,如何自定义任务和时间策略,顺带分析了一下源码。gron源码实现非常简洁,非常推荐阅读!

大家如果发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue😄

参考

  1. gron GitHub:https://github.com/roylee0704/gron
  2. Go 每日一库 GitHub:https://github.com/go-quiz/go-daily-lib