- 从etcd中把job同步到内存
- 实现调度模块, 基于cron表达式调度N个job
- 实现执行模块, 并发执行多个job
- 对job加分布式锁, 防止同一个任务在集群的多个节点上并发执行
- 把执行的日志保存到mongodb
监听任务的变化
// InitJobMgr connects to etcd using G_config, builds the package-level
// G_jobMgr, and starts watching the job directory.
//
// Errors from connecting and from the initial job listing in watchJobs are
// both returned; the latter used to be dropped, so a failed load looked like
// a successful start.
func InitJobMgr() error {
	config := clientv3.Config{
		Endpoints:   G_config.EtcdEndpoints,
		DialTimeout: time.Duration(G_config.EtcdDialTimeout) * time.Millisecond,
	}
	client, err := clientv3.New(config)
	if err != nil {
		return err
	}
	G_jobMgr = &JobMgr{
		client:  client,
		kv:      clientv3.NewKV(client),
		lease:   clientv3.NewLease(client),
		watcher: clientv3.NewWatcher(client),
	}
	// Load the current jobs and start watching for changes.
	return G_jobMgr.watchJobs()
}
// watchJobs syncs the jobs stored under common.JOB_SAVE_DIR into the scheduler.
//
//  1. Read every key under the job directory, remembering the revision of that
//     read, and push a JOB_EVENT_SAVE to G_scheduler for each job.
//  2. In a background goroutine, watch the directory from revision+1 (so no
//     change after the read is missed) and push JOB_EVENT_SAVE /
//     JOB_EVENT_DELETE events as jobs change.
//
// It returns an error only if the initial read fails. Values that cannot be
// decoded are logged and skipped in both phases. Previously the events built
// here were discarded instead of being pushed to the scheduler.
func (jobMgr *JobMgr) watchJobs() error {
	response, err := jobMgr.kv.Get(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithPrefix())
	if err != nil {
		return err
	}
	// Jobs that exist right now.
	for _, kv := range response.Kvs {
		job, err := common.UnpackJob(kv.Value)
		if err != nil {
			log.Println(`common.UnpackJob err: `, err)
			continue
		}
		log.Println(`当前任务: `, job.Name)
		G_scheduler.PushJobEvent(common.BuildJobEvent(common.JOB_EVENT_SAVE, job))
	}
	// Follow later changes starting just after the snapshot we read.
	go func() {
		revision := response.Header.GetRevision() + 1
		watchChan := jobMgr.watcher.Watch(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithRev(revision), clientv3.WithPrefix())
		for watchResponse := range watchChan {
			// A failed or canceled watch reports its error on a response; log it
			// rather than letting job sync stop silently.
			if err := watchResponse.Err(); err != nil {
				log.Println(`watch job dir err: `, err)
				continue
			}
			for _, event := range watchResponse.Events {
				switch event.Type {
				case mvccpb.PUT: // job saved: push an update to the scheduler
					job, err := common.UnpackJob(event.Kv.Value)
					if err != nil {
						log.Println(`common.UnpackJob err: `, err)
						continue
					}
					log.Println(`获取到保存任务事件: `, job.Name)
					G_scheduler.PushJobEvent(common.BuildJobEvent(common.JOB_EVENT_SAVE, job))
				case mvccpb.DELETE: // job deleted: push a delete to the scheduler
					jobName := common.ExtractJobName(string(event.Kv.Key))
					log.Println(`获取到删除任务事件: `, jobName)
					G_scheduler.PushJobEvent(common.BuildJobEvent(common.JOB_EVENT_DELETE, &common.Job{
						Name: jobName,
					}))
				}
			}
		}
	}()
	return nil
}
package worker
import (
"context"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"golang-crontab/common"
"log"
"time"
)
// JobMgr bundles the etcd client and the API views the worker uses to read
// jobs, create lock leases and watch the job/killer directories.
type JobMgr struct {
	client  *clientv3.Client // underlying etcd connection
	kv      clientv3.KV      // key/value reads and lock transactions
	lease   clientv3.Lease   // leases backing job locks
	watcher clientv3.Watcher // watches on the job and killer directories
}

