2017年的代码 https://github.com/AlexStocks/goext/blob/master/time/timer.go
    2021年的代码 https://github.com/dubbogo/gost/blob/master/time/timer.go

    1. // Copyright 2016 ~ 2018 AlexStocks(https://github.com/AlexStocks).
    2. // All rights reserved. Use of this source code is
    3. // governed by Apache License 2.0.
    4. // Package gxtime encapsulates some golang.time functions
    5. package gxtime
    6. import (
    7. "fmt"
    8. "sync"
    9. "sync/atomic"
    10. "time"
    11. )
    12. import (
    13. "github.com/AlexStocks/goext/container/xorlist"
    14. "github.com/AlexStocks/goext/log"
    15. )
    16. var (
    17. ErrTimeChannelFull = fmt.Errorf("timer channel full")
    18. ErrTimeChannelClosed = fmt.Errorf("timer channel closed")
    19. )
    20. // to init a default timer wheel
    21. func Init() {
    22. defaultTimerWheelOnce.Do(func() {
    23. defaultTimerWheel = NewTimerWheel()
    24. })
    25. }
    26. func Now() time.Time {
    27. return defaultTimerWheel.Now()
    28. }
    29. ////////////////////////////////////////////////
    30. // timer node
    31. ////////////////////////////////////////////////
    32. var (
    33. defaultTimerWheelOnce sync.Once
    34. defaultTimerWheel *TimerWheel
    35. nextID TimerID
    36. curGxTime = time.Now().UnixNano() // current goext time in nanoseconds
    37. )
    38. const (
    39. maxMS = 1000
    40. maxSecond = 60
    41. maxMinute = 60
    42. maxHour = 24
    43. maxDay = 31
    44. // ticker interval不能设置到这种精度,
    45. // 实际运行时ticker的时间间隔会在1.001ms上下浮动,
    46. // 当ticker interval小于1ms的时候,会导致TimerWheel.hand
    47. // 和timeWheel.inc不增长,造成时间错乱:例如本来
    48. // 1.5s运行的函数在持续2.1s之后才被执行
    49. // minDiff = 1.001 * MS
    50. minDiff = 10e6
    51. maxTimerLevel = 5
    52. )
    53. func msNum(expire int64) int64 { return expire / int64(time.Millisecond) }
    54. func secondNum(expire int64) int64 { return expire / int64(time.Minute) }
    55. func minuteNum(expire int64) int64 { return expire / int64(time.Minute) }
    56. func hourNum(expire int64) int64 { return expire / int64(time.Hour) }
    57. func dayNum(expire int64) int64 { return expire / (maxHour * int64(time.Hour)) }
    58. // if the return error is not nil, the related timer will be closed.
    59. type TimerFunc func(ID TimerID, expire time.Time, arg interface{}) error
    60. type TimerID = uint64
    61. type timerNode struct {
    62. ID TimerID
    63. trig int64
    64. typ TimerType
    65. period int64
    66. timerRun TimerFunc
    67. arg interface{}
    68. }
    69. func newTimerNode(f TimerFunc, typ TimerType, period int64, arg interface{}) timerNode {
    70. return timerNode{
    71. ID: atomic.AddUint64(&nextID, 1),
    72. trig: atomic.LoadInt64(&curGxTime) + period,
    73. typ: typ,
    74. period: period,
    75. timerRun: f,
    76. arg: arg,
    77. }
    78. }
    79. func compareTimerNode(first, second timerNode) int {
    80. var ret int
    81. if first.trig < second.trig {
    82. ret = -1
    83. } else if first.trig > second.trig {
    84. ret = 1
    85. } else {
    86. ret = 0
    87. }
    88. return ret
    89. }
    90. type timerAction = int64
    91. const (
    92. ADD_TIMER timerAction = 1
    93. DEL_TIMER timerAction = 2
    94. RESET_TIMER timerAction = 3
    95. )
    96. type timerNodeAction struct {
    97. node timerNode
    98. action timerAction
    99. }
    100. ////////////////////////////////////////////////
    101. // timer wheel
    102. ////////////////////////////////////////////////
    103. const (
    104. timerNodeQueueSize = 128
    105. )
    106. var (
    107. limit = [maxTimerLevel + 1]int64{maxMS, maxSecond, maxMinute, maxHour, maxDay}
    108. msLimit = [maxTimerLevel + 1]int64{
    109. int64(time.Millisecond),
    110. int64(time.Second),
    111. int64(time.Minute),
    112. int64(time.Hour),
    113. int64(maxHour * time.Hour),
    114. }
    115. )
    116. // timer based on multiple wheels
    117. type TimerWheel struct {
    118. start int64 // start clock
    119. clock int64 // current time in nanosecond
    120. number int64 // timer node number
    121. hand [maxTimerLevel]int64 // clock
    122. slot [maxTimerLevel]*gxxorlist.XorList // timer list
    123. timerQ chan timerNodeAction
    124. once sync.Once // for close ticker
    125. ticker *time.Ticker
    126. wg sync.WaitGroup
    127. }
    128. func NewTimerWheel() *TimerWheel {
    129. w := &TimerWheel{
    130. clock: atomic.LoadInt64(&curGxTime),
    131. ticker: time.NewTicker(time.Duration(minDiff)), // 这个精度如果太低,会影响curGxTime,进而影响timerNode的trig的值
    132. timerQ: make(chan timerNodeAction, timerNodeQueueSize),
    133. }
    134. w.start = w.clock
    135. for i := 0; i < maxTimerLevel; i++ {
    136. w.slot[i] = gxxorlist.New()
    137. }
    138. w.wg.Add(1)
    139. go func() {
    140. defer w.wg.Done()
    141. var (
    142. t time.Time
    143. cFlag bool
    144. nodeAction timerNodeAction
    145. qFlag bool
    146. )
    147. LOOP:
    148. for {
    149. select {
    150. case t, cFlag = <-w.ticker.C:
    151. atomic.StoreInt64(&curGxTime, t.UnixNano())
    152. if cFlag && 0 != atomic.LoadInt64(&w.number) {
    153. ret := w.timerUpdate(t)
    154. if ret == 0 {
    155. w.run()
    156. }
    157. continue
    158. }
    159. break LOOP
    160. case nodeAction, qFlag = <-w.timerQ:
    161. // 此处只用一个channel,保证对同一个timer操作的顺序性
    162. if qFlag {
    163. switch {
    164. case nodeAction.action == ADD_TIMER:
    165. atomic.AddInt64(&w.number, 1)
    166. w.insertTimerNode(nodeAction.node)
    167. case nodeAction.action == DEL_TIMER:
    168. atomic.AddInt64(&w.number, -1)
    169. w.deleteTimerNode(nodeAction.node)
    170. case nodeAction.action == RESET_TIMER:
    171. // gxlog.CInfo("node action:%#v", nodeAction)
    172. w.resetTimerNode(nodeAction.node)
    173. default:
    174. atomic.AddInt64(&w.number, 1)
    175. w.insertTimerNode(nodeAction.node)
    176. }
    177. continue
    178. }
    179. break LOOP
    180. }
    181. }
    182. }()
    183. return w
    184. }
    185. func (w *TimerWheel) output() {
    186. for idx := range w.slot {
    187. gxlog.CDebug("print slot %d\n", idx)
    188. w.slot[idx].Output()
    189. }
    190. }
    191. func (w *TimerWheel) TimerNumber() int {
    192. return int(atomic.LoadInt64(&w.number))
    193. }
    194. func (w *TimerWheel) Now() time.Time {
    195. return UnixNano2Time(atomic.LoadInt64(&curGxTime))
    196. }
    197. func (w *TimerWheel) run() {
    198. var (
    199. clock int64
    200. err error
    201. node timerNode
    202. slot *gxxorlist.XorList
    203. array []timerNode
    204. )
    205. slot = w.slot[0]
    206. clock = atomic.LoadInt64(&w.clock)
    207. for e, p := slot.Front(); e != nil; p, e = e, e.Next(p) {
    208. node = e.Value.(timerNode)
    209. if clock < node.trig {
    210. break
    211. }
    212. err = node.timerRun(node.ID, UnixNano2Time(clock), node.arg)
    213. if err == nil && node.typ == eTimerLoop {
    214. array = append(array, node)
    215. // w.insertTimerNode(node)
    216. } else {
    217. atomic.AddInt64(&w.number, -1)
    218. }
    219. temp := e
    220. e, p = p, p.Prev(e)
    221. slot.Remove(temp)
    222. }
    223. for idx := range array[:] {
    224. array[idx].trig += array[idx].period
    225. w.insertTimerNode(array[idx])
    226. }
    227. }
    228. func (w *TimerWheel) insertSlot(idx int, node timerNode) {
    229. var (
    230. pos *gxxorlist.XorElement
    231. slot *gxxorlist.XorList
    232. )
    233. slot = w.slot[idx]
    234. for e, p := slot.Front(); e != nil; p, e = e, e.Next(p) {
    235. if compareTimerNode(node, e.Value.(timerNode)) < 0 {
    236. pos = e
    237. break
    238. }
    239. }
    240. if pos != nil {
    241. slot.InsertBefore(node, pos)
    242. } else {
    243. // if slot is empty or @node_ptr is the maximum node
    244. // in slot, insert it at the last of slot
    245. slot.PushBack(node)
    246. }
    247. }
    248. func (w *TimerWheel) deleteTimerNode(node timerNode) {
    249. var (
    250. level int
    251. )
    252. LOOP:
    253. for level, _ = range w.slot[:] {
    254. for e, p := w.slot[level].Front(); e != nil; p, e = e, e.Next(p) {
    255. if e.Value.(timerNode).ID == node.ID {
    256. w.slot[level].Remove(e)
    257. // atomic.AddInt64(&w.number, -1)
    258. break LOOP
    259. }
    260. }
    261. }
    262. }
    263. func (w *TimerWheel) resetTimerNode(node timerNode) {
    264. var (
    265. level int
    266. )
    267. LOOP:
    268. for level, _ = range w.slot[:] {
    269. for e, p := w.slot[level].Front(); e != nil; p, e = e, e.Next(p) {
    270. if e.Value.(timerNode).ID == node.ID {
    271. n := e.Value.(timerNode)
    272. n.trig -= n.period
    273. n.period = node.period
    274. n.trig += n.period
    275. w.slot[level].Remove(e)
    276. w.insertTimerNode(n)
    277. break LOOP
    278. }
    279. }
    280. }
    281. }
    282. func (w *TimerWheel) deltaDiff(clock int64) int64 {
    283. var (
    284. handTime int64
    285. )
    286. for idx, hand := range w.hand[:] {
    287. handTime += hand * msLimit[idx]
    288. }
    289. return clock - w.start - handTime
    290. }
    291. func (w *TimerWheel) insertTimerNode(node timerNode) {
    292. var (
    293. idx int
    294. diff int64
    295. )
    296. diff = node.trig - atomic.LoadInt64(&w.clock)
    297. switch {
    298. case diff <= 0:
    299. idx = 0
    300. case dayNum(diff) != 0:
    301. idx = 4
    302. case hourNum(diff) != 0:
    303. idx = 3
    304. case minuteNum(diff) != 0:
    305. idx = 2
    306. case secondNum(diff) != 0:
    307. idx = 1
    308. default:
    309. idx = 0
    310. }
    311. w.insertSlot(idx, node)
    312. }
    313. func (w *TimerWheel) timerCascade(level int) {
    314. var (
    315. guard bool
    316. clock int64
    317. diff int64
    318. cur timerNode
    319. )
    320. clock = atomic.LoadInt64(&w.clock)
    321. for e, p := w.slot[level].Front(); e != nil; p, e = e, e.Next(p) {
    322. cur = e.Value.(timerNode)
    323. diff = cur.trig - clock
    324. switch {
    325. case cur.trig <= clock:
    326. guard = false
    327. case level == 1:
    328. guard = secondNum(diff) > 0
    329. case level == 2:
    330. guard = minuteNum(diff) > 0
    331. case level == 3:
    332. guard = hourNum(diff) > 0
    333. case level == 4:
    334. guard = dayNum(diff) > 0
    335. }
    336. if guard {
    337. break
    338. }
    339. temp := e
    340. e, p = p, p.Prev(e)
    341. w.slot[level].Remove(temp)
    342. w.insertTimerNode(cur)
    343. }
    344. }
    345. func (w *TimerWheel) timerUpdate(curTime time.Time) int {
    346. var (
    347. clock int64
    348. now int64
    349. idx int32
    350. diff int64
    351. maxIdx int32
    352. inc [maxTimerLevel + 1]int64
    353. )
    354. now = curTime.UnixNano()
    355. clock = atomic.LoadInt64(&w.clock)
    356. diff = now - clock
    357. diff += w.deltaDiff(clock)
    358. if diff < minDiff*0.7 {
    359. return -1
    360. }
    361. atomic.StoreInt64(&w.clock, now)
    362. for idx = maxTimerLevel - 1; 0 <= idx; idx-- {
    363. inc[idx] = diff / msLimit[idx]
    364. diff %= msLimit[idx]
    365. }
    366. maxIdx = 0
    367. for idx = 0; idx < maxTimerLevel; idx++ {
    368. if 0 != inc[idx] {
    369. w.hand[idx] += inc[idx]
    370. inc[idx+1] += w.hand[idx] / limit[idx]
    371. w.hand[idx] %= limit[idx]
    372. maxIdx = idx + 1
    373. }
    374. }
    375. for idx = 1; idx < maxIdx; idx++ {
    376. w.timerCascade(int(idx))
    377. }
    378. return 0
    379. }
    380. func (w *TimerWheel) Stop() {
    381. w.once.Do(func() {
    382. close(w.timerQ)
    383. w.ticker.Stop()
    384. w.timerQ = nil
    385. })
    386. }
    387. func (w *TimerWheel) Close() {
    388. w.Stop()
    389. w.wg.Wait()
    390. }
    391. ////////////////////////////////////////////////
    392. // timer
    393. ////////////////////////////////////////////////
    394. type TimerType int32
    395. const (
    396. eTimerOnce TimerType = 0X1 << 0
    397. eTimerLoop TimerType = 0X1 << 1
    398. )
    399. // 异步通知timerWheel添加一个timer,有可能失败
    400. func (w *TimerWheel) AddTimer(f TimerFunc, typ TimerType, period int64, arg interface{}) (*Timer, error) {
    401. if w.timerQ == nil {
    402. return nil, ErrTimeChannelClosed
    403. }
    404. t := &Timer{w: w}
    405. node := newTimerNode(f, typ, period, arg)
    406. select {
    407. case w.timerQ <- timerNodeAction{node: node, action: ADD_TIMER}:
    408. t.ID = node.ID
    409. return t, nil
    410. default:
    411. }
    412. return nil, ErrTimeChannelFull
    413. }
    414. func (w *TimerWheel) deleteTimer(t *Timer) error {
    415. if w.timerQ == nil {
    416. return ErrTimeChannelClosed
    417. }
    418. select {
    419. case w.timerQ <- timerNodeAction{action: DEL_TIMER, node: timerNode{ID: t.ID}}:
    420. return nil
    421. default:
    422. }
    423. return ErrTimeChannelFull
    424. }
    425. func (w *TimerWheel) resetTimer(t *Timer, d time.Duration) error {
    426. if w.timerQ == nil {
    427. return ErrTimeChannelClosed
    428. }
    429. select {
    430. case w.timerQ <- timerNodeAction{action: RESET_TIMER, node: timerNode{ID: t.ID, period: int64(d)}}:
    431. return nil
    432. default:
    433. }
    434. return ErrTimeChannelFull
    435. }
    436. func sendTime(_ TimerID, t time.Time, arg interface{}) error {
    437. select {
    438. case arg.(chan time.Time) <- t:
    439. default:
    440. // gxlog.CInfo("sendTime default")
    441. }
    442. return nil
    443. }
    444. func (w *TimerWheel) NewTimer(d time.Duration) *Timer {
    445. c := make(chan time.Time, 1)
    446. t := &Timer{
    447. C: c,
    448. }
    449. timer, err := w.AddTimer(sendTime, eTimerOnce, int64(d), c)
    450. if err == nil {
    451. t.ID = timer.ID
    452. t.w = timer.w
    453. return t
    454. }
    455. close(c)
    456. return nil
    457. }
    458. func (w *TimerWheel) After(d time.Duration) <-chan time.Time {
    459. //timer := defaultTimer.NewTimer(d)
    460. //if timer == nil {
    461. // return nil
    462. //}
    463. //
    464. //return timer.C
    465. return w.NewTimer(d).C
    466. }
    467. func goFunc(_ TimerID, _ time.Time, arg interface{}) error {
    468. go arg.(func())()
    469. return nil
    470. }
    471. func (w *TimerWheel) AfterFunc(d time.Duration, f func()) *Timer {
    472. t, _ := w.AddTimer(goFunc, eTimerOnce, int64(d), f)
    473. return t
    474. }
    475. func (w *TimerWheel) Sleep(d time.Duration) {
    476. <-w.NewTimer(d).C
    477. }
    478. ////////////////////////////////////////////////
    479. // ticker
    480. ////////////////////////////////////////////////
    481. func (w *TimerWheel) NewTicker(d time.Duration) *Ticker {
    482. c := make(chan time.Time, 1)
    483. timer, err := w.AddTimer(sendTime, eTimerLoop, int64(d), c)
    484. if err == nil {
    485. timer.C = c
    486. return (*Ticker)(timer)
    487. }
    488. close(c)
    489. return nil
    490. }
    491. func (w *TimerWheel) TickFunc(d time.Duration, f func()) *Ticker {
    492. t, err := w.AddTimer(goFunc, eTimerLoop, int64(d), f)
    493. if err == nil {
    494. return (*Ticker)(t)
    495. }
    496. return nil
    497. }
    498. func (w *TimerWheel) Tick(d time.Duration) <-chan time.Time {
    499. return w.NewTicker(d).C
    500. }
    1. // Copyright 2016 ~ 2018 AlexStocks(https://github.com/AlexStocks).
    2. // All rights reserved. Use of this source code is
    3. // governed by Apache License 2.0.
    4. // Package gxtime encapsulates some golang.time functions
    5. package gxtime
    6. import (
    7. "fmt"
    8. "sync"
    9. "sync/atomic"
    10. "time"
    11. )
    12. import (
    13. "github.com/AlexStocks/goext/container/xorlist"
    14. "github.com/AlexStocks/goext/log"
    15. )
    16. var (
    17. ErrTimeChannelFull = fmt.Errorf("timer channel full")
    18. ErrTimeChannelClosed = fmt.Errorf("timer channel closed")
    19. )
    20. // to init a default timer wheel
    21. func Init() {
    22. defaultTimerWheelOnce.Do(func() {
    23. defaultTimerWheel = NewTimerWheel()
    24. })
    25. }
    26. func Now() time.Time {
    27. return defaultTimerWheel.Now()
    28. }
    29. ////////////////////////////////////////////////
    30. // timer node
    31. ////////////////////////////////////////////////
    32. var (
    33. defaultTimerWheelOnce sync.Once
    34. defaultTimerWheel *TimerWheel
    35. nextID TimerID
    36. curGxTime = time.Now().UnixNano() // current goext time in nanoseconds
    37. )
    38. const (
    39. maxMS = 1000
    40. maxSecond = 60
    41. maxMinute = 60
    42. maxHour = 24
    43. maxDay = 31
    44. // ticker interval不能设置到这种精度,
    45. // 实际运行时ticker的时间间隔会在1.001ms上下浮动,
    46. // 当ticker interval小于1ms的时候,会导致TimerWheel.hand
    47. // 和timeWheel.inc不增长,造成时间错乱:例如本来
    48. // 1.5s运行的函数在持续2.1s之后才被执行
    49. // minDiff = 1.001 * MS
    50. minDiff = 10e6
    51. maxTimerLevel = 5
    52. )
    53. func msNum(expire int64) int64 { return expire / int64(time.Millisecond) }
    54. func secondNum(expire int64) int64 { return expire / int64(time.Minute) }
    55. func minuteNum(expire int64) int64 { return expire / int64(time.Minute) }
    56. func hourNum(expire int64) int64 { return expire / int64(time.Hour) }
    57. func dayNum(expire int64) int64 { return expire / (maxHour * int64(time.Hour)) }
    58. // if the return error is not nil, the related timer will be closed.
    59. type TimerFunc func(ID TimerID, expire time.Time, arg interface{}) error
    60. type TimerID = uint64
    61. type timerNode struct {
    62. ID TimerID
    63. trig int64
    64. typ TimerType
    65. period int64
    66. timerRun TimerFunc
    67. arg interface{}
    68. }
    69. func newTimerNode(f TimerFunc, typ TimerType, period int64, arg interface{}) timerNode {
    70. return timerNode{
    71. ID: atomic.AddUint64(&nextID, 1),
    72. trig: atomic.LoadInt64(&curGxTime) + period,
    73. typ: typ,
    74. period: period,
    75. timerRun: f,
    76. arg: arg,
    77. }
    78. }
    79. func compareTimerNode(first, second timerNode) int {
    80. var ret int
    81. if first.trig < second.trig {
    82. ret = -1
    83. } else if first.trig > second.trig {
    84. ret = 1
    85. } else {
    86. ret = 0
    87. }
    88. return ret
    89. }
    90. type timerAction = int64
    91. const (
    92. ADD_TIMER timerAction = 1
    93. DEL_TIMER timerAction = 2
    94. RESET_TIMER timerAction = 3
    95. )
    96. type timerNodeAction struct {
    97. node timerNode
    98. action timerAction
    99. }
    100. ////////////////////////////////////////////////
    101. // timer wheel
    102. ////////////////////////////////////////////////
    103. const (
    104. timerNodeQueueSize = 128
    105. )
    106. var (
    107. limit = [maxTimerLevel + 1]int64{maxMS, maxSecond, maxMinute, maxHour, maxDay}
    108. msLimit = [maxTimerLevel + 1]int64{
    109. int64(time.Millisecond),
    110. int64(time.Second),
    111. int64(time.Minute),
    112. int64(time.Hour),
    113. int64(maxHour * time.Hour),
    114. }
    115. )
    116. // timer based on multiple wheels
    117. type TimerWheel struct {
    118. start int64 // start clock
    119. clock int64 // current time in nanosecond
    120. number int64 // timer node number
    121. hand [maxTimerLevel]int64 // clock
    122. slot [maxTimerLevel]*gxxorlist.XorList // timer list
    123. timerQ chan timerNodeAction
    124. once sync.Once // for close ticker
    125. ticker *time.Ticker
    126. wg sync.WaitGroup
    127. }
    128. func NewTimerWheel() *TimerWheel {
    129. w := &TimerWheel{
    130. clock: atomic.LoadInt64(&curGxTime),
    131. ticker: time.NewTicker(time.Duration(minDiff)), // 这个精度如果太低,会影响curGxTime,进而影响timerNode的trig的值
    132. timerQ: make(chan timerNodeAction, timerNodeQueueSize),
    133. }
    134. w.start = w.clock
    135. for i := 0; i < maxTimerLevel; i++ {
    136. w.slot[i] = gxxorlist.New()
    137. }
    138. w.wg.Add(1)
    139. go func() {
    140. defer w.wg.Done()
    141. var (
    142. t time.Time
    143. cFlag bool
    144. nodeAction timerNodeAction
    145. qFlag bool
    146. )
    147. LOOP:
    148. for {
    149. select {
    150. case t, cFlag = <-w.ticker.C:
    151. atomic.StoreInt64(&curGxTime, t.UnixNano())
    152. if cFlag && 0 != atomic.LoadInt64(&w.number) {
    153. ret := w.timerUpdate(t)
    154. if ret == 0 {
    155. w.run()
    156. }
    157. continue
    158. }
    159. break LOOP
    160. case nodeAction, qFlag = <-w.timerQ:
    161. // 此处只用一个channel,保证对同一个timer操作的顺序性
    162. if qFlag {
    163. switch {
    164. case nodeAction.action == ADD_TIMER:
    165. atomic.AddInt64(&w.number, 1)
    166. w.insertTimerNode(nodeAction.node)
    167. case nodeAction.action == DEL_TIMER:
    168. atomic.AddInt64(&w.number, -1)
    169. w.deleteTimerNode(nodeAction.node)
    170. case nodeAction.action == RESET_TIMER:
    171. // gxlog.CInfo("node action:%#v", nodeAction)
    172. w.resetTimerNode(nodeAction.node)
    173. default:
    174. atomic.AddInt64(&w.number, 1)
    175. w.insertTimerNode(nodeAction.node)
    176. }
    177. continue
    178. }
    179. break LOOP
    180. }
    181. }
    182. }()
    183. return w
    184. }
    185. func (w *TimerWheel) output() {
    186. for idx := range w.slot {
    187. gxlog.CDebug("print slot %d\n", idx)
    188. w.slot[idx].Output()
    189. }
    190. }
    191. func (w *TimerWheel) TimerNumber() int {
    192. return int(atomic.LoadInt64(&w.number))
    193. }
    194. func (w *TimerWheel) Now() time.Time {
    195. return UnixNano2Time(atomic.LoadInt64(&curGxTime))
    196. }
    197. func (w *TimerWheel) run() {
    198. var (
    199. clock int64
    200. err error
    201. node timerNode
    202. slot *gxxorlist.XorList
    203. array []timerNode
    204. )
    205. slot = w.slot[0]
    206. clock = atomic.LoadInt64(&w.clock)
    207. for e, p := slot.Front(); e != nil; p, e = e, e.Next(p) {
    208. node = e.Value.(timerNode)
    209. if clock < node.trig {
    210. break
    211. }
    212. err = node.timerRun(node.ID, UnixNano2Time(clock), node.arg)
    213. if err == nil && node.typ == eTimerLoop {
    214. array = append(array, node)
    215. // w.insertTimerNode(node)
    216. } else {
    217. atomic.AddInt64(&w.number, -1)
    218. }
    219. temp := e
    220. e, p = p, p.Prev(e)
    221. slot.Remove(temp)
    222. }
    223. for idx := range array[:] {
    224. array[idx].trig += array[idx].period
    225. w.insertTimerNode(array[idx])
    226. }
    227. }
    228. func (w *TimerWheel) insertSlot(idx int, node timerNode) {
    229. var (
    230. pos *gxxorlist.XorElement
    231. slot *gxxorlist.XorList
    232. )
    233. slot = w.slot[idx]
    234. for e, p := slot.Front(); e != nil; p, e = e, e.Next(p) {
    235. if compareTimerNode(node, e.Value.(timerNode)) < 0 {
    236. pos = e
    237. break
    238. }
    239. }
    240. if pos != nil {
    241. slot.InsertBefore(node, pos)
    242. } else {
    243. // if slot is empty or @node_ptr is the maximum node
    244. // in slot, insert it at the last of slot
    245. slot.PushBack(node)
    246. }
    247. }
    248. func (w *TimerWheel) deleteTimerNode(node timerNode) {
    249. var (
    250. level int
    251. )
    252. LOOP:
    253. for level, _ = range w.slot[:] {
    254. for e, p := w.slot[level].Front(); e != nil; p, e = e, e.Next(p) {
    255. if e.Value.(timerNode).ID == node.ID {
    256. w.slot[level].Remove(e)
    257. // atomic.AddInt64(&w.number, -1)
    258. break LOOP
    259. }
    260. }
    261. }
    262. }
    263. func (w *TimerWheel) resetTimerNode(node timerNode) {
    264. var (
    265. level int
    266. )
    267. LOOP:
    268. for level, _ = range w.slot[:] {
    269. for e, p := w.slot[level].Front(); e != nil; p, e = e, e.Next(p) {
    270. if e.Value.(timerNode).ID == node.ID {
    271. n := e.Value.(timerNode)
    272. n.trig -= n.period
    273. n.period = node.period
    274. n.trig += n.period
    275. w.slot[level].Remove(e)
    276. w.insertTimerNode(n)
    277. break LOOP
    278. }
    279. }
    280. }
    281. }
    282. func (w *TimerWheel) deltaDiff(clock int64) int64 {
    283. var (
    284. handTime int64
    285. )
    286. for idx, hand := range w.hand[:] {
    287. handTime += hand * msLimit[idx]
    288. }
    289. return clock - w.start - handTime
    290. }
    291. func (w *TimerWheel) insertTimerNode(node timerNode) {
    292. var (
    293. idx int
    294. diff int64
    295. )
    296. diff = node.trig - atomic.LoadInt64(&w.clock)
    297. switch {
    298. case diff <= 0:
    299. idx = 0
    300. case dayNum(diff) != 0:
    301. idx = 4
    302. case hourNum(diff) != 0:
    303. idx = 3
    304. case minuteNum(diff) != 0:
    305. idx = 2
    306. case secondNum(diff) != 0:
    307. idx = 1
    308. default:
    309. idx = 0
    310. }
    311. w.insertSlot(idx, node)
    312. }
    313. func (w *TimerWheel) timerCascade(level int) {
    314. var (
    315. guard bool
    316. clock int64
    317. diff int64
    318. cur timerNode
    319. )
    320. clock = atomic.LoadInt64(&w.clock)
    321. for e, p := w.slot[level].Front(); e != nil; p, e = e, e.Next(p) {
    322. cur = e.Value.(timerNode)
    323. diff = cur.trig - clock
    324. switch {
    325. case cur.trig <= clock:
    326. guard = false
    327. case level == 1:
    328. guard = secondNum(diff) > 0
    329. case level == 2:
    330. guard = minuteNum(diff) > 0
    331. case level == 3:
    332. guard = hourNum(diff) > 0
    333. case level == 4:
    334. guard = dayNum(diff) > 0
    335. }
    336. if guard {
    337. break
    338. }
    339. temp := e
    340. e, p = p, p.Prev(e)
    341. w.slot[level].Remove(temp)
    342. w.insertTimerNode(cur)
    343. }
    344. }
    345. func (w *TimerWheel) timerUpdate(curTime time.Time) int {
    346. var (
    347. clock int64
    348. now int64
    349. idx int32
    350. diff int64
    351. maxIdx int32
    352. inc [maxTimerLevel + 1]int64
    353. )
    354. now = curTime.UnixNano()
    355. clock = atomic.LoadInt64(&w.clock)
    356. diff = now - clock
    357. diff += w.deltaDiff(clock)
    358. if diff < minDiff*0.7 {
    359. return -1
    360. }
    361. atomic.StoreInt64(&w.clock, now)
    362. for idx = maxTimerLevel - 1; 0 <= idx; idx-- {
    363. inc[idx] = diff / msLimit[idx]
    364. diff %= msLimit[idx]
    365. }
    366. maxIdx = 0
    367. for idx = 0; idx < maxTimerLevel; idx++ {
    368. if 0 != inc[idx] {
    369. w.hand[idx] += inc[idx]
    370. inc[idx+1] += w.hand[idx] / limit[idx]
    371. w.hand[idx] %= limit[idx]
    372. maxIdx = idx + 1
    373. }
    374. }
    375. for idx = 1; idx < maxIdx; idx++ {
    376. w.timerCascade(int(idx))
    377. }
    378. return 0
    379. }
    380. func (w *TimerWheel) Stop() {
    381. w.once.Do(func() {
    382. close(w.timerQ)
    383. w.ticker.Stop()
    384. w.timerQ = nil
    385. })
    386. }
    387. func (w *TimerWheel) Close() {
    388. w.Stop()
    389. w.wg.Wait()
    390. }
    391. ////////////////////////////////////////////////
    392. // timer
    393. ////////////////////////////////////////////////
    394. type TimerType int32
    395. const (
    396. eTimerOnce TimerType = 0X1 << 0
    397. eTimerLoop TimerType = 0X1 << 1
    398. )
    399. // 异步通知timerWheel添加一个timer,有可能失败
    400. func (w *TimerWheel) AddTimer(f TimerFunc, typ TimerType, period int64, arg interface{}) (*Timer, error) {
    401. if w.timerQ == nil {
    402. return nil, ErrTimeChannelClosed
    403. }
    404. t := &Timer{w: w}
    405. node := newTimerNode(f, typ, period, arg)
    406. select {
    407. case w.timerQ <- timerNodeAction{node: node, action: ADD_TIMER}:
    408. t.ID = node.ID
    409. return t, nil
    410. default:
    411. }
    412. return nil, ErrTimeChannelFull
    413. }
    414. func (w *TimerWheel) deleteTimer(t *Timer) error {
    415. if w.timerQ == nil {
    416. return ErrTimeChannelClosed
    417. }
    418. select {
    419. case w.timerQ <- timerNodeAction{action: DEL_TIMER, node: timerNode{ID: t.ID}}:
    420. return nil
    421. default:
    422. }
    423. return ErrTimeChannelFull
    424. }
    425. func (w *TimerWheel) resetTimer(t *Timer, d time.Duration) error {
    426. if w.timerQ == nil {
    427. return ErrTimeChannelClosed
    428. }
    429. select {
    430. case w.timerQ <- timerNodeAction{action: RESET_TIMER, node: timerNode{ID: t.ID, period: int64(d)}}:
    431. return nil
    432. default:
    433. }
    434. return ErrTimeChannelFull
    435. }
    436. func sendTime(_ TimerID, t time.Time, arg interface{}) error {
    437. select {
    438. case arg.(chan time.Time) <- t:
    439. default:
    440. // gxlog.CInfo("sendTime default")
    441. }
    442. return nil
    443. }
    444. func (w *TimerWheel) NewTimer(d time.Duration) *Timer {
    445. c := make(chan time.Time, 1)
    446. t := &Timer{
    447. C: c,
    448. }
    449. timer, err := w.AddTimer(sendTime, eTimerOnce, int64(d), c)
    450. if err == nil {
    451. t.ID = timer.ID
    452. t.w = timer.w
    453. return t
    454. }
    455. close(c)
    456. return nil
    457. }
    458. func (w *TimerWheel) After(d time.Duration) <-chan time.Time {
    459. //timer := defaultTimer.NewTimer(d)
    460. //if timer == nil {
    461. // return nil
    462. //}
    463. //
    464. //return timer.C
    465. return w.NewTimer(d).C
    466. }
    467. func goFunc(_ TimerID, _ time.Time, arg interface{}) error {
    468. go arg.(func())()
    469. return nil
    470. }
    471. func (w *TimerWheel) AfterFunc(d time.Duration, f func()) *Timer {
    472. t, _ := w.AddTimer(goFunc, eTimerOnce, int64(d), f)
    473. return t
    474. }
    475. func (w *TimerWheel) Sleep(d time.Duration) {
    476. <-w.NewTimer(d).C
    477. }
    478. ////////////////////////////////////////////////
    479. // ticker
    480. ////////////////////////////////////////////////
    481. func (w *TimerWheel) NewTicker(d time.Duration) *Ticker {
    482. c := make(chan time.Time, 1)
    483. timer, err := w.AddTimer(sendTime, eTimerLoop, int64(d), c)
    484. if err == nil {
    485. timer.C = c
    486. return (*Ticker)(timer)
    487. }
    488. close(c)
    489. return nil
    490. }
    491. func (w *TimerWheel) TickFunc(d time.Duration, f func()) *Ticker {
    492. t, err := w.AddTimer(goFunc, eTimerLoop, int64(d), f)
    493. if err == nil {
    494. return (*Ticker)(t)
    495. }
    496. return nil
    497. }
    498. func (w *TimerWheel) Tick(d time.Duration) <-chan time.Time {
    499. return w.NewTicker(d).C
    500. }