数据结构
Netty 框架需要执行许多定时任务(比如心跳检测、读写超时检测),这些任务由它自己封装的 HashedWheelTimer 来调度。

数据结构特征:一个环形的数据结构,上面分了很多槽,一个槽代表一个时间间隔。每隔固定时间,向前转动一次,执行槽上的定时任务。槽上面的定时任务放置在一个链表里。
数据结构 java 抽象

HashedWheelBucket
hash 轮上面每个 bucket 的抽象, 一个 HashedWheelTimer 对应多个 HashedWheelBucket。HashedWheelBucket 里面的数据结构是一个双向链表,用于存储任务 HashedWheelTimeout;
/**
 * One slot of the hashed wheel. It anchors the linked list of
 * {@link HashedWheelTimeout} nodes that were placed into this slot.
 * (Excerpt: only the fields are shown.)
 */
private static final class HashedWheelBucket {

    /** First node of the list; {@code null} while the bucket is empty. */
    private HashedWheelTimeout head;

    /** Last node of the list; new timeouts are appended after it. */
    private HashedWheelTimeout tail;
}
HashedWheelTimeout
任务的抽象,是双向链表的节点。里面的 TimerTask 代表具体的待执行的任务。持有 HashedWheelBucket 的 引用;
private static final class HashedWheelTimeout implements Timeout {// 任务的状态private static final int ST_INIT = 0;private static final int ST_CANCELLED = 1;private static final int ST_EXPIRED = 2;// 实现原子更新当前对象的state字段private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");private final HashedWheelTimer timer;private final TimerTask task;private final long deadline;@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })private volatile int state = ST_INIT;// remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the// HashedWheelTimeout will be added to the correct HashedWheelBucket.// 定时任务执行位于hash轮 第几圈上long remainingRounds;// This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.// As only the workerThread will act on it there is no need for synchronization / volatile.HashedWheelTimeout next;HashedWheelTimeout prev;// The bucket to which the timeout was addedHashedWheelBucket bucket;}
HashedWheelTimer
hash 轮 的数据结构抽象,有一个 work 线程,hash轮 bucket 数组,存储定时任务的队列;
public class HashedWheelTimer implements Timer {private static final int INSTANCE_COUNT_LIMIT = 64;private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);// 实现原子性修改 workerState 字段;private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");private final ResourceLeakTracker<HashedWheelTimer> leak;private final Worker worker = new Worker();private final Thread workerThread;// 工作线程状态public static final int WORKER_STATE_INIT = 0;public static final int WORKER_STATE_STARTED = 1;public static final int WORKER_STATE_SHUTDOWN = 2;@SuppressWarnings({ "unused", "FieldMayBeFinal" })private volatile int workerState; // 0 - init, 1 - started, 2 - shut downprivate final long tickDuration;// hashWheel 数组private final HashedWheelBucket[] wheel;private final int mask;private final CountDownLatch startTimeInitialized = new CountDownLatch(1);// 待执行任务private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();// 待取消任务private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();private final AtomicLong pendingTimeouts = new AtomicLong(0);private final long maxPendingTimeouts;private volatile long startTime;}
主要方法
构造方法
// 部分代码省略public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,long maxPendingTimeouts) {// Normalize ticksPerWheel to power of two and initialize the wheel.// 创建 hash 轮,轮数默认是512wheel = createWheel(ticksPerWheel);mask = wheel.length - 1;// Convert tickDuration to nanos. 转纳秒long duration = unit.toNanos(tickDuration);// 创建 work 线程workerThread = threadFactory.newThread(worker);}
newTimeout 方法
创建定时任务,启动 work 线程(如果尚未启动),然后把任务放到待处理队列中。
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();// 启动 work 线程start();// Add the timeout to the timeout queue which will be processed on the next tick.// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;// Guard against overflow.if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);// 加入队列timeouts.add(timeout);return timeout;}
start()
/**
 * Starts the worker thread if it has not been started yet and blocks until the worker
 * has published {@code startTime}.
 *
 * If the calling thread is interrupted while waiting, the wait continues, and the
 * interrupt status is restored before this method returns so the caller can still see it.
 *
 * @throws IllegalStateException if the timer has already been shut down
 */
public void start() {
    // The compare-and-set guarantees that only one caller starts the worker thread.
    switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }

