数据结构
netty 框架需要执行许多定时任务,比如心跳、读写超时检测,使用的是自己封装的 HashedWheelTimer。
数据结构特征:一个环形的数据结构(时间轮),上面划分了很多槽(bucket),每个槽代表一个固定的时间间隔(tickDuration)。每经过一个时间间隔,指针向前转动一格,并执行当前槽上已到期的定时任务。同一个槽上的定时任务存放在一个双向链表里。
数据结构 java 抽象
HashedWheelBucket
hash 轮上面每个 bucket 的抽象, 一个 HashedWheelTimer 对应多个 HashedWheelBucket。HashedWheelBucket 里面的数据结构是一个双向链表,用于存储任务 HashedWheelTimeout;
/**
 * One slot of the timer wheel. It keeps references to the first and last
 * {@code HashedWheelTimeout} of the linked list of timeouts queued in this slot.
 */
private static final class HashedWheelBucket {
// First timeout in this bucket's list; null when the bucket is empty.
private HashedWheelTimeout head;
// Last timeout in this bucket's list; new timeouts are appended after it.
private HashedWheelTimeout tail;
}
HashedWheelTimeout
任务的抽象,是双向链表的节点。里面的 TimerTask 代表具体的待执行任务,同时持有所在 HashedWheelBucket 的引用。
/**
 * A scheduled task handle and, at the same time, a node of the doubly linked
 * list kept by a {@code HashedWheelBucket}. It wraps the {@code TimerTask} to
 * run, its deadline, and its current state.
 */
private static final class HashedWheelTimeout implements Timeout {
// Possible values of the state field.
private static final int ST_INIT = 0;
private static final int ST_CANCELLED = 1;
private static final int ST_EXPIRED = 2;
// Performs atomic updates of the volatile "state" field of an instance.
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
// The timer that created this timeout.
private final HashedWheelTimer timer;
// The user task associated with this timeout.
private final TimerTask task;
// Deadline in nanoseconds, measured relative to the timer's startTime.
private final long deadline;
// Current state; starts as ST_INIT and is only modified through STATE_UPDATER.
@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
private volatile int state = ST_INIT;
// remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
// HashedWheelTimeout will be added to the correct HashedWheelBucket.
// Number of full wheel revolutions that still have to pass before this timeout is due.
long remainingRounds;
// This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
// As only the workerThread will act on it there is no need for synchronization / volatile.
HashedWheelTimeout next;
HashedWheelTimeout prev;
// The bucket to which the timeout was added
HashedWheelBucket bucket;
}
HashedWheelTimer
时间轮(hash 轮)的数据结构抽象:包含一个 worker 线程、一个 HashedWheelBucket 数组(即时间轮本身),以及用于暂存新提交定时任务的队列。
/**
 * Timer based on a hashed wheel: an array of buckets that a single worker
 * thread walks through tick by tick. This excerpt shows only the fields; the
 * methods are listed separately.
 */
public class HashedWheelTimer implements Timer {
private static final int INSTANCE_COUNT_LIMIT = 64;
// Number of nanoseconds in one millisecond.
private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
// Performs atomic updates of the volatile "workerState" field.
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
private final ResourceLeakTracker<HashedWheelTimer> leak;
// The runnable executed by workerThread.
private final Worker worker = new Worker();
private final Thread workerThread;
// Worker thread states.
public static final int WORKER_STATE_INIT = 0;
public static final int WORKER_STATE_STARTED = 1;
public static final int WORKER_STATE_SHUTDOWN = 2;
@SuppressWarnings({ "unused", "FieldMayBeFinal" })
private volatile int workerState; // 0 - init, 1 - started, 2 - shut down
// Divisor used to convert a timeout deadline into a tick count.
private final long tickDuration;
// The wheel itself: one bucket per slot.
private final HashedWheelBucket[] wheel;
// wheel.length - 1; ANDed with a tick count to obtain a bucket index.
private final int mask;
// Released by the worker once startTime has been set.
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
// Newly submitted timeouts waiting to be moved into a bucket by the worker.
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
// Timeouts pending cancellation.
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
// Counter of pending timeouts.
private final AtomicLong pendingTimeouts = new AtomicLong(0);
// Upper bound for pendingTimeouts. NOTE(review): presumably a value <= 0 means "no limit" - confirm.
private final long maxPendingTimeouts;
// System.nanoTime() taken when the worker starts; 0 means "not initialized yet".
private volatile long startTime;
}
主要方法
构造方法
// 部分代码省略
/**
 * Creates the timer: builds the wheel, derives the index mask from its length
 * and creates (but does not start) the worker thread.
 *
 * <p>This is an abridged excerpt. The final fields that are not assigned here
 * are initialized by code that is not shown.
 *
 * @param threadFactory      factory used to create the worker thread
 * @param tickDuration       duration between two ticks, expressed in {@code unit}
 * @param unit               time unit of {@code tickDuration}
 * @param ticksPerWheel      requested number of slots; normalized to a power of two
 *                           by {@code createWheel} (the original note states a
 *                           default of 512)
 * @param leakDetection      not used by the visible part of the constructor
 * @param maxPendingTimeouts not used by the visible part of the constructor
 */
