简介

调度器-执行器的职责是获取调度对象,发给调度器去按照调度模式进行调度,最终给执行器执行相应逻辑。
整体由四个部分组成:

  • Poller:定时从数据源获取调度对象列表,通过遍历把调度对象提交给Scheduler进行调度。
  • Subscriber:监听指定主题的消息队列,把拉取到的id提交给Scheduler进行调度。
  • Scheduler:处理接收的id,最终获取调度对象并提交给Worker。
  • Worker:根据具体的调度对象来执行相应逻辑。


image.png

有三种调度模式:

  • Cron:调度器根据cron表达式决定执行任务的时机。
  • Period:调度器根据时间周期(比如每秒、每10分钟…)决定执行任务的时间。
  • Plan:调度器根据指定的具体时间决定执行任务的时机,只会执行一次

    模块说明

    Poll模块

    Poll模块分为:

  • CronPoller:指定调度时间(默认每5分钟)调用FindCronScheduleObjects获取调度对象列表,通过遍历把id跟调度对象提交给Scheduler进行调度。

  • PeriodPoller:指定调度时间(默认每5分钟)调用FindPeriodScheduleObjects获取调度对象列表,通过遍历把id跟调度对象提交给Scheduler进行调度。
  • PlanPoller:指定调度时间(默认每5分钟)调用FindPlanScheduleObjects获取调度对象列表,通过遍历把id跟调度对象提交给Scheduler进行调度。


image.png

Subscribe模块

Subcribe作为消息队列的消费者,会根据传入的订阅主题来不断从消息队列拉取相应的调度对象的id,然后把id提交给Scheduler进行调度。

image.png

Scheduler模块

Scheduler作为调度器负责分派调度对象给不同的调度器处理,由下面组成:

  • CronScheduler:处理调度模式为cron类型的调度对象,并最终提交到Worker进行处理。
  • PeriodScheduler:处理调度模式为period类型的调度对象,并最终提交到Worker进行处理。
  • PlanScheduler:处理调度模式为plan类型的调度对象,并最终提交到Worker进行处理。

image.png

CronScheduler、PeriodScheduler

2个调度器只是创建的runner类型不同,其代码逻辑基本一致。
维护一个重要的成员 runnerMap。顾名思义,它是一个map对象,key值为调度对象的id,value为CronRunner对象,用于存储每个调度对象对应的定时任务对象。
运作过程如下图:(比较简陋,但应该可以知道干了什么)

image.png

PlanScheduler

调度器维护三个重要成员:

  • pendingId:监听到的id。
  • ids:chan类型,会不断监听该成员,把获取到的id赋值给pendingId。
  • scheduleObjectHeap:根据通知时间排序的最小堆。

因为plan类型的在不修改的情况下只会发一次,所以调用UpdatePlanHandledStatusById函数来更新类似状态的字段标识它是否已经执行过了。
运作过程如下图:(比较简陋,但应该可以知道干了什么)

image.png

Worker模块

执行器会根据接收的调度对象,来执行Work函数来实现相应的逻辑。Work函数接收的是BaseScheduleObject对象,最终执行的是它的Do函数。

  1. func (worker *Worker) Work(scheduleObject scheduleriface.BaseScheduleObject) error {
  2. err := scheduleObject.Do(nil)
  3. if err != nil {
  4. worker.logger.Warn("WorkFailed",
  5. zap.Error(err),
  6. )
  7. return err
  8. }
  9. return nil
  10. }

使用说明

调度对象必需实现BeseScheduleObject、BeseScheduleObjectManager接口的方法,其他的话根据自己所需的调度模式来实现对应的接口。

定义的抽象接口

调度对象抽象接口

