概述

时间轮的设计思想来源于时钟。
时钟由秒针、分针和时针组成,分别对应第一层时间轮、第二层时间轮和第三层时间轮。当然,你也根据实际需要增加或减少时间轮的层数。针盘上面有刻度,秒针、分针对刻度理解可不一样,对于秒针而言,一个刻度表示 1S,而对于分针而言,一个刻度表示 1 分钟,而对于时针,一个刻度是 0.2 小时。
将其映射至多层时间轮,时间轮都拥有相同的刻度数量,称为 wheelSize(20),默认是 20 个刻度,每个刻度大小称为 tickMs,不同层的 tickMs 是不同的,比如第一层 tickMs = 20ms,第二层为 tickMs = 20*20 = 400ms。每一层也有一个 currentTime 表示当前层的 “此刻”,通过与 currentTime 比较判断是否需要执行任务或对任务进行降级操作。相关示意图如下图所示:
多层时间轮.png
第一层时间轮都会有时间范围:
Kafka 多层时间轮范围.png
每一个定时任务拥有两种时间,分别是延时时间(delayTime)过期时间(ExpiredTime)。比如我们向时间轮添加一个 3S 后执行的任务,当前时间为 2S,那么这个任务所对应的延迟时间和过期时间分别是 3S 和 5S。

添加定时任务

首先,每一个定时任务会有两个时间,一个是延迟时间(delayTime),另一个过期时间(ExpiredTime)。比如当前时间是 10s,我们添加一个延迟时间为 2s 的任务,那么这个任务的过期时间就是 10 + 2 = 12s。
此时,我们需要添加一个延迟时间为 350ms 的任务,当前时间为 0ms。那么它应该被添加到哪一层的哪一个 Bucket 呢? 我们可以通过以下计算步骤得到:

  1. 定位哪一层时间轮。第一层时间轮范围是 [0, 20ms),不适合,第二层时间轮时间范围是 [20, 400ms),包含 350ms 的延迟任务,所以选中第二层时间轮。
  2. 定位哪个 Bucket。通过以下公式计算可得存放的 ID 位置是第 17 个 bucket。
    1. virtualId = delayTime(350) / tickMs(20) = 17
    2. bucketId = virtualId % wheelSize(20) = 17

    任务的降级处理

    场景:在时刻 0 添加一个 450ms 的延迟任务 A,经过上面的计算会被添加到第三层的第 0 个 Bucket 中。随着时间流逝,已经过去 400ms。那么任务 A 只需等待 50ms 就可以被执行了。显然,50ms 的延迟任务此刻不应该存在第三层时间轮。因此对它进行降级操作。它会重新以 50ms 的延迟任务重新放回到时间轮,经过计算得到位置是第二层的第 0 个 Bucket。
    再经过 40ms,又会引起降级操作,它会重新以 10ms 的延迟任务重新放回到时间轮,经过计算得到位置是第一层的第 2 个 Bucket。
    此时的时刻为为 440ms,只需要再过 10ms 这个任务就可被执行。

    Kafka 实现的时间轮

    前面只是一个大致的讲解过程,并没有涉及到数据结构的实现。还有许多细节需要通过源码的方式来实现。

    单层时间轮的实现 TimingWheel

    单层时间轮是由 kafka.utils.timer.TimingWheel 类实现。先看看重要的入参:
参数名称 说明
tickMs 时钟刻度
wheelSize Bucket 数量
startMs 时间轮对象被创建的起始时间戳
taskCounter 任务读数器
queue: DelayQueue[TimerTaskList] 延迟任务队列,里面存放 Bucket