public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
        long maxPendingTimeouts) {
    // The wheel length is a power of two, so "length - 1" works as a bit mask
    // that maps any tick count onto a valid slot index.
    final HashedWheelBucket[] buckets = createWheel(ticksPerWheel);
    this.wheel = buckets;
    this.mask = buckets.length - 1;

    // Tick duration converted to nanoseconds.
    long duration = unit.toNanos(tickDuration);

    // The worker thread is only created here; start() launches it.
    this.workerThread = threadFactory.newThread(worker);
}
newTimeout 方法
创建定时任务,并启动 worker 线程(如果尚未启动),然后把任务放入待处理队列。
/**
 * Schedules {@code task} to run once after the given delay.
 *
 * <p>The timeout is not placed into a bucket directly. It is appended to the
 * {@code timeouts} queue and moved to its bucket by the worker thread on a
 * later tick. The worker thread is started on first use.
 *
 * @param task  the task to execute; must not be {@code null}
 * @param delay the delay before execution, expressed in {@code unit}
 * @param unit  the time unit of {@code delay}; must not be {@code null}
 * @return a handle for the scheduled task
 * @throws NullPointerException if {@code task} or {@code unit} is {@code null}
 * @throws java.util.concurrent.RejectedExecutionException if accepting this
 *         timeout would exceed {@code maxPendingTimeouts} (only enforced when
 *         that limit is positive)
 * @throws IllegalStateException if the timer has already been stopped
 */
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    // Validate before touching any shared state so a bad call neither bumps the
    // pending counter nor starts the worker thread.
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

    // Enforce the configured upper bound; undo the increment when rejecting so
    // the counter keeps reflecting the number of accepted timeouts.
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new java.util.concurrent.RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
    }

    // Start the worker thread if needed; returns once startTime is initialized.
    start();

    // Add the timeout to the timeout queue which will be processed on the next tick.
    // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    // The deadline is stored relative to the timer's startTime.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // Guard against overflow.
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }

    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}
start()
/**
 * Starts the worker thread if it has not been started yet, then blocks until
 * the worker has initialized {@code startTime}.
 *
 * <p>The state transition uses compare-and-set, so when several threads call
 * this method concurrently only one of them actually starts the thread.
 *
 * @throws IllegalStateException if the timer has already been shut down
 */
public void start() {
    final int observedState = WORKER_STATE_UPDATER.get(this);

    if (observedState == WORKER_STATE_INIT) {
        // Only the caller that wins the CAS launches the worker thread.
        if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
            workerThread.start();
        }
    } else if (observedState == WORKER_STATE_SHUTDOWN) {
        throw new IllegalStateException("cannot be started once stopped");
    } else if (observedState != WORKER_STATE_STARTED) {
        throw new Error("Invalid WorkerState");
    }

    // Wait until the worker has published startTime; the CountDownLatch is
    // released by the worker right after it sets the field.
    while (startTime == 0) {
        try {
            startTimeInitialized.await();
        } catch (InterruptedException ignored) {
            // Deliberately ignored: the loop re-checks startTime and waits again.
        }
    }
}
Worker 线程
/**
 * The task executed by the timer's worker thread. It advances the wheel one
 * tick at a time while the timer is in the started state and, once the state
 * changes, collects every timeout that was never run.
 */
private final class Worker implements Runnable {
    // Timeouts that were still pending when the worker stopped.
    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

    // Number of ticks processed so far.
    private long tick;