// G_jobMgr is the process-wide job manager created by InitJobMgr.
var (
	G_jobMgr *JobMgr
)
// InitJobMgr connects to etcd using G_config and builds the package-level
// G_jobMgr. It then loads and watches the jobs (watchJobs) and watches kill
// requests (watchKiller).
//
// If the initial job load fails, the error is returned before the killer
// watch is started, so a failed start does not leave background watchers
// running.
func InitJobMgr() error {
	config := clientv3.Config{
		Endpoints:   G_config.EtcdEndpoints,
		DialTimeout: time.Duration(G_config.EtcdDialTimeout) * time.Millisecond,
	}
	client, err := clientv3.New(config)
	if err != nil {
		return err
	}
	G_jobMgr = &JobMgr{
		client:  client,
		kv:      clientv3.NewKV(client),
		lease:   clientv3.NewLease(client),
		watcher: clientv3.NewWatcher(client),
	}
	// Start the job watch first; bail out if the initial load failed.
	if err = G_jobMgr.watchJobs(); err != nil {
		return err
	}
	// Start the kill-request watch.
	G_jobMgr.watchKiller()
	return nil
}
// watchKiller watches common.JOB_KILLER_DIR in a background goroutine and
// returns immediately. For every key put under the directory it extracts the
// job name with common.ExtractKillerName and pushes a JOB_EVENT_KILL to
// G_scheduler. DELETE events are ignored.
//
// Watch errors are now logged. Previously a canceled or failed watch ended
// the goroutine silently, and kill requests stopped arriving unnoticed.
func (jobMgr *JobMgr) watchKiller() {
	go func() {
		watchChan := jobMgr.watcher.Watch(context.TODO(), common.JOB_KILLER_DIR, clientv3.WithPrefix())
		for watchResp := range watchChan {
			if err := watchResp.Err(); err != nil {
				log.Println(`watch killer dir err: `, err)
				continue
			}
			for _, watchEvent := range watchResp.Events {
				switch watchEvent.Type {
				case mvccpb.PUT: // kill request for a job
					jobName := common.ExtractKillerName(string(watchEvent.Kv.Key))
					jobEvent := common.BuildJobEvent(common.JOB_EVENT_KILL, &common.Job{Name: jobName})
					G_scheduler.PushJobEvent(jobEvent)
				case mvccpb.DELETE:
					// The killer marker expired and was removed automatically; nothing to do.
				}
			}
		}
	}()
}
// watchJobs syncs the jobs stored under common.JOB_SAVE_DIR into the scheduler.
//
//  1. Read every key under the job directory, remembering the revision of that
//     read, and push a JOB_EVENT_SAVE to G_scheduler for each job.
//  2. In a background goroutine, watch the directory from revision+1 (so no
//     change after the read is missed) and push JOB_EVENT_SAVE /
//     JOB_EVENT_DELETE events as jobs change.
//
// It returns an error only if the initial read fails. Values that cannot be
// decoded are logged and skipped in both phases. Previously one bad entry in
// step 1 aborted the whole worker startup, while the same entry arriving via
// the watch was merely skipped.
func (jobMgr *JobMgr) watchJobs() error {
	response, err := jobMgr.kv.Get(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithPrefix())
	if err != nil {
		return err
	}
	// Jobs that exist right now.
	for _, kv := range response.Kvs {
		job, err := common.UnpackJob(kv.Value)
		if err != nil {
			log.Println(`common.UnpackJob err: `, err)
			continue
		}
		log.Println(`当前存在的任务: `, job.Name)
		G_scheduler.PushJobEvent(common.BuildJobEvent(common.JOB_EVENT_SAVE, job))
	}
	// Follow later changes starting just after the snapshot we read.
	go func() {
		revision := response.Header.GetRevision() + 1
		watchChan := jobMgr.watcher.Watch(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithRev(revision), clientv3.WithPrefix())
		for watchResponse := range watchChan {
			// A failed or canceled watch reports its error on a response; log it
			// rather than letting job sync stop silently.
			if err := watchResponse.Err(); err != nil {
				log.Println(`watch job dir err: `, err)
				continue
			}
			for _, event := range watchResponse.Events {
				switch event.Type {
				case mvccpb.PUT: // job saved: push an update to the scheduler
					job, err := common.UnpackJob(event.Kv.Value)
					if err != nil {
						log.Println(`common.UnpackJob err: `, err)
						continue
					}
					log.Println(`获取到保存任务事件: `, job.Name)
					G_scheduler.PushJobEvent(common.BuildJobEvent(common.JOB_EVENT_SAVE, job))
				case mvccpb.DELETE: // job deleted: push a delete to the scheduler
					jobName := common.ExtractJobName(string(event.Kv.Key))
					log.Println(`获取到删除任务事件: `, jobName)
					G_scheduler.PushJobEvent(common.BuildJobEvent(common.JOB_EVENT_DELETE, &common.Job{
						Name: jobName,
					}))
				}
			}
		}
	}()
	return nil
}
// CreateJobLock returns a new, not-yet-acquired distributed lock for jobName.
// The lock shares this manager's etcd KV and Lease clients; call TryLock to
// acquire it.
func (jobMgr *JobMgr) CreateJobLock(jobName string) (jobLock *JobLock) {
	return InitJobLock(jobName, jobMgr.kv, jobMgr.lease)
}
任务发送给scheduler
package worker
import (
"fmt"
"golang-crontab/common"
"log"
"time"
)
// Scheduler turns job events into schedule plans and launches due jobs.
// Other goroutines talk to it through jobEventChan and jobResultChan (see
// PushJobEvent / PushJobResult); the tables are read and written by the
// scheduling loop.
type Scheduler struct {
	jobEventChan      chan *common.JobEvent              // save/delete/kill events
	jobPlanTable      map[string]*common.JobSchedulePlan // job name -> schedule plan
	jobExecutingTable map[string]*common.JobExecuteInfo  // job name -> run currently in progress
	jobResultChan     chan *common.JobExecuteResult      // finished runs reported by the executor
}