有 4 个非常重要的内部变量:

  1. @nonthreadsafe
  2. private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
  3. /**
  4. * 这一层时间轮总时长。
  5. * Kafka第一层时间轮:tickMs=1, wheelSize=20,interval=20*1=20ms
  6. * Kafka第二层时间轮:tickMs=20,wheelSize=20,interval=20*20=400ms
  7. * Kafka第三层时间轮:tickMs=400,wheelSize=20,interval=400*20=8000ms
  8. */
  9. private[this] val interval = tickMs * wheelSize
  10. /**
  11. * 当前层时间轮下的所有Bucket对象,即所有[[TimerTaskList]]对象
  12. */
  13. private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
  14. /**
  15. * 当前时间戳=当前时间的最大滴答时长的整数倍。
  16. * 比如tickMs=20ms,当前时间戳为167ms,则currentTime=160ms
  17. */
  18. private[this] var currentTime = startMs - (startMs % tickMs)
  19. /**
  20. * kafka按需创建上层时间轮。也就是说,如果超出当前层的时间轮的时间范围是不会创建上层时间轮的。
  21. * 如果当前层时间轮无法容纳给定延迟时间,就会创建第二层时间轮,如果第二层也满足不了,则创建第三层,
  22. * 以此类推,直到找到合适的时间轮层。由于每次步骤都是按倍数递增,所以能很快在有限个时间轮范围内找到合适的时间轮存放
  23. * 给定的延迟时间。
  24. */
  25. @volatile private[this] var overflowWheel: TimingWheel = null
  26. // ...
  27. }
参数 说明
interval 本层时间轮时间间隔,第一层时间轮20ms,第二层为 400ms,第三层为 8000ms。
当前层的 interval = 上一层 interval * 20 得到
buckets 一个长度为 20 的数组,存放 TimerTaskList 对象。它是一个双向链表结构,最原始的延迟任务通过对象 TimerTask 包装后添加到链表中,就算完成时间轮入队了
currentTime 当前时间戳,它需要规格化。即currentTime = 当前时间的最大滴答时长的整数倍。比如 tickMs = 20ms,当前时间戳为 167ms,则 currentTime = 167 - (167 % 20) = 160。这方便通过 & 操作符定位 bucket
overflowWheel Kafka 按需创建下一层(层数+1)时间轮。比如第一层时间轮无法满足,则创建第二层时间轮。每一层时间轮持有下一层时间轮的引用。这样,多层时间轮就可以进行联动了。

Bucket 是什么样子

前面也提到过,对象 TimeWheel 的 Bucket 是一个长度为 20 的数组,数组存放 TimerTaskList 类型的实例对象。那我们看看 TimerTaskList 是什么样的结构:

TimerTaskList

这个对象用来表示一个 Bucket,多个 TimerTaskList 构成的一个双向循环列表,就组成 “轮” 状结构了。

  1. /**
  2. * 理解为一个Bucket,每个Bucket实际上是一个双向循环链表
  3. * Bucket是可以重复使用的:通过重新设置[[TimerTaskList.expiration]]就可以完成
  4. *
  5. * @param taskCounter 每个Bucket所含有的定时任务总数量
  6. */
  7. @threadsafe
  8. private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
  9. /**
  10. * 构成双向循环链表
  11. */
  12. private[this] val root = new TimerTaskEntry(null, -1)
  13. root.next = root
  14. root.prev = root
  15. /**
  16. * Bucket的过期时间戳(绝对时间戳),Bucket对应表盘中的一格,它是一个范围值:[起始时间, 结束时间]
  17. * 时间间隔=结束时间-起始时间,同一层的Bucket的时间间隔是一样的。
  18. * 如何确定Bucket过期:当前时间越过了Bucket的过期时间,这个Bucket才算是过期的Bucket。
  19. * 过期时间就是指expiration字段
  20. */
  21. private[this] val expiration = new AtomicLong(-1L)
  22. // ...
  23. }
参数 说明
root 双向链表的头结构
expiration Bucket 的过期时间戳(绝对时间戳),Bucket 对应表盘中的一格,它也是一个范围值,比如第一层第 0 个 Bucket 的时间范围是:[0, 20ms)。如果确定 Bucket 过期呢 ? 当前时间超过了 Bucket 的过期时间。而过期时间就是指 expiration 字段。

内部还提供若干方法:

方法名 功能
setExpiration(Long) CAS 修改 expiration 字段值,存在并发
add(TimerTaskEntry) 向 Bucket 添加一个延迟任务
remove(TimerTaskEntry) 从链表中移除一延迟任务
flush(f: TimerTaskEntry=>Unit) 对已过期的 Bucket 上的所有任务执行函数 f。
这里表示时间到了,延迟任务可以被执行

TimerTaskList 是一个双向链表结构,一个 TimerTaskList 表示一个 Bucket,里面提供添加和删除 API,还提供一个重要的 flush(function) API 表示延迟任务的执行。

TimerTaskEntry