    @Override
    public void run() {
        // Publish the start time. 0 is reserved as the "not initialized" marker,
        // so a reading of exactly 0 is replaced by 1.
        final long now = System.nanoTime();
        startTime = now == 0 ? 1 : now;

        // Wake up the threads blocked in start().
        startTimeInitialized.countDown();

        do {
            // Blocks until the next tick is due.
            final long tickDeadline = waitForNextTick();
            if (tickDeadline > 0) {
                // The wheel length is a power of two, so masking yields the slot index.
                final int slot = (int) (tick & mask);
                processCancelledTasks();
                final HashedWheelBucket current = wheel[slot];
                // Move newly queued timeouts into their buckets before expiring.
                transferTimeoutsToBuckets();
                current.expireTimeouts(tickDeadline);
                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        // Shutdown path: gather everything that never ran so that stop() can return it.
        for (HashedWheelBucket each : wheel) {
            each.clearTimeouts(unprocessedTimeouts);
        }

        HashedWheelTimeout leftover;
        while ((leftover = timeouts.poll()) != null) {
            if (!leftover.isCancelled()) {
                unprocessedTimeouts.add(leftover);
            }
        }

        processCancelledTasks();
    }
}
transferTimeoutsToBuckets
把任务从队列放到 hashwheel 对应 bucket 上;
/**
 * Drains the {@code timeouts} queue and links each timeout into the bucket of
 * the tick at which it becomes due.
 *
 * <p>At most 100000 queue entries are handled per call, so that a producer
 * adding timeouts in a tight loop cannot stall the worker thread. Cancelled
 * entries are dropped but still count toward that cap.
 */
private void transferTimeoutsToBuckets() {
    int polled = 0;
    HashedWheelTimeout pending;
    while (polled < 100000 && (pending = timeouts.poll()) != null) {
        polled++;

        if (pending.state() == HashedWheelTimeout.ST_CANCELLED) {
            // Cancelled while it was still waiting in the queue.
            continue;
        }

        // Tick (counted from timer start) at which the timeout becomes due.
        final long dueTick = pending.deadline / tickDuration;
        // Full wheel revolutions that must pass before it may fire.
        pending.remainingRounds = (dueTick - tick) / wheel.length;

        // Never schedule into the past: overdue timeouts go into the current tick's slot.
        final long effectiveTick = Math.max(dueTick, tick);
        final int slot = (int) (effectiveTick & mask);

        wheel[slot].addTimeout(pending);
    }
}
/**
 * Appends {@code timeout} to the end of this bucket's linked list and records
 * this bucket on the timeout.
 *
 * @param timeout the timeout to add; it must not belong to a bucket yet
 */
public void addTimeout(HashedWheelTimeout timeout) {
    assert timeout.bucket == null;
    timeout.bucket = this;

    if (head != null) {
        // Non-empty list: link the new node behind the current tail.
        timeout.prev = tail;
        tail.next = timeout;
        tail = timeout;
        return;
    }

    // Empty list: the new node is both head and tail.
    tail = timeout;
    head = timeout;
}
expireTimeouts
任务执行
/**
 * Walks this bucket's list once and handles every timeout in it:
 * <ul>
 *   <li>no rounds remaining: the timeout is removed and expired;</li>
 *   <li>rounds remaining but cancelled: the timeout is removed;</li>
 *   <li>otherwise: its remaining round count is decremented by one.</li>
 * </ul>
 *
 * @param deadline the deadline of the current tick, on the same scale as the
 *                 timeouts' own deadlines
 * @throws IllegalStateException if a timeout with no rounds remaining has a
 *         deadline later than {@code deadline}, i.e. it sits in the wrong slot
 */
public void expireTimeouts(long deadline) {
    HashedWheelTimeout node = head;

    while (node != null) {
        // Read the successor first; remove() returns the correct successor
        // whenever the node is unlinked.
        HashedWheelTimeout successor = node.next;

        if (node.remainingRounds > 0) {
            if (node.isCancelled()) {
                successor = remove(node);
            } else {
                // Not due in this revolution: one round fewer to wait.
                node.remainingRounds--;
            }
        } else {
            successor = remove(node);
            if (node.deadline > deadline) {
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                        "timeout.deadline (%d) > deadline (%d)", node.deadline, deadline));
            }
            node.expire();
        }

        node = successor;
    }
}
参考: 链接