// G_scheduler is the process-wide scheduler created by InitScheduler.
var (
	G_scheduler *Scheduler
)
func InitScheduler() {
G_scheduler = &Scheduler{
jobEventChan: make(chan *common.JobEvent, 1000),
jobPlanTable: make(map[string]*common.JobSchedulePlan),
jobExecutingTable: make(map[string]*common.JobExecuteInfo),
jobResultChan: make(chan *common.JobExecuteResult, 1000),
}
go G_scheduler.scheduleLoop()
}
// TryScheduler does one scheduling pass and reports how long to sleep until
// the next one.
//
// Every plan whose NextTime is at or before now is handed to TryStartJob and
// advanced to its next cron time. The return value is the time remaining until
// the earliest NextTime across all plans. With no plans at all it returns one
// second.
func (s *Scheduler) TryScheduler() (schedulerAfter time.Duration) {
	if len(s.jobPlanTable) == 0 {
		return 1 * time.Second
	}

	now := time.Now()
	var (
		earliest     time.Time
		haveEarliest bool
	)
	for _, p := range s.jobPlanTable {
		// "not after now" == due (before or exactly at now)
		if !p.NextTime.After(now) {
			s.TryStartJob(p)
			p.NextTime = p.Expr.Next(now)
		}
		if !haveEarliest || p.NextTime.Before(earliest) {
			earliest = p.NextTime
			haveEarliest = true
		}
	}

	if haveEarliest {
		schedulerAfter = earliest.Sub(time.Now())
	}
	return schedulerAfter
}
// TryStartJob launches the job described by jobPlan unless a previous run of
// the same job is still recorded as executing.
//
// Scheduling and execution are separate concerns. A long-running job may be
// scheduled many times (for example 60 times per minute), but this guard
// allows only one concurrent run per job on this worker. The run is recorded in
// jobExecutingTable before it is handed to the executor.
func (scheduler *Scheduler) TryStartJob(jobPlan *common.JobSchedulePlan) {
	if _, busy := scheduler.jobExecutingTable[jobPlan.Job.Name]; busy {
		// Still running: skip this tick.
		return
	}

	info := common.BuildJobExecuteInfo(jobPlan)
	scheduler.jobExecutingTable[jobPlan.Job.Name] = info

	fmt.Println("执行任务:", info.Job.Name, info.PlanTime, info.RealTime)
	G_executor.ExecuteJob(info)
}
// scheduleLoop is the scheduler's main goroutine. Each iteration waits for
// one of three things, handles it, then runs a scheduling pass and re-arms
// the timer with the returned delay:
//   - a job event,
//   - the timer firing because the earliest plan is due,
//   - an execution result.
func (s *Scheduler) scheduleLoop() {
	timer := time.NewTimer(s.TryScheduler())
	for {
		select {
		case ev := <-s.jobEventChan:
			s.handleJobEvent(ev)
		case <-timer.C:
			// Earliest plan is due; the pass below will start it.
		case res := <-s.jobResultChan:
			s.handleJobResult(res)
		}
		timer.Reset(s.TryScheduler())
	}
}
// PushJobEvent queues a job event for the scheduler goroutine. It blocks while
// the event queue is full.
func (s *Scheduler) PushJobEvent(jobEvent *common.JobEvent) {
	s.jobEventChan <- jobEvent
}

