概述
时间轮的设计思想来源于时钟。
时钟由秒针、分针和时针组成,分别对应第一层时间轮、第二层时间轮和第三层时间轮。当然,你也根据实际需要增加或减少时间轮的层数。针盘上面有刻度,秒针、分针对刻度理解可不一样,对于秒针而言,一个刻度表示 1S,而对于分针而言,一个刻度表示 1 分钟,而对于时针,一个刻度是 0.2 小时。
将其映射至多层时间轮,时间轮都拥有相同的刻度数量,称为 wheelSize(20),默认是 20 个刻度,每个刻度大小称为 tickMs,不同层的 tickMs 是不同的,比如第一层 tickMs = 20ms,第二层为 tickMs = 20*20 = 400ms。每一层也有一个 currentTime 表示当前层的 “此刻”,通过与 currentTime 比较判断是否需要执行任务或对任务进行降级操作。相关示意图如下图所示:
第一层时间轮都会有时间范围:
每一个定时任务拥有两种时间,分别是延时时间(delayTime)和过期时间(ExpiredTime)。比如我们向时间轮添加一个 3S 后执行的任务,当前时间为 2S,那么这个任务所对应的延迟时间和过期时间分别是 3S 和 5S。
添加定时任务
首先,每一个定时任务会有两个时间,一个是延迟时间(delayTime),另一个过期时间(ExpiredTime)。比如当前时间是 10s,我们添加一个延迟时间为 2s 的任务,那么这个任务的过期时间就是 10 + 2 = 12s。
此时,我们需要添加一个延迟时间为 350ms 的任务,当前时间为 0ms。那么它应该被添加到哪一层的哪一个 Bucket 呢? 我们可以通过以下计算步骤得到:
- 定位哪一层时间轮。第一层时间轮范围是
[0, 20ms),不适合,第二层时间轮时间范围是[20, 400ms),包含 350ms 的延迟任务,所以选中第二层时间轮。 - 定位哪个 Bucket。通过以下公式计算可得存放的 ID 位置是第 17 个 bucket。
virtualId = delayTime(350) / tickMs(20) = 17bucketId = virtualId % wheelSize(20) = 17
任务的降级处理
场景:在时刻 0 添加一个 450ms 的延迟任务 A,经过上面的计算会被添加到第三层的第 0 个 Bucket 中。随着时间流逝,已经过去 400ms。那么任务 A 只需等待 50ms 就可以被执行了。显然,50ms 的延迟任务此刻不应该存在第三层时间轮。因此对它进行降级操作。它会重新以 50ms 的延迟任务重新放回到时间轮,经过计算得到位置是第二层的第 0 个 Bucket。
再经过 40ms,又会引起降级操作,它会重新以 10ms 的延迟任务重新放回到时间轮,经过计算得到位置是第一层的第 2 个 Bucket。
此时的时刻为为 440ms,只需要再过 10ms 这个任务就可被执行。Kafka 实现的时间轮
前面只是一个大致的讲解过程,并没有涉及到数据结构的实现。还有许多细节需要通过源码的方式来实现。单层时间轮的实现 TimingWheel
单层时间轮是由kafka.utils.timer.TimingWheel类实现。先看看重要的入参:
| 参数名称 | 说明 |
|---|---|
| tickMs | 时钟刻度 |
| wheelSize | Bucket 数量 |
| startMs | 时间轮对象被创建的起始时间戳 |
| taskCounter | 任务读数器 |
| queue: DelayQueue[TimerTaskList] | 延迟任务队列,里面存放 Bucket |
有 4 个非常重要的内部变量:
@nonthreadsafeprivate[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {/*** 这一层时间轮总时长。* Kafka第一层时间轮:tickMs=1, wheelSize=20,interval=20*1=20ms* Kafka第二层时间轮:tickMs=20,wheelSize=20,interval=20*20=400ms* Kafka第三层时间轮:tickMs=400,wheelSize=20,interval=400*20=8000ms*/private[this] val interval = tickMs * wheelSize/*** 当前层时间轮下的所有Bucket对象,即所有[[TimerTaskList]]对象*/private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }/*** 当前时间戳=当前时间的最大滴答时长的整数倍。* 比如tickMs=20ms,当前时间戳为167ms,则currentTime=160ms*/private[this] var currentTime = startMs - (startMs % tickMs)/*** kafka按需创建上层时间轮。也就是说,如果超出当前层的时间轮的时间范围是不会创建上层时间轮的。* 如果当前层时间轮无法容纳给定延迟时间,就会创建第二层时间轮,如果第二层也满足不了,则创建第三层,* 以此类推,直到找到合适的时间轮层。由于每次步骤都是按倍数递增,所以能很快在有限个时间轮范围内找到合适的时间轮存放* 给定的延迟时间。*/@volatile private[this] var overflowWheel: TimingWheel = null// ...}
| 参数 | 说明 |
|---|---|
| interval | 本层时间轮时间间隔,第一层时间轮20ms,第二层为 400ms,第三层为 8000ms。 当前层的 interval = 上一层 interval * 20 得到 |
| buckets | 一个长度为 20 的数组,存放 TimerTaskList 对象。它是一个双向链表结构,最原始的延迟任务通过对象 TimerTask 包装后添加到链表中,就算完成时间轮入队了 |
| currentTime | 当前时间戳,它需要规格化。即currentTime = 当前时间的最大滴答时长的整数倍。比如 tickMs = 20ms,当前时间戳为 167ms,则 currentTime = 167 - (167 % 20) = 160。这方便通过 & 操作符定位 bucket |
| overflowWheel | Kafka 按需创建下一层(层数+1)时间轮。比如第一层时间轮无法满足,则创建第二层时间轮。每一层时间轮持有下一层时间轮的引用。这样,多层时间轮就可以进行联动了。 |
Bucket 是什么样子
前面也提到过,对象 TimeWheel 的 Bucket 是一个长度为 20 的数组,数组存放 TimerTaskList 类型的实例对象。那我们看看 TimerTaskList 是什么样的结构:
TimerTaskList
这个对象用来表示一个 Bucket,多个 TimerTaskList 构成的一个双向循环列表,就组成 “轮” 状结构了。
/*** 理解为一个Bucket,每个Bucket实际上是一个双向循环链表* Bucket是可以重复使用的:通过重新设置[[TimerTaskList.expiration]]就可以完成** @param taskCounter 每个Bucket所含有的定时任务总数量*/@threadsafeprivate[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {/*** 构成双向循环链表*/private[this] val root = new TimerTaskEntry(null, -1)root.next = rootroot.prev = root/*** Bucket的过期时间戳(绝对时间戳),Bucket对应表盘中的一格,它是一个范围值:[起始时间, 结束时间]* 时间间隔=结束时间-起始时间,同一层的Bucket的时间间隔是一样的。* 如何确定Bucket过期:当前时间越过了Bucket的过期时间,这个Bucket才算是过期的Bucket。* 过期时间就是指expiration字段*/private[this] val expiration = new AtomicLong(-1L)// ...}
| 参数 | 说明 |
|---|---|
| root | 双向链表的头结构 |
| expiration | Bucket 的过期时间戳(绝对时间戳),Bucket 对应表盘中的一格,它也是一个范围值,比如第一层第 0 个 Bucket 的时间范围是:[0, 20ms)。如果确定 Bucket 过期呢 ? 当前时间超过了 Bucket 的过期时间。而过期时间就是指 expiration 字段。 |
内部还提供若干方法:
| 方法名 | 功能 |
|---|---|
| setExpiration(Long) | CAS 修改 expiration 字段值,存在并发 |
| add(TimerTaskEntry) | 向 Bucket 添加一个延迟任务 |
| remove(TimerTaskEntry) | 从链表中移除一延迟任务 |
| flush(f: TimerTaskEntry=>Unit) | 对已过期的 Bucket 上的所有任务执行函数 f。 这里表示时间到了,延迟任务可以被执行 |
TimerTaskList 是一个双向链表结构,一个 TimerTaskList 表示一个 Bucket,里面提供添加和删除 API,还提供一个重要的 flush(function) API 表示延迟任务的执行。
TimerTaskEntry
双向链表存储的对象是 TimerTaskEntry,这个对象主要的目的就是构造双向链表结构。
/*** 构成Bucket的子元素:TimerTaskEntry之间构成双向链表** @param timerTask 延时任务* @param expirationMs 过期时间*/private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {// 绑定的Bucket链表实例@volatile var list: TimerTaskList = null// 前序实体指针var next: TimerTaskEntry = null// 后序实体指针var prev: TimerTaskEntry = null...}
TimerTask
TimerTask 才是真正的延迟任务,但它是一个标识接口(trait),里面定义了几个通用的方法和变量:
| 变量 | 描述 |
|---|---|
| delayMs | 任务延迟时间,通常是指 request.time.ms。可理解为相对时间 |
| TimerTaskEntry | 每一个 TimerTask 会被添加到 Bucket, 因此,它需要绑定一个双向链表适配类,即 TimerTaskEntry,只有这个类才有双向链表的结构 |
相关 API :
| 方法名称 | 说明 |
|---|---|
| cancel | 取消定时任务,处理方式是将对应的 TimerTaskEntry 置为 null |
| setTimerTaskEntry | 关联 TimerTaskEntry |
延迟操作的实现类一般继承 DelayedOperation,但这篇文章不会关注 DelayedOperation 的实现及接口定义,我们可以简单理解:只要实现 TimerTask 接口,就可以被认为是延迟任务,那么任务就可以被添加到时间轮中延迟执行。
如何推进时间轮
那么,Kafka 是如何让时间轮上的指针转动起来的呢 ? 答案在 kafka.utils.timer.Timer 文件中。
Timer 接口
Timer 被 trait 标记,属于接口,相关 API 定义如下:
| 接口名称 | 说明 |
|---|---|
| add(TimerTask) | 将给定的延迟任务添加到时间轮中,当延迟时间结束后执行任务 |
| advanceClock(Long timeoutMs) | 推进时间轮前进。timeoutMs 表示推进到哪个时间点。 |
| size | 获取待执行任务数量 |
| shutdown | 关闭时间轮 |
Timer 实现类
SystemTimer 是 Timer 接口唯一的实现类,首先看看它的入参:
| 入参 | 说明 |
|---|---|
| executorName | 执行器名称 |
| tickMs | 时钟刻度,也就是前进一格所花费的时间。默认值:1ms |
| wheelSize | 单层时间轮 Bucket 数量。默认值:20。因此,单层时间轮所拥有的时间范围 = tickMs * wheelSize |
| startMs | 时间轮启动时间 |
入参核心参数就是 tickMs 和 wheelSize,默认第一层时间轮 tickMs = 1, wheelSize = 20,因此,可以存放延迟时间为 [0, 20ms) 的延迟任务。
下面来说说 SystemTimer 的变量:
| 变量名 | 说明 |
|---|---|
| taskExecutor | 用来执行任务的线程。线程数量默认为 1。 |
| delayQueue[TimerTaskList] | 一个无界的 BlockingQueue,其中的对象只能在到期时才能从队列中取走。即按 deadLine 由小到大进行排序。这个队列存放有任务的 Bucket,当 Bucket 超时后,就可以从队列中取出并执行任务。那为什么不直接使用 BlockingQueue 呢 ? |
| taskCounter | 存放在此时间轮的任务数量 |
| timingWheel | 创建时间轮对象,将 delayQueue 传时间轮对象 |
| readWriteLock | 读写锁 |
| readLock | 读锁 |
| writeLock | 写锁 |
我们需要重点关注 TimingWheel 和 DelayQueue 是如何配置使用的。SystemTimer 也有重要的 API:
| 函数名称 | 说明 |
|---|---|
| add(TimerTask) | 将延迟任务添加到时间轮中。这里是获取读锁,其原因是在没有线程持有写锁的情况下,多个线程能够同时向时间轮添加定时任务,因为 TimerTaskList 是线程安全的类。 如果添加失败,则直接提交给 taskExecutor 执行。因为有可能已过期 |
| advanceClock(Long timeoutMs) | 向前推进时间轮到 timeoutMs。这个 API 需要好好理解, 下面会有详细解释 |
接下来,我们重点看看 advanceClock(Long) 源码:
// kafka.utils.timer.SystemTimer#advanceClock/*** 推进时间向前进,如果没有,阻塞timeoutMs,默认值是200ms。*/def advanceClock(timeoutMs: Long): Boolean = {// #1 从java的延迟队列中获取下一个已过期的Bucket对象var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)if (bucket != null) {// #2 存在已过期的Bucket,则获取写锁,准备修改时间轮writeLock.lock()try {// 尝试从延迟队列中获取已过期的Bucket对象while (bucket != null) {// #3 推动时间轮向前滚动到Bucket的过期时间点timingWheel.advanceClock(bucket.getExpiration)// #4 可能有两种情况:// ① 已到了截止时间,因此执行任务。// ① 还未到执行时间,重新添加回时间轮中bucket.flush(addTimerTaskEntry)// #5 立即读取下一个已过期的Bucketbucket = delayQueue.poll()}} finally {// 释放写锁writeLock.unlock()}true} else {false}}
这一步做了什么操作呢 ? 我举下面的例子:
设:时间单位 u=1,每层 bucket 数量 n = 3,起始时间戳 time = c,总共有 3 层时间轮,具体分布如下:
level buckets1 [c,c] [c+1,c+1] [c+2,c+2]2 [c,c+2] [c+3,c+5] [c+6,c+8]3 [c,c+8] [c+9,c+17] [c+18,c+26]
定义 bucket 过期:如果过期时间=bucket 的起始时间,即范围左值,就表示该 Bucket 过期了。
因此,当时间 time=c+1,buckets [c,c], [c,c+2] and [c,c+8] 这三个 bucket 过期了。 因此 level1 的时针(具体是指 currentTime)移动到 c+1,并且创建时间范围为 [c+3,c+3] 的 Bucket 实例。 level2 和 level3 的还是处于 c,因为走过的时间 1 不足以驱动它们前进一格(level2 需要 3,level3 需要 9)。 因此在 level2 和 level3 不会创建新的 Bucket 对象。
注意:level2的bucket[c,c+2]不会存储任何超时任务,因为这个Bucket和level1整个层重复了。
同样,level3的[c,c+8]也不会存放任何超时任务。
这有一点浪费,但能简化实现。
1 [c+1,c+1] [c+2,c+2] [c+3,c+3]
2 [c,c+2] [c+3,c+5] [c+6,c+8]
3 [c,c+8] [c+9,c+17] [c+18,c+26]
在time=c+2(时间又tick了一次),[c+1,c+1] 过期,level1的currentTime更新为c+2,
[c+4,c+4] 被创建:
1 [c+2,c+2] [c+3,c+3] [c+4,c+4]
2 [c,c+2] [c+3,c+5] [c+6,c+8]
3 [c,c+8] [c+9,c+17] [c+18,c+18]
在time=c+3,[c+2,c+2]过期,
level2的currentTime更新为c+3,并且level1创建[c+5,c+5],level2创建[c+9,c+11],
level3还是没有变化:
1 [c+3,c+3] [c+4,c+4] [c+5,c+5]
2 [c+3,c+5] [c+6,c+8] [c+9,c+11]
3 [c,c+8] [c+9,c+17] [c+8,c+11]
当超时任务在超时之前完成时,分层时间轮特别有效率。
即便在所有任务都超时的情况下,分层时间轮依然具有时间优势:插入和删除的时间复杂度分别是O(m)和O(1)。
而基于优先队列则时间复杂度为O(logN)。
/*** 向时间轮添加一个延迟任务* @param timerTaskEntry* @return*/def add(timerTaskEntry: TimerTaskEntry): Boolean = {// #1 获取延迟任务的过期时间val expiration = timerTaskEntry.expirationMs// #2 判断任务是否已被取消if (timerTaskEntry.cancelled) {// Cancelledfalse} else if (expiration < currentTime + tickMs) {// #3 任务已经过期,不允许添加(连第一个Bucket都无法容纳,更别说后面的Bucket了)false} else if (expiration < currentTime + interval) {// #4 任务的超时时间在「currentTime, currentTime+interval)范围内// 说明可以在本层时间轮中找到对应的Bucket// #4-1 确定Bucketval virtualId = expiration / tickMsval bucket = buckets((virtualId % wheelSize.toLong).toInt)// #4-2 添加延时任务bucket.add(timerTaskEntry)// #4-3 设置Bucket的过期时间。// 通过返回值来确认该Bucket是否是已过期的Bucket。// 如果设置成功,则返回true,说明这个Bucket已经是过期的,需要重新添加到Bucket// 如果设置失败,则返回false,说明这个Bucket已经存在队列中,不需要重复放入队列if (bucket.setExpiration(virtualId * tickMs)) {// 将Bucket入队queue.offer(bucket)}true} else {// 如果过期时间expiration超过本层时间轮,那么递归创建上层时间轮,// 直到确认合适的时间轮为止if (overflowWheel == null) addOverflowWheel()overflowWheel.add(timerTaskEntry)}}
/*** 驱动当前层时间轮向前进** @param timeMs 将本层时间轮向前推进到timeMs,* 这个值必须超过当前层时间轮的Bucket的范围值,否则本次推进操作将变得毫无意义*/def advanceClock(timeMs: Long): Unit = {// #1 推进操作需要timeMs超过Bucket的范围值if (timeMs >= currentTime + tickMs) {// 更新 currentTimecurrentTime = timeMs - (timeMs % tickMs)// #2 如果存在上层时间轮,递归推进所有上层的时间轮// 牵一发而动全轮if (overflowWheel != null) overflowWheel.advanceClock(currentTime)}
Timer 接口
trait Timer {/*** 将给定的延迟任务插入到时间轮中,等待后续延迟执行** @param timerTask 待添加的延迟任务*/def add(timerTask: TimerTask): Unit/*** 向前推进时间前进,执行任何已过期的延迟任务。** @param timeoutMs 推进到哪个时间点* @return 如果有延迟任务被执行,返回true,否则返回false*/def advanceClock(timeoutMs: Long): Boolean/*** 获取待执行的延迟任务数量** @return the number of tasks*/def size: Int/*** 关闭定时器*/def shutdown(): Unit}
实现类 SystemTimer
/*** [[Timer]]接口的实现类,这是一个定时器类,底层基于分层时间轮实现延迟任务的处理。* [[SystemTimer]]也是 [[kafka.server.DelayedFuturePurgatory]]的基础组件,** @param executorName 执行器名称* @param tickMs 构建时间轮的tickMs时间* @param wheelSize 时间轮大小* @param startMs 定时器启动时间*/@threadsafeclass SystemTimer(executorName: String,tickMs: Long = 1,wheelSize: Int = 20,startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer {/*** Timer执行器,单独的一个线程用于执行延迟任务*/private[this] val taskExecutor = Executors.newFixedThreadPool(1,(runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable))/*** Java提供的延迟队列,Bucket会放到这个延迟队列中,* 通过这个队列获得已过期的Bucket对象*/private[this] val delayQueue = new DelayQueue[TimerTaskList]()private[this] val taskCounter = new AtomicInteger(0)/*** 创建分层时间轮对象*/private[this] val timingWheel = new TimingWheel(tickMs = tickMs,wheelSize = wheelSize,startMs = startMs,taskCounter = taskCounter,delayQueue)// ...}
/*** 添加延迟任务* @param timerTask 待添加的延迟任务*/def add(timerTask: TimerTask): Unit = {// 获取读锁。在没有线程持有写锁的情况下,多个线程能够同时向时间轮添加定时任务。// 这里因为TimerTaskList是线程安全的类,所以才可以这么做readLock.lock()try {// 延迟时间是相对时间戳、过期时间是绝对时间戳addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))} finally {readLock.unlock()}}private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {// #1 将延迟任务插入时间轮if (!timingWheel.add(timerTaskEntry)) {// #2 插入失败,可能任务已经过期了或被取消if (!timerTaskEntry.cancelled) {// 如果任务没有被取消,直接交给taskExecutor执行器执行taskExecutor.submit(timerTaskEntry.timerTask)}}}
/*** 推进时间向前进,如果没有,阻塞timeoutMs,默认值是200ms。**/def advanceClock(timeoutMs: Long): Boolean = {// #1 从java的延迟队列中获取下一个已过期的Bucket对象var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)if (bucket != null) {// #2 存在已过期的Bucket,获取写锁writeLock.lock()try {while (bucket != null) {// 推动时间轮向前滚动到Bucket的过期时间点timingWheel.advanceClock(bucket.getExpiration)// 将该Bucket下的所有定时任务重新写回时间轮// 这一步会执行已过期的延迟任务bucket.flush(addTimerTaskEntry)// 立即读取下一个已过期的Bucketbucket = delayQueue.poll()}} finally {// 释放写锁writeLock.unlock()}true} else {false}}
DelayedOperationPurgatory
ApiKeys.PRODUCE
Broker 处理 Producer 的 PRODUCE 请求步骤如下:
- 生产者向多个位于某个 Broker 的 Leader 分区追加消息。
- 经过 Broekr 网络层,由 KakfaRequestHandler(I/O处理线程) 委托 KafkaApi 对象处理。方法
kafkaApi#handle对请求进行路由,交由kafka.server.KafkaApis#handleProduceRequest处理请求。 - 经过校验得到合法的可追加消息的分区列表,将可追加的分区列表委托给
ReplicaManager处理(调用ReplicaManager#appendRecords()方法)。 - ReplicaManager 首先判断 ACKS 是否合法,遇到非法的 ACKS 可立即返回错误的 Response。
- 将生产者的生产消息持久化到本地日志文件中。
- 如果ACKS==0,说明不需要等待其它Follower同步,Producer 也不需要接收 Response。
- 如果 ACKS!=0,说明需要等待部分/全部(根据ISR的值确定)位于 ISR 集合的同步,为该分区构建延迟操作对象 DelayedProduce。
- 将
DelayedProduce交给DelayedOperationPurgatory处理。- 尝试判断
DelayedProduce操作是否已经完成。注意,一个 Produce 请求包含对多个 leader 分区的写操作,也就是说,一个 Produce 请求需要等待多个分区的 follower 完成同步后才能返回 Response。DelayedProduce内部持有每个分区的状态,有一个状态叫acksPending,如果这个分区满足 ACKS 的要求,就会将acksPending设置为 false。因此,DelayedProduce检查操作是否完成本质是检查对应所有分区状态的acksPending是否都为 false。如果有一个分区的acksPending为 true,就意味着这个操作还未结束。因此,还不能返回 Response 给 Producer。 - 如果 tryComplete() 方法返回 false,说明目前为止操作还未完成。那么对该请求相关的分区都注册一个 watchkey。然后再尝试判断操作是否完成,如果还没有,最后才将操作插入到时间轮中。
- 尝试判断
- Broker 每次抬升某个分区的 HW 时,就会调用
DelayOperations#checkAndCompleteAll()从而触发 watchkey 的检查,判断所监听的分区是否已经满足条件,如果满足条件则可以进行下一步操作。
总结
- DelayQueue 存储具有延迟任务的 Bucket。
- 高层时间轮在时间向前推进的过程中也会存在过期的Bucket,从而触发Bucket的降级操作:将 Bucket 内的任务重新插入到低级时间轮中。