定义BaseScheduleObject接口,获取调度对象的基础属性,基于此又定义三种调度模式的相应对象接口

  • CronScheduleObject:cron类型的调度对象,处理调度时间
  • PeriodScheduleObject:period类型的调度对象,处理调度时间
  • PlanScheduleObject:plan类型的调度对象,处理调度时间
  • PlanScheduleObjectHeapByTime:plan类型的调度需要的堆对象 ```go type BaseScheduleObject interface { GetId() string GetName() string GetScheduleMode() string GetVersion() int IsDeleted() bool Do(context.Context) error

    IsCronMode() (CronScheduleObject, bool) IsPeriodMode() (PeriodScheduleObject, bool) IsPlanMode() (PlanScheduleObject, bool) } type BaseScheduleObjects []BaseScheduleObject

type CronScheduleObject interface { BaseScheduleObject GetCronSpec() string }

type CronScheduleObjects []CronScheduleObject

func (objects CronScheduleObjects) Append(others …CronScheduleObject) CronScheduleObjects { return append(objects, others…) }

type PeriodScheduleObject interface { BaseScheduleObject GetSchedulePeriod() time.Duration }

type PeriodScheduleObjects []PeriodScheduleObject

func (objects PeriodScheduleObjects) Append(others …PeriodScheduleObject) PeriodScheduleObjects { return append(objects, others…) }

type PlanScheduleObject interface { BaseScheduleObject GetPlannedScheduleTime() time.Time }

type PlanScheduleObjects []PlanScheduleObject

func (objects PlanScheduleObjects) Append(others …PlanScheduleObject) PlanScheduleObjects { return append(objects, others…) }

type PlanScheduleObjectHeapByTime PlanScheduleObjects

func (h PlanScheduleObjectHeapByTime) Native() PlanScheduleObjects { return PlanScheduleObjects(h) }

func (h PlanScheduleObjectHeapByTime) Len() int { return len(h) }

func (h PlanScheduleObjectHeapByTime) Less(i, j int) bool { return h[i].GetPlannedScheduleTime().Before(h[j].GetPlannedScheduleTime()) }

func (h PlanScheduleObjectHeapByTime) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h PlanScheduleObjectHeapByTime) Push(x interface{}) { h = append(*h, x.(PlanScheduleObject)) }

func (h PlanScheduleObjectHeapByTime) Pop() interface{} { old := h n := len(old) x := old[n-1] *h = old[:n-1] return x }

func (h PlanScheduleObjectHeapByTime) Empty() bool { return h.Len() == 0 }

func (h PlanScheduleObjectHeapByTime) Top() PlanScheduleObject { if h.Empty() { return nil }

return h[0]

}

func (h *PlanScheduleObjectHeapByTime) Enqueue(s PlanScheduleObject) { heap.Push(h, s) }

func (h *PlanScheduleObjectHeapByTime) Dequeue() PlanScheduleObject { return heap.Pop(h).(PlanScheduleObject) }

type BaseScheduleObjectManager interface { FindScheduleObjectById(ctx context.Context, id string) (BaseScheduleObject, error) }

type CronScheduleObjectManager interface { BaseScheduleObjectManager

FindCronScheduleObjects(ctx context.Context) (CronScheduleObjects, error)
FindCronScheduleObjectById(ctx context.Context, id string) (CronScheduleObject, error)

}

type PeriodScheduleObjectManager interface { BaseScheduleObjectManager

FindPeriodScheduleObjects(ctx context.Context) (PeriodScheduleObjects, error)
FindPeriodScheduleObjectById(ctx context.Context, id string) (PeriodScheduleObject, error)

}

type PlanScheduleObjectManager interface { BaseScheduleObjectManager

FindPlanScheduleObjects(ctx context.Context) (PlanScheduleObjects, error)
FindPlanScheduleObjectById(ctx context.Context, id string) (PlanScheduleObject, error)
// UpdatePlanScheduleObjectStatusById 根据id更新已处理的plan类型的调度对象的状态, status为true则说明处理成功,反之处理失败
UpdatePlanScheduleObjectStatusById(ctx context.Context, id string, status bool) error

}

<a name="Q9MFe"></a>
#### 调度对象管理接口
管理接口用于对调度对象的数据源进行处理(获取与修改),根据三种调度模式制定四种抽象接口

- BaseScheduleObjectManager:基础管理接口
- CronScheduleObjectManager:cron类型的,用户需实现ScheduleAble接口的方法
- PeriodScheduleObjectManager:period类型的调度对象,处理版本与调度时间
- PlanScheduleObjectManager:plan类型的调度对象,处理调度时间
```go
type BaseScheduleObjectManager interface {
    FindScheduleObjectById(ctx context.Context, id primitive.ObjectID) (BaseScheduleObject, error)
}

type CronScheduleObjectManager interface {
    BaseScheduleObjectManager

    FindCronScheduleObjects(ctx context.Context) (CronScheduleObjects, error)
    FindCronScheduleObjectById(ctx context.Context, id primitive.ObjectID) (CronScheduleObject, error)
}

type PeriodScheduleObjectManager interface {
    BaseScheduleObjectManager

    FindPeriodScheduleObjects(ctx context.Context) (PeriodScheduleObjects, error)
    FindPeriodScheduleObjectById(ctx context.Context, id primitive.ObjectID) (PeriodScheduleObject, error)
}

type PlanScheduleObjectManager interface {
    BaseScheduleObjectManager

    FindPlanScheduleObjects(ctx context.Context) (PlanScheduleObjects, error)
    FindPlanScheduleObjectById(ctx context.Context, id primitive.ObjectID) (PlanScheduleObject, error)
    // UpdatePlanScheduleObjectStatusById 根据id更新已处理的plan类型的调度对象的状态, status为true则说明处理成功,反之处理失败
    UpdatePlanScheduleObjectStatusById(ctx context.Context, id primitive.ObjectID, status bool) error
}