// handleJobEvent applies one job event to the scheduler's tables:
//   - JOB_EVENT_SAVE: (re)build the job's schedule plan. If building fails
//     (for example an invalid cron expression), the error is logged and the
//     table is left untouched. Any plan already stored for the job keeps
//     running. Previously the result returned alongside the error (nil by Go
//     convention) was stored, which made TryScheduler dereference a nil plan.
//   - JOB_EVENT_DELETE: drop the job's plan, if any.
//   - JOB_EVENT_KILL: if the job is currently executing, call its CancelFunc,
//     which cancels the context its shell command runs under.
func (s *Scheduler) handleJobEvent(event *common.JobEvent) {
	switch event.EventType {
	case common.JOB_EVENT_SAVE:
		plan, err := common.BuildJobSchedulePlan(event.Job)
		if err != nil {
			log.Println(`BuildJobSchedulePlanL err: `, err)
			return
		}
		s.jobPlanTable[event.Job.Name] = plan
	case common.JOB_EVENT_DELETE:
		// delete is a no-op for a missing key, so no existence check is needed.
		delete(s.jobPlanTable, event.Job.Name)
	case common.JOB_EVENT_KILL:
		if info, running := s.jobExecutingTable[event.Job.Name]; running {
			info.CancelFunc() // kills the shell child process so the run exits
		}
	}
}
// PushJobResult reports a finished run back to the scheduler goroutine. It
// blocks while the result queue is full.
func (s *Scheduler) PushJobResult(jobResult *common.JobExecuteResult) {
	s.jobResultChan <- jobResult
}

// handleJobResult clears the job's executing record, which allows the job to
// be started again, and prints the outcome.
//
// Unless the run was skipped because the job lock was already held, it also
// queues a common.JobLog through G_logSink. All timestamps are converted to
// Unix milliseconds.
func (scheduler *Scheduler) handleJobResult(result *common.JobExecuteResult) {
	info := result.ExecuteInfo
	delete(scheduler.jobExecutingTable, info.Job.Name)
	fmt.Println("任务执行完成:", info.Job.Name, string(result.Output), result.Err)

	// The lock was held elsewhere, so this worker did not run the command.
	if result.Err == common.ERR_LOCK_ALREADY_REQUIRED {
		return
	}

	jobLog := &common.JobLog{
		JobName:      info.Job.Name,
		Command:      info.Job.Command,
		Output:       string(result.Output),
		PlanTime:     info.PlanTime.UnixNano() / 1000 / 1000,
		ScheduleTime: info.RealTime.UnixNano() / 1000 / 1000,
		StartTime:    result.StartTime.UnixNano() / 1000 / 1000,
		EndTime:      result.EndTime.UnixNano() / 1000 / 1000,
	}
	// Err stays "" (its zero value) on success.
	if result.Err != nil {
		jobLog.Err = result.Err.Error()
	}
	G_logSink.Append(jobLog)
}
任务执行器
package worker
import (
"golang-crontab/common"
"log"
"math/rand"
"os/exec"
"time"
)
// Executor runs job commands. It holds no state of its own; everything a run
// needs travels in the *common.JobExecuteInfo passed to ExecuteJob.
type Executor struct{}

// G_executor is the process-wide executor created by InitExecutor.
var G_executor *Executor