双向链表存储的对象是 TimerTaskEntry,这个对象主要的目的就是构造双向链表结构。

  1. /**
  2. * 构成Bucket的子元素:TimerTaskEntry之间构成双向链表
  3. *
  4. * @param timerTask 延时任务
  5. * @param expirationMs 过期时间
  6. */
  7. private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {
  8. // 绑定的Bucket链表实例
  9. @volatile var list: TimerTaskList = null
  10. // 前序实体指针
  11. var next: TimerTaskEntry = null
  12. // 后序实体指针
  13. var prev: TimerTaskEntry = null
  14. ...
  15. }

TimerTask

TimerTask 才是真正的延迟任务,但它是一个标识接口(trait),里面定义了几个通用的方法和变量:

变量 描述
delayMs 任务延迟时间,通常是指 request.time.ms。可理解为相对时间
TimerTaskEntry 每一个 TimerTask 会被添加到 Bucket,
因此,它需要绑定一个双向链表适配类,即 TimerTaskEntry,只有这个类才有双向链表的结构

相关 API :

方法名称 说明
cancel 取消定时任务,处理方式是将对应的 TimerTaskEntry 置为 null
setTimerTaskEntry 关联 TimerTaskEntry

延迟操作的实现类一般继承 DelayedOperation,但这篇文章不会关注 DelayedOperation 的实现及接口定义,我们可以简单理解:只要实现 TimerTask 接口,就可以被认为是延迟任务,那么任务就可以被添加到时间轮中延迟执行。

如何推进时间轮

那么,Kafka 是如何让时间轮上的指针转动起来的呢 ? 答案在 kafka.utils.timer.Timer 文件中。

Timer 接口

Timer 被 trait 标记,属于接口,相关 API 定义如下:

接口名称 说明
add(TimerTask) 将给定的延迟任务添加到时间轮中,当延迟时间结束后执行任务
advanceClock(Long timeoutMs) 推进时间轮前进。timeoutMs 表示推进到哪个时间点。
size 获取待执行任务数量
shutdown 关闭时间轮

Timer 实现类

SystemTimerTimer 接口唯一的实现类,首先看看它的入参:

入参 说明
executorName 执行器名称
tickMs 时钟刻度,也就是前进一格所花费的时间。默认值:1ms
wheelSize 单层时间轮 Bucket 数量。默认值:20。因此,单层时间轮所拥有的时间范围 = tickMs * wheelSize
startMs 时间轮启动时间

入参核心参数就是 tickMswheelSize,默认第一层时间轮 tickMs = 1, wheelSize = 20,因此,可以存放延迟时间为 [0, 20ms) 的延迟任务。
下面来说说 SystemTimer 的变量:

变量名 说明
taskExecutor 用来执行任务的线程。线程数量默认为 1。
delayQueue[TimerTaskList] 一个无界的 BlockingQueue,其中的对象只能在到期时才能从队列中取走。即按 deadLine 由小到大进行排序。这个队列存放有任务的 Bucket,当 Bucket 超时后,就可以从队列中取出并执行任务。那为什么不直接使用 BlockingQueue 呢 ?
taskCounter 存放在此时间轮的任务数量
timingWheel 创建时间轮对象,将 delayQueue 传时间轮对象
readWriteLock 读写锁
readLock 读锁
writeLock 写锁

我们需要重点关注 TimingWheelDelayQueue 是如何配置使用的。
SystemTimer 也有重要的 API:

函数名称 说明
add(TimerTask) 将延迟任务添加到时间轮中。这里是获取读锁,其原因是在没有线程持有写锁的情况下,多个线程能够同时向时间轮添加定时任务,因为 TimerTaskList 是线程安全的类。
如果添加失败,则直接提交给 taskExecutor 执行。因为有可能已过期
advanceClock(Long timeoutMs) 向前推进时间轮到 timeoutMs。这个 API 需要好好理解,
下面会有详细解释

