1. func InitScheduler() error {
    2. Sched = &Scheduler{
    3. cron: cron.New(cron.WithSeconds()),
    4. }
    5. if err := Sched.Start(); err != nil {
    6. log.Errorf("start scheduler error: %s", err.Error())
    7. return err
    8. }
    9. return nil
    10. }
    1. type Scheduler struct {
    2. cron *cron.Cron
    3. }
    1. package cron
    2. import (
    3. "context"
    4. "sort"
    5. "sync"
    6. "time"
    7. )
    8. // Cron keeps track of any number of entries, invoking the associated func as
    9. // specified by the schedule. It may be started, stopped, and the entries may
    10. // be inspected while running.
    11. type Cron struct {
    12. entries []*Entry
    13. chain Chain
    14. stop chan struct{}
    15. add chan *Entry
    16. remove chan EntryID
    17. snapshot chan chan []Entry
    18. running bool
    19. logger Logger
    20. runningMu sync.Mutex
    21. location *time.Location
    22. parser Parser
    23. nextID EntryID
    24. jobWaiter sync.WaitGroup
    25. }
    // Job is the interface that every submitted cron job implements.
    type Job interface {
    	// Run performs the job's work.
    	Run()
    }
    30. // Schedule describes a job's duty cycle.
    31. type Schedule interface {
    32. // Next returns the next activation time, later than the given time.
    33. // Next is invoked initially, and then each time the job is run.
    34. Next(time.Time) time.Time
    35. }
    // EntryID is the identifier of an entry within a single Cron instance.
    type EntryID int
    38. // Entry consists of a schedule and the func to execute on that schedule.
    39. type Entry struct {
    40. // ID is the cron-assigned ID of this entry, which may be used to look up a
    41. // snapshot or remove it.
    42. ID EntryID
    43. // Schedule on which this job should be run.
    44. Schedule Schedule
    45. // Next time the job will run, or the zero time if Cron has not been
    46. // started or this entry's schedule is unsatisfiable
    47. Next time.Time
    48. // Prev is the last time this job was run, or the zero time if never.
    49. Prev time.Time
    50. // WrappedJob is the thing to run when the Schedule is activated.
    51. WrappedJob Job
    52. // Job is the thing that was submitted to cron.
    53. // It is kept around so that user code that needs to get at the job later,
    54. // e.g. via Entries() can do so.
    55. Job Job
    56. }
    57. // Valid returns true if this is not the zero entry.
    58. func (e Entry) Valid() bool { return e.ID != 0 }
    59. // byTime is a wrapper for sorting the entry array by time
    60. // (with zero time at the end).
    61. type byTime []*Entry
    62. func (s byTime) Len() int { return len(s) }
    63. func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
    64. func (s byTime) Less(i, j int) bool {
    65. // Two zero times should return false.
    66. // Otherwise, zero is "greater" than any other time.
    67. // (To sort it at the end of the list.)
    68. if s[i].Next.IsZero() {
    69. return false
    70. }
    71. if s[j].Next.IsZero() {
    72. return true
    73. }
    74. return s[i].Next.Before(s[j].Next)
    75. }
    76. // New returns a new Cron job runner, modified by the given options.
    77. //
    78. // Available Settings
    79. //
    80. // Time Zone
    81. // Description: The time zone in which schedules are interpreted
    82. // Default: time.Local
    83. //
    84. // Parser
    85. // Description: Parser converts cron spec strings into cron.Schedules.
    86. // Default: Accepts this spec: https://en.wikipedia.org/wiki/Cron
    87. //
    88. // Chain
    89. // Description: Wrap submitted jobs to customize behavior.
    90. // Default: A chain that recovers panics and logs them to stderr.
    91. //
    92. // See "cron.With*" to modify the default behavior.
    93. func New(opts ...Option) *Cron {
    94. c := &Cron{
    95. entries: nil,
    96. chain: NewChain(),
    97. add: make(chan *Entry),
    98. stop: make(chan struct{}),
    99. snapshot: make(chan chan []Entry),
    100. remove: make(chan EntryID),
    101. running: false,
    102. runningMu: sync.Mutex{},
    103. logger: DefaultLogger,
    104. location: time.Local,
    105. parser: standardParser,
    106. }
    107. for _, opt := range opts {
    108. opt(c)
    109. }
    110. return c
    111. }
    // FuncJob adapts a plain func() so that it satisfies cron.Job.
    type FuncJob func()

    // Run calls the wrapped function.
    func (f FuncJob) Run() {
    	f()
    }
    115. // AddFunc adds a func to the Cron to be run on the given schedule.
    116. // The spec is parsed using the time zone of this Cron instance as the default.
    117. // An opaque ID is returned that can be used to later remove it.
    118. func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
    119. return c.AddJob(spec, FuncJob(cmd))
    120. }
    121. // AddJob adds a Job to the Cron to be run on the given schedule.
    122. // The spec is parsed using the time zone of this Cron instance as the default.
    123. // An opaque ID is returned that can be used to later remove it.
    124. func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
    125. schedule, err := c.parser.Parse(spec)
    126. if err != nil {
    127. return 0, err
    128. }
    129. return c.Schedule(schedule, cmd), nil
    130. }
    131. // Schedule adds a Job to the Cron to be run on the given schedule.
    132. // The job is wrapped with the configured Chain.
    133. func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
    134. c.runningMu.Lock()
    135. defer c.runningMu.Unlock()
    136. c.nextID++
    137. entry := &Entry{
    138. ID: c.nextID,
    139. Schedule: schedule,
    140. WrappedJob: c.chain.Then(cmd),
    141. Job: cmd,
    142. }
    143. if !c.running {
    144. c.entries = append(c.entries, entry)
    145. } else {
    146. c.add <- entry
    147. }
    148. return entry.ID
    149. }
    150. // Entries returns a snapshot of the cron entries.
    151. func (c *Cron) Entries() []Entry {
    152. c.runningMu.Lock()
    153. defer c.runningMu.Unlock()
    154. if c.running {
    155. replyChan := make(chan []Entry, 1)
    156. c.snapshot <- replyChan
    157. return <-replyChan
    158. }
    159. return c.entrySnapshot()
    160. }
    161. // Location gets the time zone location
    162. func (c *Cron) Location() *time.Location {
    163. return c.location
    164. }
    165. // Entry returns a snapshot of the given entry, or nil if it couldn't be found.
    166. func (c *Cron) Entry(id EntryID) Entry {
    167. for _, entry := range c.Entries() {
    168. if id == entry.ID {
    169. return entry
    170. }
    171. }
    172. return Entry{}
    173. }
    174. // Remove an entry from being run in the future.
    175. func (c *Cron) Remove(id EntryID) {
    176. c.runningMu.Lock()
    177. defer c.runningMu.Unlock()
    178. if c.running {
    179. c.remove <- id
    180. } else {
    181. c.removeEntry(id)
    182. }
    183. }
    184. // Start the cron scheduler in its own goroutine, or no-op if already started.
    185. func (c *Cron) Start() {
    186. c.runningMu.Lock()
    187. defer c.runningMu.Unlock()
    188. if c.running {
    189. return
    190. }
    191. c.running = true
    192. go c.run()
    193. }
    194. // Run the cron scheduler, or no-op if already running.
    195. func (c *Cron) Run() {
    196. c.runningMu.Lock()
    197. if c.running {
    198. c.runningMu.Unlock()
    199. return
    200. }
    201. c.running = true
    202. c.runningMu.Unlock()
    203. c.run()
    204. }
    205. // run the scheduler.. this is private just due to the need to synchronize
    206. // access to the 'running' state variable.
    207. func (c *Cron) run() {
    208. c.logger.Info("start")
    209. // Figure out the next activation times for each entry.
    210. now := c.now()
    211. for _, entry := range c.entries {
    212. entry.Next = entry.Schedule.Next(now)
    213. c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
    214. }
    215. for {
    216. // Determine the next entry to run.
    217. sort.Sort(byTime(c.entries))
    218. var timer *time.Timer
    219. if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
    220. // If there are no entries yet, just sleep - it still handles new entries
    221. // and stop requests.
    222. timer = time.NewTimer(100000 * time.Hour)
    223. } else {
    224. timer = time.NewTimer(c.entries[0].Next.Sub(now))
    225. }
    226. for {
    227. select {
    228. case now = <-timer.C:
    229. now = now.In(c.location)
    230. c.logger.Info("wake", "now", now)
    231. // Run every entry whose next time was less than now
    232. for _, e := range c.entries {
    233. if e.Next.After(now) || e.Next.IsZero() {
    234. break
    235. }
    236. c.startJob(e.WrappedJob)
    237. e.Prev = e.Next
    238. e.Next = e.Schedule.Next(now)
    239. c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
    240. }
    241. case newEntry := <-c.add:
    242. timer.Stop()
    243. now = c.now()
    244. newEntry.Next = newEntry.Schedule.Next(now)
    245. c.entries = append(c.entries, newEntry)
    246. c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
    247. case replyChan := <-c.snapshot:
    248. replyChan <- c.entrySnapshot()
    249. continue
    250. case <-c.stop:
    251. timer.Stop()
    252. c.logger.Info("stop")
    253. return
    254. case id := <-c.remove:
    255. timer.Stop()
    256. now = c.now()
    257. c.removeEntry(id)
    258. c.logger.Info("removed", "entry", id)
    259. }
    260. break
    261. }
    262. }
    263. }
    264. // startJob runs the given job in a new goroutine.
    265. func (c *Cron) startJob(j Job) {
    266. c.jobWaiter.Add(1)
    267. go func() {
    268. defer c.jobWaiter.Done()
    269. j.Run()
    270. }()
    271. }
    272. // now returns current time in c location
    273. func (c *Cron) now() time.Time {
    274. return time.Now().In(c.location)
    275. }
    276. // Stop stops the cron scheduler if it is running; otherwise it does nothing.
    277. // A context is returned so the caller can wait for running jobs to complete.
    278. func (c *Cron) Stop() context.Context {
    279. c.runningMu.Lock()
    280. defer c.runningMu.Unlock()
    281. if c.running {
    282. c.stop <- struct{}{}
    283. c.running = false
    284. }
    285. ctx, cancel := context.WithCancel(context.Background())
    286. go func() {
    287. c.jobWaiter.Wait()
    288. cancel()
    289. }()
    290. return ctx
    291. }
    292. // entrySnapshot returns a copy of the current cron entry list.
    293. func (c *Cron) entrySnapshot() []Entry {
    294. var entries = make([]Entry, len(c.entries))
    295. for i, e := range c.entries {
    296. entries[i] = *e
    297. }
    298. return entries
    299. }
    300. func (c *Cron) removeEntry(id EntryID) {
    301. var entries []*Entry
    302. for _, e := range c.entries {
    303. if e.ID != id {
    304. entries = append(entries, e)
    305. }
    306. }
    307. c.entries = entries
    308. }