// InitExecutor creates G_executor. It currently always returns nil.
func InitExecutor() (err error) {
	G_executor = new(Executor)
	return nil
}
// ExecuteJob runs info's job in a new goroutine and reports the outcome to
// the scheduler with G_scheduler.PushJobResult.
//
// Steps:
//  1. Sleep a random 0–999 ms.
//  2. Try to take the job's etcd lock. If that fails, the command is skipped
//     and the lock error becomes result.Err.
//  3. Otherwise run the command with /bin/bash -c under info.CancelCtx, so a
//     kill event can terminate it, and capture combined stdout/stderr.
//
// The lock is released before the result is pushed. Once the scheduler sees
// the result it may start the job again, and that next run must not find this
// run's lock still held. Previously the deferred Unlock ran only after the
// push.
func (executor *Executor) ExecuteJob(info *common.JobExecuteInfo) {
	go func() {
		result := &common.JobExecuteResult{
			ExecuteInfo: info,
			Output:      make([]byte, 0),
		}
		jobLock := G_jobMgr.CreateJobLock(info.Job.Name)
		result.StartTime = time.Now()

		// Lock, run and unlock; the deferred Unlock fires before the result is
		// pushed and is a no-op if TryLock failed.
		func() {
			defer jobLock.Unlock()

			// Random back-off before contending for the lock (presumably to spread
			// attempts from workers whose clocks differ slightly — confirm intent).
			time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
			if err := jobLock.TryLock(); err != nil {
				log.Println(`上锁失败, err: `, err)
				result.Err = err
				result.EndTime = time.Now()
				return
			}

			// Measure the run from the moment the lock was acquired.
			result.StartTime = time.Now()
			cmd := exec.CommandContext(info.CancelCtx, "/bin/bash", "-c", info.Job.Command)
			output, err := cmd.CombinedOutput()
			result.EndTime = time.Now()
			result.Output = output
			result.Err = err
		}()

		// The scheduler removes the executing record when it handles this result.
		G_scheduler.PushJobResult(result)
	}()
}
分布式锁
package worker
import (
"context"
clientv3 "go.etcd.io/etcd/client/v3"
"golang-crontab/common"
)
// JobLock is a per-job distributed lock built on an etcd transaction. TryLock
// creates the lock key only if it has never been created, and attaches it to
// a lease that is kept alive until Unlock.
type JobLock struct {
	kv    clientv3.KV    // etcd KV client (lock transaction)
	lease clientv3.Lease // etcd lease client (grant / keep-alive / revoke)

	jobName    string             // job this lock guards
	cancelFunc context.CancelFunc // stops the lease keep-alive
	leaseId    clientv3.LeaseID   // lease the lock key is attached to
	isLocked   bool               // set once TryLock succeeds
}

