使用 timer 定时
执行命令怎么做到计时执行这个事情呢?Golang 有个标准库 time,里面提供一个计时器 timer 的结构,是否可以使用这个 timer 来执行呢?我们先来看看 timer 是怎么使用的:
func main() {
timer := time.NewTimer(3 * time.Second) // 定一个计时器,3s后触发
select {
now <-timer.C: // 监听计时器中的事件
fmt.Println("3秒执行任务, 现在时间", now) //3s后执行
}
}
如果有一堆定时任务,要怎么定时执行呢?
// Entity代表每个定时任务
entries := []Entry
// 计算每个定时任务时间
for _, entry := range entries {
entry.Next = next(entry)
}
for {
// 根据Next时间排序
sortByTime(entries)
// 创建计时器
timer = time.NewTimer(entries.Early.Sub(time.Now()))
select {
case now = <-timer.C:
for _, entry := entries {
// 对已经到了时间的任务,执行
if entry.Next.Ok() {
go startJob(entry)
}
// 所有任务重新计算下个timer
entry.Next = next(entry)
}
}
}
使用 cron 包定时执行命令
// 创建一个cron实例
c := cron.New()
// 每整点30分钟执行一次
c.AddFunc("30 * * * *", func() {
fmt.Println("Every hour on the half hour")
})
// 上午3-6点,下午8-11点的30分钟执行
c.AddFunc("30 3-6,20-23 * * *", func() {
fmt.Println(".. in the range 3-6am, 8-11pm")
})
// 东京时间4:30执行一次
c.AddFunc("CRON_TZ=Asia/Tokyo 30 04 * * *", func() {
fmt.Println("Runs at 04:30 Tokyo time every day")
})
// 从现在开始每小时执行一次
c.AddFunc("@hourly", func() {
fmt.Println("Every hour, starting an hour from now")
})
// 从现在开始,每一个半小时执行一次
c.AddFunc("@every 1h30m", func() {
fmt.Println("Every hour thirty, starting an hour thirty from now")
})
// 启动cron
c.Start()
...
// 在cron运行过程中增加任务
c.AddFunc("@daily", func() { fmt.Println("Every day") })
..
// 查看运行中的任务
inspect(c.Entries())
..
// 停止cron的运行,优雅停止,所有正在运行中的任务不会停止。
c.Stop()
SpecSchedule包秒级别的定时
// 创建一个cron实例
c := cron.New(cron.WithParser(cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)))
// 每秒执行一次
c.AddFunc("* * * * * *", func() {
fmt.Println("Every hour on the half hour")
})
在 cron 库中,增加一个定时任务的 AddFunc 方法,有两个参数:时间描述语言、匿名函数。那么明显,AddCronCommand 函数中核心要做的,就是将 Command 结构的执行封装成一个匿名函数,再调用 cron 的 AddFunc 方法就可以了。
因为在 Golang 中,fork 可以启动一个子进程,但是这个子进程是无法继承父进程的调度模型的。Golang 的调度模型是在用户态的 runtime 中自己进行调度的,而系统调用 fork 出来的子进程默认只会有单线程。所以在 Golang 中尽量不要使用 fork 的方式来复制启动当前进程。
另一个办法是使用 os.StartProcess 来启动一个进程,执行当前进程相同的二进制文件以及当前进程相同的参数。
https://github.com/sevlyar/go-daemon
https://github.com/sevlyar/retag
如何实现分布式定时器
先定义接口,这个接口的功能是一个分布式的选择器,当有很多节点要执行某个服务的时候,只选择出其中一个节点。这样不管底层是否用 Redis 实现分布式选择器,在业务层我们都可以不用关心。
package contract
import "time"
// DistributedKey 定义字符串凭证
const DistributedKey = "hade:distributed"
// Distributed 分布式服务
type Distributed interface {
// Select 分布式选择器, 所有节点对某个服务进行抢占,只选择其中一个节点
// ServiceName 服务名字
// appID 当前的AppID
// holdTime 分布式选择器hold住的时间
// 返回值
// selectAppID 分布式选择器最终选择的App
// err 异常才返回,如果没有被选择,不返回err
Select(serviceName string, appID string, holdTime time.Duration) (selectAppID string, err error)
}
这个具体实现就有很多方式了,Redis 只是其中一种而已,而且 Redis 要到后面章节才能引入。这里就实现一个本地文件锁。当一个服务器上有多个进程需要进行抢锁操作,文件锁是一种单机多进程抢占的很简易的实现方式。在 Golang 中,其使用方法也比较简单。
多个进程同时使用 os.OpenFile 打开一个文件,并使用 syscall.Flock 带上 syscall.LOCK_EX 参数来对这个文件加文件锁,这里只会有一个进程抢占到文件锁,而其他抢占不到的进程从 syscall.Flock 函数中获取到的就是 error。根据这个 error 是否为空,我们就能判断是否抢占到了文件锁。
释放文件锁有两种方式,一种方式是调用 syscall.Flock 带上 syscall.LOCK_UN 的参数,另外一种方式是抢占到锁的进程结束,也会自动释放文件锁。
本地文件锁的具体实现
// Select 为分布式选择器
func (s LocalDistributedService) Select(serviceName string, appID string, holdTime time.Duration) (selectAppID string, err error) {
appService := s.container.MustMake(contract.AppKey).(contract.App)
runtimeFolder := appService.RuntimeFolder()
lockFile := filepath.Join(runtimeFolder, "disribute_"+serviceName)
// 打开文件锁
lock, err := os.OpenFile(lockFile, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return "", err
}
// 尝试独占文件锁
err = syscall.Flock(int(lock.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
// 抢不到文件锁
if err != nil {
// 读取被选择的appid
selectAppIDByt, err := ioutil.ReadAll(lock)
if err != nil {
return "", err
}
return string(selectAppIDByt), err
}
// 在一段时间内,选举有效,其他节点在这段时间不能再进行抢占
go func() {
defer func() {
// 释放文件锁
syscall.Flock(int(lock.Fd()), syscall.LOCK_UN)
// 释放文件
lock.Close()
// 删除文件锁对应的文件
os.Remove(lockFile)
}()
// 创建选举结果有效的计时器
timer := time.NewTimer(holdTime)
// 等待计时器结束
<-timer.C
}()
// 这里已经是抢占到了,将抢占到的appID写入文件
if _, err := lock.WriteString(appID); err != nil {
return "", err
}
return appID, nil
}
分布式选举
// AddDistributedCronCommand 实现一个分布式定时器
// serviceName 这个服务的唯一名字,不允许带有空格
// spec 具体的执行时间
// cmd 具体的执行命令
// holdTime 表示如果我选择上了,这次选择持续的时间,也就是锁释放的时间
func (c *Command) AddDistributedCronCommand(serviceName string, spec string, cmd *Command, holdTime time.Duration) {
root := c.Root()
// 初始化cron
if root.Cron == nil {
root.Cron = cron.New(cron.WithParser(cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)))
root.CronSpecs = []CronSpec{}
}
// cron命令的注释,这里注意Type为distributed-cron,ServiceName需要填写
root.CronSpecs = append(root.CronSpecs, CronSpec{
Type: "distributed-cron",
Cmd: cmd,
Spec: spec,
ServiceName: serviceName,
})
appService := root.GetContainer().MustMake(contract.AppKey).(contract.App)
distributeServce := root.GetContainer().MustMake(contract.DistributedKey).(contract.Distributed)
appID := appService.AppID()
// 复制要执行的command为cronCmd,并且设置为rootCmd
var cronCmd Command
ctx := root.Context()
cronCmd = *cmd
cronCmd.args = []string{}
cronCmd.SetParentNull()
// cron增加匿名函数
root.Cron.AddFunc(spec, func() {
// 防止panic
defer func() {
if err := recover(); err != nil {
log.Println(err)
}
}()
// 节点进行选举,返回选举结果
selectedAppID, err := distributeServce.Select(serviceName, appID, holdTime)
if err != nil {
return
}
// 如果自己没有被选择到,直接返回
if selectedAppID != appID {
return
}
// 如果自己被选择到了,执行这个定时任务
err = cronCmd.ExecuteContext(ctx)
if err != nil {
log.Println(err)
}
})
}
问答:
一个节点是一台服务器还是运行一次程序就是一个节点,多个节点怎么?
多个节点即可以是一个服务器一个,也可以是多个进程。不过一般现在容器化这么普遍的情况,基本上一个节点在实际上上一个pod的概念
问答:
linux cron 命令会把任务写入 crontab 文件,而这里的代码似乎没有存储cron任务,所以cron任务其实是写死在app里并从命令行启动的?
这样似乎会带来一些额外的开发和运维工作,比如需要新增和修改cron任务时就需要升级app并重新部署。 在分布式调度cron任务时还会有个问题,在同一个环境中启动多个app实例去抢占执行cron任务的话就只会记下最后一个pid,那么那些管理cron任务的子命令就都失效了。所以分布式调度必然意味着需要分布式部署?
第一个问题,是的,cron是写在app里面运行的,每次升级cron需要升级app。第二个问题是存在这样问题,这个之前没考虑到,同一个目录启动多个实例。标准办法应该是每个实例启动一个runtimefolder,把runtimefolder从环境变量中传递进去才行