接下来,我们重点看看 advanceClock(Long) 源码:

  1. // kafka.utils.timer.SystemTimer#advanceClock
  2. /**
  3. * 推进时间向前进,如果没有,阻塞timeoutMs,默认值是200ms。
  4. */
  5. def advanceClock(timeoutMs: Long): Boolean = {
  6. // #1 从java的延迟队列中获取下一个已过期的Bucket对象
  7. var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  8. if (bucket != null) {
  9. // #2 存在已过期的Bucket,则获取写锁,准备修改时间轮
  10. writeLock.lock()
  11. try {
  12. // 尝试从延迟队列中获取已过期的Bucket对象
  13. while (bucket != null) {
  14. // #3 推动时间轮向前滚动到Bucket的过期时间点
  15. timingWheel.advanceClock(bucket.getExpiration)
  16. // #4 可能有两种情况:
  17. // ① 已到了截止时间,因此执行任务。
  18. // ① 还未到执行时间,重新添加回时间轮中
  19. bucket.flush(addTimerTaskEntry)
  20. // #5 立即读取下一个已过期的Bucket
  21. bucket = delayQueue.poll()
  22. }
  23. } finally {
  24. // 释放写锁
  25. writeLock.unlock()
  26. }
  27. true
  28. } else {
  29. false
  30. }
  31. }

这一步做了什么操作呢 ? 我举下面的例子:
设:时间单位 u=1,每层 bucket 数量 n = 3,起始时间戳 time = c,总共有 3 层时间轮,具体分布如下:

  1. level buckets
  2. 1 [c,c] [c+1,c+1] [c+2,c+2]
  3. 2 [c,c+2] [c+3,c+5] [c+6,c+8]
  4. 3 [c,c+8] [c+9,c+17] [c+18,c+26]

定义 bucket 过期:如果过期时间=bucket 的起始时间,即范围左值,就表示该 Bucket 过期了。
因此,当时间 time=c+1,buckets [c,c], [c,c+2] and [c,c+8] 这三个 bucket 过期了。 因此 level1 的时针(具体是指 currentTime)移动到 c+1,并且创建时间范围为 [c+3,c+3] 的 Bucket 实例。 level2 和 level3 的还是处于 c,因为走过的时间 1 不足以驱动它们前进一格(level2 需要 3,level3 需要 9)。 因此在 level2 和 level3 不会创建新的 Bucket 对象。


注意:level2的bucket[c,c+2]不会存储任何超时任务,因为这个Bucket和level1整个层重复了。
同样,level3的[c,c+8]也不会存放任何超时任务。

这有一点浪费,但能简化实现。

1 [c+1,c+1] [c+2,c+2] [c+3,c+3]
2 [c,c+2] [c+3,c+5] [c+6,c+8]
3 [c,c+8] [c+9,c+17] [c+18,c+26]

在time=c+2(时间又tick了一次),[c+1,c+1] 过期,level1的currentTime更新为c+2,
[c+4,c+4] 被创建:
1 [c+2,c+2] [c+3,c+3] [c+4,c+4]
2 [c,c+2] [c+3,c+5] [c+6,c+8]
3 [c,c+8] [c+9,c+17] [c+18,c+18]

在time=c+3,[c+2,c+2]过期,
level2的currentTime更新为c+3,并且level1创建[c+5,c+5],level2创建[c+9,c+11],
level3还是没有变化:
1 [c+3,c+3] [c+4,c+4] [c+5,c+5]
2 [c+3,c+5] [c+6,c+8] [c+9,c+11]
3 [c,c+8] [c+9,c+17] [c+8,c+11]

