使用 timer 定时
    执行命令怎么做到计时执行这个事情呢?Golang 有个标准库 time,里面提供一个计时器 timer 的结构,是否可以使用这个 timer 来执行呢?我们先来看看 timer 是怎么使用的:

    1. func main() {
    2. timer := time.NewTimer(3 * time.Second) // 定一个计时器,3s后触发
    3. select {
    4. now <-timer.C: // 监听计时器中的事件
    5. fmt.Println("3秒执行任务, 现在时间", now) //3s后执行
    6. }
    7. }

    如果有一堆定时任务,要怎么定时执行呢?

    1. // Entity代表每个定时任务
    2. entries := []Entry
    3. // 计算每个定时任务时间
    4. for _, entry := range entries {
    5. entry.Next = next(entry)
    6. }
    7. for {
    8. // 根据Next时间排序
    9. sortByTime(entries)
    10. // 创建计时器
    11. timer = time.NewTimer(entries.Early.Sub(time.Now()))
    12. select {
    13. case now = <-timer.C:
    14. for _, entry := entries {
    15. // 对已经到了时间的任务,执行
    16. if entry.Next.Ok() {
    17. go startJob(entry)
    18. }
    19. // 所有任务重新计算下个timer
    20. entry.Next = next(entry)
    21. }
    22. }
    23. }

    使用 cron 包定时执行命令

    1. // 创建一个cron实例
    2. c := cron.New()
    3. // 每整点30分钟执行一次
    4. c.AddFunc("30 * * * *", func() {
    5. fmt.Println("Every hour on the half hour")
    6. })
    7. // 上午3-6点,下午8-11点的30分钟执行
    8. c.AddFunc("30 3-6,20-23 * * *", func() {
    9. fmt.Println(".. in the range 3-6am, 8-11pm")
    10. })
    11. // 东京时间4:30执行一次
    12. c.AddFunc("CRON_TZ=Asia/Tokyo 30 04 * * *", func() {
    13. fmt.Println("Runs at 04:30 Tokyo time every day")
    14. })
    15. // 从现在开始每小时执行一次
    16. c.AddFunc("@hourly", func() {
    17. fmt.Println("Every hour, starting an hour from now")
    18. })
    19. // 从现在开始,每一个半小时执行一次
    20. c.AddFunc("@every 1h30m", func() {
    21. fmt.Println("Every hour thirty, starting an hour thirty from now")
    22. })
    23. // 启动cron
    24. c.Start()
    25. ...
    26. // 在cron运行过程中增加任务
    27. c.AddFunc("@daily", func() { fmt.Println("Every day") })
    28. ..
    29. // 查看运行中的任务
    30. inspect(c.Entries())
    31. ..
    32. // 停止cron的运行,优雅停止,所有正在运行中的任务不会停止。
    33. c.Stop()

    SpecSchedule包秒级别的定时

    1. // 创建一个cron实例
    2. c := cron.New(cron.WithParser(cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)))
    3. // 每秒执行一次
    4. c.AddFunc("* * * * * *", func() {
    5. fmt.Println("Every hour on the half hour")
    6. })

    在 cron 库中,增加一个定时任务的 AddFunc 方法,有两个参数:时间描述语言、匿名函数。那么明显,AddCronCommand 函数中核心要做的,就是将 Command 结构的执行封装成一个匿名函数,再调用 cron 的 AddFunc 方法就可以了。

    因为在 Golang 中,fork 可以启动一个子进程,但是这个子进程是无法继承父进程的调度模型的。Golang 的调度模型是在用户态的 runtime 中自己进行调度的,而系统调用 fork 出来的子进程默认只会有单线程。所以在 Golang 中尽量不要使用 fork 的方式来复制启动当前进程。

    另一个办法是使用 os.StartProcess 来启动一个进程,执行当前进程相同的二进制文件以及当前进程相同的参数。

    https://github.com/sevlyar/go-daemon
    https://github.com/sevlyar/retag

    如何实现分布式定时器

    先定义接口,这个接口的功能是一个分布式的选择器,当有很多节点要执行某个服务的时候,只选择出其中一个节点。这样不管底层是否用 Redis 实现分布式选择器,在业务层我们都可以不用关心。

    1. package contract
    2. import "time"
    3. // DistributedKey 定义字符串凭证
    4. const DistributedKey = "hade:distributed"
    5. // Distributed 分布式服务
    6. type Distributed interface {
    7. // Select 分布式选择器, 所有节点对某个服务进行抢占,只选择其中一个节点
    8. // ServiceName 服务名字
    9. // appID 当前的AppID
    10. // holdTime 分布式选择器hold住的时间
    11. // 返回值
    12. // selectAppID 分布式选择器最终选择的App
    13. // err 异常才返回,如果没有被选择,不返回err
    14. Select(serviceName string, appID string, holdTime time.Duration) (selectAppID string, err error)
    15. }

    这个具体实现就有很多方式了,Redis 只是其中一种而已,而且 Redis 要到后面章节才能引入。这里就实现一个本地文件锁。当一个服务器上有多个进程需要进行抢锁操作,文件锁是一种单机多进程抢占的很简易的实现方式。在 Golang 中,其使用方法也比较简单。

    多个进程同时使用 os.OpenFile 打开一个文件,并使用 syscall.Flock 带上 syscall.LOCK_EX 参数来对这个文件加文件锁,这里只会有一个进程抢占到文件锁,而其他抢占不到的进程从 syscall.Flock 函数中获取到的就是 error。根据这个 error 是否为空,我们就能判断是否抢占到了文件锁。

    释放文件锁有两种方式,一种方式是调用 syscall.Flock 带上 syscall.LOCK_UN 的参数,另外一种方式是抢占到锁的进程结束,也会自动释放文件锁。

    本地文件锁的具体实现

    1. // Select 为分布式选择器
    2. func (s LocalDistributedService) Select(serviceName string, appID string, holdTime time.Duration) (selectAppID string, err error) {
    3. appService := s.container.MustMake(contract.AppKey).(contract.App)
    4. runtimeFolder := appService.RuntimeFolder()
    5. lockFile := filepath.Join(runtimeFolder, "disribute_"+serviceName)
    6. // 打开文件锁
    7. lock, err := os.OpenFile(lockFile, os.O_RDWR|os.O_CREATE, 0666)
    8. if err != nil {
    9. return "", err
    10. }
    11. // 尝试独占文件锁
    12. err = syscall.Flock(int(lock.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
    13. // 抢不到文件锁
    14. if err != nil {
    15. // 读取被选择的appid
    16. selectAppIDByt, err := ioutil.ReadAll(lock)
    17. if err != nil {
    18. return "", err
    19. }
    20. return string(selectAppIDByt), err
    21. }
    22. // 在一段时间内,选举有效,其他节点在这段时间不能再进行抢占
    23. go func() {
    24. defer func() {
    25. // 释放文件锁
    26. syscall.Flock(int(lock.Fd()), syscall.LOCK_UN)
    27. // 释放文件
    28. lock.Close()
    29. // 删除文件锁对应的文件
    30. os.Remove(lockFile)
    31. }()
    32. // 创建选举结果有效的计时器
    33. timer := time.NewTimer(holdTime)
    34. // 等待计时器结束
    35. <-timer.C
    36. }()
    37. // 这里已经是抢占到了,将抢占到的appID写入文件
    38. if _, err := lock.WriteString(appID); err != nil {
    39. return "", err
    40. }
    41. return appID, nil
    42. }

    分布式选举

    1. // AddDistributedCronCommand 实现一个分布式定时器
    2. // serviceName 这个服务的唯一名字,不允许带有空格
    3. // spec 具体的执行时间
    4. // cmd 具体的执行命令
    5. // holdTime 表示如果我选择上了,这次选择持续的时间,也就是锁释放的时间
    6. func (c *Command) AddDistributedCronCommand(serviceName string, spec string, cmd *Command, holdTime time.Duration) {
    7. root := c.Root()
    8. // 初始化cron
    9. if root.Cron == nil {
    10. root.Cron = cron.New(cron.WithParser(cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)))
    11. root.CronSpecs = []CronSpec{}
    12. }
    13. // cron命令的注释,这里注意Type为distributed-cron,ServiceName需要填写
    14. root.CronSpecs = append(root.CronSpecs, CronSpec{
    15. Type: "distributed-cron",
    16. Cmd: cmd,
    17. Spec: spec,
    18. ServiceName: serviceName,
    19. })
    20. appService := root.GetContainer().MustMake(contract.AppKey).(contract.App)
    21. distributeServce := root.GetContainer().MustMake(contract.DistributedKey).(contract.Distributed)
    22. appID := appService.AppID()
    23. // 复制要执行的command为cronCmd,并且设置为rootCmd
    24. var cronCmd Command
    25. ctx := root.Context()
    26. cronCmd = *cmd
    27. cronCmd.args = []string{}
    28. cronCmd.SetParentNull()
    29. // cron增加匿名函数
    30. root.Cron.AddFunc(spec, func() {
    31. // 防止panic
    32. defer func() {
    33. if err := recover(); err != nil {
    34. log.Println(err)
    35. }
    36. }()
    37. // 节点进行选举,返回选举结果
    38. selectedAppID, err := distributeServce.Select(serviceName, appID, holdTime)
    39. if err != nil {
    40. return
    41. }
    42. // 如果自己没有被选择到,直接返回
    43. if selectedAppID != appID {
    44. return
    45. }
    46. // 如果自己被选择到了,执行这个定时任务
    47. err = cronCmd.ExecuteContext(ctx)
    48. if err != nil {
    49. log.Println(err)
    50. }
    51. })
    52. }

    问答:
    一个节点是一台服务器还是运行一次程序就是一个节点,多个节点怎么?
    多个节点即可以是一个服务器一个,也可以是多个进程。不过一般现在容器化这么普遍的情况,基本上一个节点在实际上上一个pod的概念

    问答:
    linux cron 命令会把任务写入 crontab 文件,而这里的代码似乎没有存储cron任务,所以cron任务其实是写死在app里并从命令行启动的?
    这样似乎会带来一些额外的开发和运维工作,比如需要新增和修改cron任务时就需要升级app并重新部署。 在分布式调度cron任务时还会有个问题,在同一个环境中启动多个app实例去抢占执行cron任务的话就只会记下最后一个pid,那么那些管理cron任务的子命令就都失效了。所以分布式调度必然意味着需要分布式部署?

    第一个问题,是的,cron是写在app里面运行的,每次升级cron需要升级app。第二个问题是存在这样问题,这个之前没考虑到,同一个目录启动多个实例。标准办法应该是每个实例启动一个runtimefolder,把runtimefolder从环境变量中传递进去才行