    // Wait until the startTime is initialized by the worker (signalled through the CountDownLatch).
    boolean interrupted = false;
    while (startTime == 0) {
        try {
            startTimeInitialized.await();
        } catch (InterruptedException ignored) {
            // Keep waiting - it will be ready very soon - but remember the interrupt.
            // Re-asserting it here would make await() throw again immediately.
            interrupted = true;
        }
    }
    if (interrupted) {
        // Restore the interrupt status that catching InterruptedException cleared.
        Thread.currentThread().interrupt();
    }
}
work 线程
/**
 * The task run by the timer's worker thread: it publishes the start time, then processes
 * one bucket per tick until the timer leaves the STARTED state, and finally collects
 * everything that was never executed.
 */
private final class Worker implements Runnable {
    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

    private long tick;

    @Override
    public void run() {
        // Publish the start time. 0 is the "uninitialized" marker, so never publish 0.
        final long now = System.nanoTime();
        startTime = now == 0 ? 1 : now;

        // Release the threads blocked in start().
        startTimeInitialized.countDown();

        do {
            // Sleep until the next tick is due.
            final long tickDeadline = waitForNextTick();
            if (tickDeadline <= 0) {
                // Nothing to process for this pass; go straight to the state check.
                continue;
            }

            // wheel.length - 1 is the mask, so the bit-and maps the tick to a bucket index.
            int slot = (int) (tick & mask);
            processCancelledTasks();
            HashedWheelBucket dueBucket = wheel[slot];
            // Move the queued timeouts into their buckets before expiring this one.
            transferTimeoutsToBuckets();
            dueBucket.expireTimeouts(tickDeadline);
            tick++;
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        // Shutdown path.
        // Fill the unprocessedTimeouts so we can return them from stop() method.
        for (HashedWheelBucket bucket : wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }

        // Drain the timeouts that were still queued and never reached a bucket.
        HashedWheelTimeout leftover;
        while ((leftover = timeouts.poll()) != null) {
            if (!leftover.isCancelled()) {
                unprocessedTimeouts.add(leftover);
            }
        }

        processCancelledTasks();
    }
}
transferTimeoutsToBuckets
把任务从队列放到 hashwheel 对应 bucket 上;
/**
 * Moves queued timeouts from the {@code timeouts} queue into the wheel bucket that matches
 * their deadline, and sets how many wheel revolutions each one still has to wait.
 */
private void transferTimeoutsToBuckets() {
    // Transfer at most 100000 timeouts per tick so that a thread which keeps adding
    // timeouts in a loop cannot stall the worker thread.
    final int maxPollsPerTick = 100000;

    int polls = 0;
    while (polls < maxPollsPerTick) {
        polls++;

        HashedWheelTimeout pending = timeouts.poll();
        if (pending == null) {
            // Queue drained.
            return;
        }

        // Timeouts cancelled in the meantime are dropped (they still count as a poll).
        if (pending.state() != HashedWheelTimeout.ST_CANCELLED) {
            long dueTick = pending.deadline / tickDuration;
            pending.remainingRounds = (dueTick - tick) / wheel.length;

            // Never schedule into the past: an overdue timeout goes into the current tick.
            final long targetTick = Math.max(dueTick, tick);
            int bucketIndex = (int) (targetTick & mask);

            wheel[bucketIndex].addTimeout(pending);
        }
    }
}
/**
 * Appends {@code timeout} to this bucket's linked list and records this bucket as its owner.
 *
 * @param timeout the timeout to add; it must not already belong to a bucket
 */
public void addTimeout(HashedWheelTimeout timeout) {
    assert timeout.bucket == null;
    timeout.bucket = this;

    if (head == null) {
        // Empty list: the new node is both the first and the last one.
        head = tail = timeout;
        return;
    }

    // Non-empty list: link the new node after the current tail.
    tail.next = timeout;
    timeout.prev = tail;
    tail = timeout;
}
expireTimeouts
任务执行
/**
 * Walks this bucket's list once: expires every timeout whose remaining rounds are used up,
 * removes cancelled ones, and decrements the remaining rounds of all others.
 *
 * @param deadline the deadline of the current tick
 * @throws IllegalStateException if a timeout with no remaining rounds has a deadline
 *         later than {@code deadline}
 */
public void expireTimeouts(long deadline) {
    HashedWheelTimeout current = head;

    // Process all timeouts in this bucket.
    while (current != null) {
        HashedWheelTimeout successor = current.next;

        if (current.remainingRounds > 0) {
            if (current.isCancelled()) {
                successor = remove(current);
            } else {
                // Not due in this revolution: one round fewer to wait.
                current.remainingRounds--;
            }
        } else {
            // No rounds left: unlink the node first, then run the task.
            successor = remove(current);
            if (current.deadline > deadline) {
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                        "timeout.deadline (%d) > deadline (%d)", current.deadline, deadline));
            }
            current.expire();
        }

        current = successor;
    }
}
参考: 链接