// InitJobLock returns an unlocked JobLock for jobName that uses the given
// etcd KV and Lease clients. Call TryLock to acquire it.
func InitJobLock(jobName string, kv clientv3.KV, lease clientv3.Lease) (jobLock *JobLock) {
	return &JobLock{
		jobName: jobName,
		kv:      kv,
		lease:   lease,
	}
}
// TryLock attempts to acquire the job's lock without waiting.
//
// Steps:
//  1. Grant a 5-second lease and keep it alive in the background.
//  2. Run a transaction that puts common.JOB_LOCK_DIR+jobName, attached to the
//     lease, only if that key's CreateRevision is 0 (the key does not exist).
//
// On success the lease and keep-alive stay active until Unlock. On any failure
// the keep-alive is cancelled and the lease revoked. If the key already exists,
// the returned error is common.ERR_LOCK_ALREADY_REQUIRED.
func (jobLock *JobLock) TryLock() (err error) {
	grant, err := jobLock.lease.Grant(context.TODO(), 5)
	if err != nil {
		return err
	}
	leaseId := grant.ID
	keepAliveCtx, stopKeepAlive := context.WithCancel(context.TODO())

	// release undoes the lease work when the lock is not obtained and passes
	// the cause through as the return value.
	release := func(cause error) error {
		stopKeepAlive()
		jobLock.lease.Revoke(context.TODO(), leaseId)
		return cause
	}

	keepAliveCh, err := jobLock.lease.KeepAlive(keepAliveCtx, leaseId)
	if err != nil {
		return release(err)
	}
	// Drain keep-alive responses until the stream ends (channel closed) or a
	// nil response arrives.
	go func() {
		for resp := range keepAliveCh {
			if resp == nil {
				return
			}
		}
	}()

	lockKey := common.JOB_LOCK_DIR + jobLock.jobName
	txnResp, err := jobLock.kv.Txn(context.TODO()).
		If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
		Then(clientv3.OpPut(lockKey, "", clientv3.WithLease(leaseId))).
		Else(clientv3.OpGet(lockKey)).
		Commit()
	if err != nil {
		return release(err)
	}
	if !txnResp.Succeeded {
		// Someone else holds the lock.
		return release(common.ERR_LOCK_ALREADY_REQUIRED)
	}

	jobLock.leaseId = leaseId
	jobLock.cancelFunc = stopKeepAlive
	jobLock.isLocked = true
	return nil
}
// Unlock releases a lock obtained by TryLock. It stops the lease keep-alive
// and revokes the lease, and etcd then deletes the lock key attached to it.
//
// It is a no-op when the lock is not held. That covers a failed TryLock and
// also a repeated Unlock call, which previously revoked the lease a second
// time because isLocked was never cleared.
func (jobLock *JobLock) Unlock() {
	if !jobLock.isLocked {
		return
	}
	jobLock.isLocked = false
	jobLock.cancelFunc() // stop our background keep-alive
	// Best effort: if Revoke fails, the lease still expires on its own once
	// keep-alives have stopped.
	jobLock.lease.Revoke(context.TODO(), jobLock.leaseId)
}
执行日志保存到mongodb
package worker
import (
	"context"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"golang-crontab/common"
)
// LogSink batches job execution logs and writes them to MongoDB from a single
// writer goroutine.
type LogSink struct {
	client         *mongo.Client
	logCollection  *mongo.Collection     // destination collection
	logChan        chan *common.JobLog   // incoming logs waiting to be batched
	autoCommitChan chan *common.LogBatch // batches whose commit timer fired
}

// G_logSink is the process-wide log sink (singleton) created by InitLogSink.
var G_logSink *LogSink
// saveLogs writes one batch of logs to the collection with a single
// InsertMany call.
//
// A failed write is logged instead of being silently discarded; the batch is
// not retried. Nil or empty batches are skipped, because InsertMany rejects an
// empty document list.
func (logSink *LogSink) saveLogs(batch *common.LogBatch) {
	if batch == nil || len(batch.Logs) == 0 {
		return
	}
	if _, err := logSink.logCollection.InsertMany(context.TODO(), batch.Logs); err != nil {
		log.Println(`saveLogs InsertMany err: `, err)
	}
}
// writeLoop is the LogSink's writer goroutine. Incoming logs are appended to
// the current batch. A batch is written either when it reaches
// G_config.JobLogBatchSize entries or when G_config.JobLogCommitTimeout
// milliseconds have passed since its first log, whichever comes first.
func (logSink *LogSink) writeLoop() {
	var (
		current *common.LogBatch // batch being filled; nil when none is open
		timer   *time.Timer      // auto-commit timer for current
	)
	for {
		select {
		case entry := <-logSink.logChan:
			if current == nil {
				current = &common.LogBatch{}
				// Capture this batch (not the variable) so the timer reports
				// exactly the batch it was started for.
				started := current
				timer = time.AfterFunc(
					time.Duration(G_config.JobLogCommitTimeout)*time.Millisecond,
					func() { logSink.autoCommitChan <- started },
				)
			}
			current.Logs = append(current.Logs, entry)
			// Batch full: write it now and cancel its timer.
			if len(current.Logs) >= G_config.JobLogBatchSize {
				logSink.saveLogs(current)
				current = nil
				timer.Stop()
			}
		case expired := <-logSink.autoCommitChan:
			// Ignore timers for batches that were already written because they
			// filled up.
			if expired != current {
				continue
			}
			logSink.saveLogs(expired)
			current = nil
		}
	}
}
// InitLogSink connects to MongoDB at mongodb://localhost:27017 and targets
// the "log" collection of the "cron" database. It creates G_logSink with
// 1000-slot queues and starts the writer goroutine. It returns the error from
// mongo.Connect, if any.
//
// NOTE(review): the URI is hard-coded; consider reading it from G_config.
func InitLogSink() (err error) {
	var client *mongo.Client
	if client, err = mongo.Connect(context.TODO(), options.Client().ApplyURI("mongodb://localhost:27017")); err != nil {
		return err
	}

	sink := &LogSink{
		client:         client,
		logCollection:  client.Database("cron").Collection("log"),
		logChan:        make(chan *common.JobLog, 1000),
		autoCommitChan: make(chan *common.LogBatch, 1000),
	}
	G_logSink = sink
	go sink.writeLoop()
	return nil
}
// Append hands jobLog to the writer goroutine without blocking. If the queue
// is full, the log is dropped rather than stalling the caller.
func (logSink *LogSink) Append(jobLog *common.JobLog) {
	select {
	case logSink.logChan <- jobLog:
		return
	default:
		// Queue full: drop this log.
	}
}
worker 服务注册
package worker
import (
"context"
clientv3 "go.etcd.io/etcd/client/v3"
"golang-crontab/common"
"net"
"time"
)
// Register keeps this worker registered in etcd under
// common.JOB_WORKER_DIR + <local IPv4 address>, bound to a kept-alive lease.
type Register struct {
	client  *clientv3.Client
	kv      clientv3.KV
	lease   clientv3.Lease
	localIP string // this host's IPv4 address, as found by getLocalIP
}

