数据结构

netty 框架需要执行许多定时任务,比如心跳、读写超时检测,使用的是自己封装的 HashedWheelTimer。

image.png
数据结构特征:一个环形的数据结构,上面分了很多槽,一个槽代表一个时间间隔。每隔固定时间,向前转动一次,执行槽上的定时任务。槽上面的定时任务放置在一个链表里。

数据结构 java 抽象

image.png

HashedWheelBucket

hash 轮上面每个 bucket 的抽象, 一个 HashedWheelTimer 对应多个 HashedWheelBucket。HashedWheelBucket 里面的数据结构是一个双向链表,用于存储任务 HashedWheelTimeout;

  1. private static final class HashedWheelBucket {
  2. private HashedWheelTimeout head;
  3. private HashedWheelTimeout tail;
  4. }

HashedWheelTimeout

任务的抽象,是双向链表的节点。里面的 TimerTask 代表具体的待执行的任务。持有 HashedWheelBucket 的 引用;

  1. private static final class HashedWheelTimeout implements Timeout {
  2. // 任务的状态
  3. private static final int ST_INIT = 0;
  4. private static final int ST_CANCELLED = 1;
  5. private static final int ST_EXPIRED = 2;
  6. // 实现原子更新当前对象的state字段
  7. private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
  8. AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
  9. private final HashedWheelTimer timer;
  10. private final TimerTask task;
  11. private final long deadline;
  12. @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
  13. private volatile int state = ST_INIT;
  14. // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
  15. // HashedWheelTimeout will be added to the correct HashedWheelBucket.
  16. // 定时任务执行位于hash轮 第几圈上
  17. long remainingRounds;
  18. // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
  19. // As only the workerThread will act on it there is no need for synchronization / volatile.
  20. HashedWheelTimeout next;
  21. HashedWheelTimeout prev;
  22. // The bucket to which the timeout was added
  23. HashedWheelBucket bucket;
  24. }

HashedWheelTimer

hash 轮 的数据结构抽象,有一个 work 线程,hash轮 bucket 数组,存储定时任务的队列;

  1. public class HashedWheelTimer implements Timer {
  2. private static final int INSTANCE_COUNT_LIMIT = 64;
  3. private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
  4. // 实现原子性修改 workerState 字段;
  5. private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
  6. AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
  7. private final ResourceLeakTracker<HashedWheelTimer> leak;
  8. private final Worker worker = new Worker();
  9. private final Thread workerThread;
  10. // 工作线程状态
  11. public static final int WORKER_STATE_INIT = 0;
  12. public static final int WORKER_STATE_STARTED = 1;
  13. public static final int WORKER_STATE_SHUTDOWN = 2;
  14. @SuppressWarnings({ "unused", "FieldMayBeFinal" })
  15. private volatile int workerState; // 0 - init, 1 - started, 2 - shut down
  16. private final long tickDuration;
  17. // hashWheel 数组
  18. private final HashedWheelBucket[] wheel;
  19. private final int mask;
  20. private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
  21. // 待执行任务
  22. private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
  23. // 待取消任务
  24. private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
  25. private final AtomicLong pendingTimeouts = new AtomicLong(0);
  26. private final long maxPendingTimeouts;
  27. private volatile long startTime;
  28. }

主要方法

构造方法

  1. // 部分代码省略
  2. public HashedWheelTimer(
  3. ThreadFactory threadFactory,
  4. long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
  5. long maxPendingTimeouts) {
  6. // Normalize ticksPerWheel to power of two and initialize the wheel.
  7. // 创建 hash 轮,轮数默认是512
  8. wheel = createWheel(ticksPerWheel);
  9. mask = wheel.length - 1;
  10. // Convert tickDuration to nanos. 转纳秒
  11. long duration = unit.toNanos(tickDuration);
  12. // 创建 work 线程
  13. workerThread = threadFactory.newThread(worker);
  14. }

newTimeout 方法

创建定时任务,并把启动 work 线程。 把任务放到队列

  1. public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
  2. long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
  3. // 启动 work 线程
  4. start();
  5. // Add the timeout to the timeout queue which will be processed on the next tick.
  6. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
  7. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
  8. // Guard against overflow.
  9. if (delay > 0 && deadline < 0) {
  10. deadline = Long.MAX_VALUE;
  11. }
  12. HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
  13. // 加入队列
  14. timeouts.add(timeout);
  15. return timeout;
  16. }

start()

    public void start() {
        // 保证只有一个线程开启 work线程成功
        switch (WORKER_STATE_UPDATER.get(this)) {
            case WORKER_STATE_INIT:
                // 原子操作
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                   // 开启work线程
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // 确保 startTime 完成初始化
        // Wait until the startTime is initialized by the worker.
        while (startTime == 0) {
            try {
                // 使用countdownlautch 同步
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }

work 线程


    private final class Worker implements Runnable {
        private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

        private long tick;

        @Override
        public void run() {
            // Initialize the startTime.
            startTime = System.nanoTime();
            if (startTime == 0) {
                // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
                startTime = 1;
            }

            // Notify the other threads waiting for the initialization at start().
            startTimeInitialized.countDown();

            do {
                // 线程sleep 到下一个周期
                final long deadline = waitForNextTick();
                if (deadline > 0) {
                    // 位运算计算bucket 索引
                    int idx = (int) (tick & mask);
                    processCancelledTasks();
                    HashedWheelBucket bucket =
                            wheel[idx];
                    //把待执行的任务放到 对应 bucket 里面 
                    transferTimeoutsToBuckets();
                    // 执行
                    bucket.expireTimeouts(deadline);
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

            // work 线程退出,逻辑
            // Fill the unprocessedTimeouts so we can return them from stop() method.
            for (HashedWheelBucket bucket: wheel) {
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            for (;;) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    unprocessedTimeouts.add(timeout);
                }
            }
            processCancelledTasks();
        }

    }

transferTimeoutsToBuckets

把任务从队列放到 hashwheel 对应 bucket 上;

   private void transferTimeoutsToBuckets() {
            // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
            // adds new timeouts in a loop.
            for (int i = 0; i < 100000; i++) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    // all processed
                    break;
                }
                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                    // Was cancelled in the meantime.
                    continue;
                }

                long calculated = timeout.deadline / tickDuration;
                timeout.remainingRounds = (calculated - tick) / wheel.length;

                final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
                int stopIndex = (int) (ticks & mask);

                HashedWheelBucket bucket = wheel[stopIndex];
                bucket.addTimeout(timeout);
            }
        }

        // 放到链表上
        public void addTimeout(HashedWheelTimeout timeout) {
            assert timeout.bucket == null;
            timeout.bucket = this;
            if (head == null) {
                // 放到链表首部
                head = tail = timeout;
            } else {
                // 放到链表尾部
                tail.next = timeout;
                timeout.prev = tail;
                tail = timeout;
            }
        }

expireTimeouts

任务执行

        public void expireTimeouts(long deadline) {
            HashedWheelTimeout timeout = head;

            // 遍历链表上的任务,有且只有 remainingRounds,且任务到期才执行
            // process all timeouts
            while (timeout != null) {
                HashedWheelTimeout next = timeout.next;
                if (timeout.remainingRounds <= 0) {
                    next = remove(timeout);
                    if (timeout.deadline <= deadline) {
                        // 执行任务
                        timeout.expire();
                    } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format(
                                "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    next = remove(timeout);
                } else {
                    // 圈数减1
                    timeout.remainingRounds --;
                }
                timeout = next;
            }
        }

参考: 链接