当超时任务在超时之前完成时,分层时间轮特别有效率。
即便在所有任务都超时的情况下,分层时间轮依然具有时间优势:插入和删除的时间复杂度分别是O(m)和O(1)。
而基于优先队列则时间复杂度为O(logN)。

  1. /**
  2. * 向时间轮添加一个延迟任务
  3. * @param timerTaskEntry
  4. * @return
  5. */
  6. def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  7. // #1 获取延迟任务的过期时间
  8. val expiration = timerTaskEntry.expirationMs
  9. // #2 判断任务是否已被取消
  10. if (timerTaskEntry.cancelled) {
  11. // Cancelled
  12. false
  13. } else if (expiration < currentTime + tickMs) {
  14. // #3 任务已经过期,不允许添加(连第一个Bucket都无法容纳,更别说后面的Bucket了)
  15. false
  16. } else if (expiration < currentTime + interval) {
  17. // #4 任务的超时时间在「currentTime, currentTime+interval)范围内
  18. // 说明可以在本层时间轮中找到对应的Bucket
  19. // #4-1 确定Bucket
  20. val virtualId = expiration / tickMs
  21. val bucket = buckets((virtualId % wheelSize.toLong).toInt)
  22. // #4-2 添加延时任务
  23. bucket.add(timerTaskEntry)
  24. // #4-3 设置Bucket的过期时间。
  25. // 通过返回值来确认该Bucket是否是已过期的Bucket。
  26. // 如果设置成功,则返回true,说明这个Bucket已经是过期的,需要重新添加到Bucket
  27. // 如果设置失败,则返回false,说明这个Bucket已经存在队列中,不需要重复放入队列
  28. if (bucket.setExpiration(virtualId * tickMs)) {
  29. // 将Bucket入队
  30. queue.offer(bucket)
  31. }
  32. true
  33. } else {
  34. // 如果过期时间expiration超过本层时间轮,那么递归创建上层时间轮,
  35. // 直到确认合适的时间轮为止
  36. if (overflowWheel == null) addOverflowWheel()
  37. overflowWheel.add(timerTaskEntry)
  38. }
  39. }
  1. /**
  2. * 驱动当前层时间轮向前进
  3. *
  4. * @param timeMs 将本层时间轮向前推进到timeMs,
  5. * 这个值必须超过当前层时间轮的Bucket的范围值,否则本次推进操作将变得毫无意义
  6. */
  7. def advanceClock(timeMs: Long): Unit = {
  8. // #1 推进操作需要timeMs超过Bucket的范围值
  9. if (timeMs >= currentTime + tickMs) {
  10. // 更新 currentTime
  11. currentTime = timeMs - (timeMs % tickMs)
  12. // #2 如果存在上层时间轮,递归推进所有上层的时间轮
  13. // 牵一发而动全轮
  14. if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
  15. }

Timer 接口

  1. trait Timer {
  2. /**
  3. * 将给定的延迟任务插入到时间轮中,等待后续延迟执行
  4. *
  5. * @param timerTask 待添加的延迟任务
  6. */
  7. def add(timerTask: TimerTask): Unit
  8. /**
  9. * 向前推进时间前进,执行任何已过期的延迟任务。
  10. *
  11. * @param timeoutMs 推进到哪个时间点
  12. * @return 如果有延迟任务被执行,返回true,否则返回false
  13. */
  14. def advanceClock(timeoutMs: Long): Boolean
  15. /**
  16. * 获取待执行的延迟任务数量
  17. *
  18. * @return the number of tasks
  19. */
  20. def size: Int
  21. /**
  22. * 关闭定时器
  23. */
  24. def shutdown(): Unit
  25. }

实现类 SystemTimer

  1. /**
  2. * [[Timer]]接口的实现类,这是一个定时器类,底层基于分层时间轮实现延迟任务的处理。
  3. * [[SystemTimer]]也是 [[kafka.server.DelayedFuturePurgatory]]的基础组件,
  4. *
  5. * @param executorName 执行器名称
  6. * @param tickMs 构建时间轮的tickMs时间
  7. * @param wheelSize 时间轮大小
  8. * @param startMs 定时器启动时间
  9. */
  10. @threadsafe
  11. class SystemTimer(executorName: String,
  12. tickMs: Long = 1,
  13. wheelSize: Int = 20,
  14. startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer {
  15. /**
  16. * Timer执行器,单独的一个线程用于执行延迟任务
  17. */
  18. private[this] val taskExecutor = Executors.newFixedThreadPool(1,
  19. (runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable))
  20. /**
  21. * Java提供的延迟队列,Bucket会放到这个延迟队列中,
  22. * 通过这个队列获得已过期的Bucket对象
  23. */
  24. private[this] val delayQueue = new DelayQueue[TimerTaskList]()
  25. private[this] val taskCounter = new AtomicInteger(0)
  26. /**
  27. * 创建分层时间轮对象
  28. */
  29. private[this] val timingWheel = new TimingWheel(
  30. tickMs = tickMs,
  31. wheelSize = wheelSize,
  32. startMs = startMs,
  33. taskCounter = taskCounter,
  34. delayQueue
  35. )
  36. // ...
  37. }
  1. /**
  2. * 添加延迟任务
  3. * @param timerTask 待添加的延迟任务
  4. */
  5. def add(timerTask: TimerTask): Unit = {
  6. // 获取读锁。在没有线程持有写锁的情况下,多个线程能够同时向时间轮添加定时任务。
  7. // 这里因为TimerTaskList是线程安全的类,所以才可以这么做
  8. readLock.lock()
  9. try {
  10. // 延迟时间是相对时间戳、过期时间是绝对时间戳
  11. addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
  12. } finally {
  13. readLock.unlock()
  14. }
  15. }
  16. private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
  17. // #1 将延迟任务插入时间轮
  18. if (!timingWheel.add(timerTaskEntry)) {
  19. // #2 插入失败,可能任务已经过期了或被取消
  20. if (!timerTaskEntry.cancelled) {
  21. // 如果任务没有被取消,直接交给taskExecutor执行器执行
  22. taskExecutor.submit(timerTaskEntry.timerTask)
  23. }
  24. }
  25. }
  1. /**
  2. * 推进时间向前进,如果没有,阻塞timeoutMs,默认值是200ms。
  3. *
  4. */
  5. def advanceClock(timeoutMs: Long): Boolean = {
  6. // #1 从java的延迟队列中获取下一个已过期的Bucket对象
  7. var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  8. if (bucket != null) {
  9. // #2 存在已过期的Bucket,获取写锁
  10. writeLock.lock()
  11. try {
  12. while (bucket != null) {
  13. // 推动时间轮向前滚动到Bucket的过期时间点
  14. timingWheel.advanceClock(bucket.getExpiration)
  15. // 将该Bucket下的所有定时任务重新写回时间轮
  16. // 这一步会执行已过期的延迟任务
  17. bucket.flush(addTimerTaskEntry)
  18. // 立即读取下一个已过期的Bucket
  19. bucket = delayQueue.poll()
  20. }
  21. } finally {
  22. // 释放写锁
  23. writeLock.unlock()
  24. }
  25. true
  26. } else {
  27. false
  28. }
  29. }

DelayedOperationPurgatory

ApiKeys.PRODUCE

Broker 处理 Producer 的 PRODUCE 请求步骤如下:

  1. 生产者向多个位于某个 Broker 的 Leader 分区追加消息。
  2. 经过 Broekr 网络层,由 KakfaRequestHandler(I/O处理线程) 委托 KafkaApi 对象处理。方法 kafkaApi#handle 对请求进行路由,交由 kafka.server.KafkaApis#handleProduceRequest 处理请求。
  3. 经过校验得到合法的可追加消息的分区列表,将可追加的分区列表委托给 ReplicaManager 处理(调用 ReplicaManager#appendRecords() 方法)。
  4. ReplicaManager 首先判断 ACKS 是否合法,遇到非法的 ACKS 可立即返回错误的 Response。
  5. 将生产者的生产消息持久化到本地日志文件中。
  6. 如果ACKS==0,说明不需要等待其它Follower同步,Producer 也不需要接收 Response。
  7. 如果 ACKS!=0,说明需要等待部分/全部(根据ISR的值确定)位于 ISR 集合的同步,为该分区构建延迟操作对象 DelayedProduce。
  8. DelayedProduce 交给 DelayedOperationPurgatory 处理。
    1. 尝试判断 DelayedProduce 操作是否已经完成。注意,一个 Produce 请求包含对多个 leader 分区的写操作,也就是说,一个 Produce 请求需要等待多个分区的 follower 完成同步后才能返回 Response。DelayedProduce 内部持有每个分区的状态,有一个状态叫 acksPending,如果这个分区满足 ACKS 的要求,就会将 acksPending 设置为 false。因此,DelayedProduce 检查操作是否完成本质是检查对应所有分区状态的 acksPending 是否都为 false。如果有一个分区的 acksPending 为 true,就意味着这个操作还未结束。因此,还不能返回 Response 给 Producer。
    2. 如果 tryComplete() 方法返回 false,说明目前为止操作还未完成。那么对该请求相关的分区都注册一个 watchkey。然后再尝试判断操作是否完成,如果还没有,最后才将操作插入到时间轮中。
  9. Broker 每次抬升某个分区的 HW 时,就会调用 DelayOperations#checkAndCompleteAll() 从而触发 watchkey 的检查,判断所监听的分区是否已经满足条件,如果满足条件则可以进行下一步操作。

总结

  1. DelayQueue 存储具有延迟任务的 Bucket。
  2. 高层时间轮在时间向前推进的过程中也会存在过期的Bucket,从而触发Bucket的降级操作:将 Bucket 内的任务重新插入到低级时间轮中。

https://zhuanlan.zhihu.com/p/121483218