// G_register is the process-wide registrar created by InitRegister.
var G_register *Register
// getLocalIP returns the first non-loopback IPv4 address among the host's
// interface addresses, in the order net.InterfaceAddrs reports them. It
// returns common.ERR_NO_LOCAL_IP_FOUND when there is none.
func getLocalIP() (ipv4 string, err error) {
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return "", err
	}
	for _, a := range addrs {
		ipNet, ok := a.(*net.IPNet)
		if !ok || ipNet.IP.IsLoopback() {
			continue
		}
		if ipNet.IP.To4() == nil {
			continue // IPv6 address
		}
		return ipNet.IP.String(), nil // e.g. 192.168.1.1
	}
	return "", common.ERR_NO_LOCAL_IP_FOUND
}
// keepOnline keeps this worker registered at common.JOB_WORKER_DIR+localIP
// and never returns.
//
// Each attempt does three things:
//  1. Grant a 10-second lease.
//  2. Keep the lease alive.
//  3. Put the key (empty value) attached to the lease, so the key disappears
//     once renewals stop.
//
// When any step fails, or the keep-alive stream ends, the attempt's context
// is cancelled and a new attempt starts after one second.
//
// The keep-alive is now bound to the attempt's cancellable context.
// Previously it used context.TODO(), so every retry after a failed Put left
// a keep-alive running for an abandoned lease, with nobody reading its
// responses.
func (register *Register) keepOnline() {
	regKey := common.JOB_WORKER_DIR + register.localIP

	// attempt registers once and blocks until the registration is lost.
	attempt := func() {
		ctx, cancel := context.WithCancel(context.TODO())
		defer cancel() // stops this attempt's keep-alive

		leaseResp, err := register.lease.Grant(ctx, 10)
		if err != nil {
			return
		}
		keepAliveChan, err := register.lease.KeepAlive(ctx, leaseResp.ID)
		if err != nil {
			return
		}
		if _, err = register.kv.Put(ctx, regKey, "", clientv3.WithLease(leaseResp.ID)); err != nil {
			return
		}
		// Consume renewal responses until the keep-alive stops (channel closed)
		// or a nil response signals failure.
		for resp := range keepAliveChan {
			if resp == nil {
				return
			}
		}
	}

	for {
		attempt()
		time.Sleep(1 * time.Second)
	}
}
func InitRegister() (err error) {
var (
config clientv3.Config
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
localIp string
)
// 初始化配置
config = clientv3.Config{
Endpoints: G_config.EtcdEndpoints, // 集群地址
DialTimeout: time.Duration(G_config.EtcdDialTimeout) * time.Millisecond, // 连接超时
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
return
}
// 本机IP
if localIp, err = getLocalIP(); err != nil {
return
}
// 得到KV和Lease的API子集
kv = clientv3.NewKV(client)
lease = clientv3.NewLease(client)
G_register = &Register{
client: client,
kv: kv,
lease: lease,
localIP: localIp,
}
// 服务注册
go G_register.keepOnline()
return
}