NioEventLoop是netty中用来执行任务、从channel中读取数据的执行者,在创建NioEventLoopGroup时,会创建多个NioEventLoop来绑定多个channel。NioEventLoop也是netty中的核心组件,理解了NioEventLoop的设计之后,理解NioEventLoopGroup就非常好理解了。

EventLoop

EventLoop接口继承了ExecutorService接口,表明这也是个线程池。当一个Channel注册完成之后,EventLoop就会处理这个channel的所有I/O操作。通常来说,一个EventLoop会处理多个Channel,但也取决于实现的细节和内部。EventLoop接口还继承了EventLoopGroup和OrderedEventExecutor两个接口,OrderedEventExecutor接口表明这个类会有序地执行被提交的任务,对于EventLoopGroup的特性则会在以后进行介绍。

NioEventLoop

NioEventLoop是SingleThreadEventLoop类的子类,实现了EventLoop。NioEventLoop将channel注册到多路复用器(selector)上并处理事件循环。
NioEventLoop中定义了一个常量SELECTOR_AUTO_REBUILD_THRESHOLD使用来解决Linux下epoll bug的,这个值在静态代码块中赋值,可以通过系统参数io.netty.selectorAutoRebuildThreshold来进行设置,默认为512:

  1. private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
  2. private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
  3. // Workaround for JDK NIO bug.
  4. //
  5. // See:
  6. // - http://bugs.sun.com/view_bug.do?bug_id=6427854
  7. // - https://github.com/netty/netty/issues/203
  8. static {
  9. //省略部分代码
  10. // 设置SELECTOR_AUTO_REBUILD_THRESHOLD,当事件循环中select次数达到SELECTOR_AUTO_REBUILD_THRESHOLD这个值时,会进行rebuildSelector
  11. int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
  12. if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
  13. selectorAutoRebuildThreshold = 0;
  14. }
  15. SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
  16. if (logger.isDebugEnabled()) {
  17. logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
  18. logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
  19. }
  20. }

NioEventLoop中还有有以下的非静态成员变量:

  1. /**
  2. * The NIO {@link Selector}.
  3. */
  4. private Selector selector;
  5. private Selector unwrappedSelector;
  6. private SelectedSelectionKeySet selectedKeys;
  7. private final SelectorProvider provider;
  8. // nextWakeupNanos is:
  9. // AWAKE when EL is awake
  10. // NONE when EL is waiting with no wakeup scheduled
  11. // other value T when EL is waiting with wakeup scheduled at time T
  12. private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
  13. private final SelectStrategy selectStrategy;
  14. private volatile int ioRatio = 50;
  15. private int cancelledKeys;
  16. private boolean needsToSelectAgain;

其中

  • selector是被包装后的多路复用器
  • unwrappedSelector是未被包装的多路复用器
  • selectedKeys是selector收集到的事件的集合,内部包含了一个SelectionKey数组
  • provider是用来获取Selector的提供者
  • nextWakeupNanos AWAKE表示这个EventLoop是活跃的,None表时当前EventLoop在等待并且没有计划中的唤醒任务,其他值T表时当前EventLoop正在等待并且会在时间T的时候被计划唤醒,这个T是一个纳秒
  • selectStrategy 是select策略的计算器,其中的calculateStrategy方法用来计算当前的策略,默认的实现类DefaultSelectStrategy的calculateStrategy方法中是先判断任务队列中是否有任务,如果没有任务就返回SelectStrategy.Select(-2),表示需要进行阻塞地去执行selector的select方法,否则就直接执行selector的selectNow方法,获取当前有多少个事件作为结果,如果calculateStrategy方法的返回值大于零,那么就是当前有多少个事件,如果小于零,-1表示重新进行一次操作,-2表示需要进行一次阻塞的select操作,-3表示需要进行一次非阻塞的select操作,NIO模型下-3是不支持的
  • ioRatio是事件循环中需要花费在I/O上的时间的比率,值在0-100之间,值越大说明任务的I/O越密集,值越小说明计算越密集
  • cancelledKeys被移除的SelectionKey的数量
  • needsToSelectAgain是否需要再次select

NioEventLoop只有一个protected修饰的构造方法:

  1. /**
  2. * 代码片段2 NioEventLoop中的唯一构造方法
  3. */
  4. NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
  5. SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
  6. EventLoopTaskQueueFactory queueFactory) {
  7. super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
  8. rejectedExecutionHandler);
  9. this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
  10. this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
  11. final SelectorTuple selectorTuple = openSelector();
  12. this.selector = selectorTuple.selector;
  13. this.unwrappedSelector = selectorTuple.unwrappedSelector;
  14. }

在构造函数中,会给provider,selectStrategy,selector,unwrappedSelector进行赋值。其中provider和selectStrategy是构造参数中的,selector和unwrappedSelector是从构造的SelectorTuple对象中获取。selectorTuple对象通过openSelecter方法创建,openSelcetor方法会调用SelectorProvider的openSelector方法创建一个Selector,在NioEventLoop的openSelector方法中还会通过反射将创建的selector对象中的selectedKeys属性和publicSelectedKeys属性使用新创建的SelectedSelectionKeySet对象进行赋值,同时也将这个对象赋值给当前NioEventLoop的selectedKeys属性。

  1. private SelectorTuple openSelector() {
  2. final Selector unwrappedSelector;
  3. try {
  4. unwrappedSelector = provider.openSelector();
  5. } catch (IOException e) {
  6. throw new ChannelException("failed to open a new selector", e);
  7. }
  8. // 省略部分代码
  9. Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
  10. @Override
  11. public Object run() {
  12. try {
  13. return Class.forName(
  14. "sun.nio.ch.SelectorImpl",
  15. false,
  16. PlatformDependent.getSystemClassLoader());
  17. } catch (Throwable cause) {
  18. return cause;
  19. }
  20. }
  21. });
  22. // 省略部分代码
  23. final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
  24. //新建一个SelectedSelectionKeySet对象
  25. final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
  26. Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
  27. @Override
  28. public Object run() {
  29. try {
  30. //获取selectedKeys和publicSelectedKeys属性
  31. Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
  32. Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
  33. //省略部分代码
  34. Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
  35. if (cause != null) {
  36. return cause;
  37. }
  38. cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
  39. if (cause != null) {
  40. return cause;
  41. }
  42. //通过反射设置selector的selectedKeys和publicSelectedKeys属性为刚创建的SelectedSelectionKeySet对象
  43. selectedKeysField.set(unwrappedSelector, selectedKeySet);
  44. .set(unwrappedSelector, selectedKeySet);
  45. return null;
  46. } catch (NoSuchFieldException e) {
  47. return e;
  48. } catch (IllegalAccessException e) {
  49. return e;
  50. }
  51. }
  52. });
  53. if (maybeException instanceof Exception) {
  54. selectedKeys = null;
  55. Exception e = (Exception) maybeException;
  56. logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
  57. return new SelectorTuple(unwrappedSelector);
  58. }
  59. //设置NioEventLoop的selectedKeys属性
  60. selectedKeys = selectedKeySet;
  61. logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
  62. //使用未包装的selector构建一个包装过的selector,再用这两个对象一起构建一个SelectorTuple对象
  63. return new SelectorTuple(unwrappedSelector,
  64. new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
  65. }

NioEventLoop中最重要的方法是run方法,由于方法内容很多,因此就直接在代码中进行注释,部分非关键代码也会进行省略:

  1. @Override
  2. protected void run() {
  3. int selectCnt = 0;
  4. //这是一个死循环 也就是会一直去获取事件
  5. for (;;) {
  6. try {
  7. int strategy;
  8. try {
  9. //执行策略计算器的计算策略的方法,如果hasTask方法为true,就是说队列里面有任务,那么selector会立刻执行一次select方法,得到的int结果即为strategy
  10. strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
  11. switch (strategy) {
  12. //CONTINUE和BUSY_WAIT都不会被走到
  13. case SelectStrategy.CONTINUE:
  14. continue;
  15. case SelectStrategy.BUSY_WAIT:
  16. // fall-through to SELECT since the busy-wait is not supported with NIO
  17. case SelectStrategy.SELECT:
  18. //走到SELECT的话,说明上面执行的hasTasks为false
  19. //计算这一次执行任务的时间在多少纳秒后
  20. long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
  21. if (curDeadlineNanos == -1L) {
  22. curDeadlineNanos = NONE; // nothing on the calendar
  23. }
  24. //设置当前NioEventLoop下一次select操作在curDeadlineNanos纳秒,这个纳秒数是基于io.netty.util.concurrent.ScheduledFutureTask类第一次被调用时其中的静态变量START_TIME被赋的值(当时的时间System.nanosTime())
  25. nextWakeupNanos.set(curDeadlineNanos);
  26. try {
  27. if (!hasTasks()) {
  28. //如果curDeadlineNanos是NONE或者当前时间距离curDeadlineNanos不到5微秒,selector会立刻执行select操作,否则会阻塞到deadline(curDeadlineNanos - 5微秒,然后在按毫秒取整,也就是如果是6微秒,那么会阻塞到1毫秒之后)这个时间点再去select,得到的结果就是strategy
  29. strategy = select(curDeadlineNanos);
  30. }
  31. } finally {
  32. // This update is just to help block unnecessary selector wakeups
  33. // so use of lazySet is ok (no race condition)
  34. //这个时候select操作已经执行了,将当前NioEventLoop设置为活跃的(从select操作的阻塞中wakeup)
  35. nextWakeupNanos.lazySet(AWAKE);
  36. }
  37. // fall through
  38. default:
  39. // 如果hasTasks为true,则不会执行上面的逻辑
  40. } catch (IOException e) {
  41. //省略异常处理
  42. }
  43. //上面的try代码块中strategy的结果就是执行一次select操作的结果,也就是事件的数量
  44. //select的次数加一(不管hasTasks是否哦为true都会执行一次select操作,如果是CONTINUE的话,则不会走到这里而是直接下一次循环,并且CONTINUE这个变量除了这个方法的case中也没有其他地方用到过),重置cancelledKeys和needsToSelectAgain
  45. // selectCnt加一 如果连续512次加一之后没有被重置为0 ,那么netty会认为进入空转了
  46. selectCnt++;
  47. cancelledKeys = 0;
  48. needsToSelectAgain = false;
  49. final int ioRatio = this.ioRatio;
  50. boolean ranTasks;
  51. if (ioRatio == 100) {
  52. //如果IO比例为100就先执行processSelectedKeys,最后执行runAllTasks。processSelectedKeys方法会获取selectedKeys中的每个SelectionKey对象的attachment属性,这个属性如果是一个channel,就从channel中执行对应事件的逻辑(read,write,connect等),如果selecttedKeys为空会直接return。
  53. try {
  54. if (strategy > 0) {
  55. processSelectedKeys();
  56. }
  57. } finally {
  58. // Ensure we always run tasks.
  59. //依次从scheduledTaskQueue中取出task并执行,最后依次从tailTask中取出task并执行
  60. ranTasks = runAllTasks();
  61. }
  62. } else if (strategy > 0) {
  63. //如果IO比例小于100 且 strategy>0
  64. final long ioStartTime = System.nanoTime();
  65. try {
  66. processSelectedKeys();
  67. } finally {
  68. // Ensure we always run tasks.
  69. //根据io比例和io所用的时间计算出任务所需的时间
  70. final long ioTime = System.nanoTime() - ioStartTime;
  71. //从scheduledTaskQueue中依次取出task放入taskQueue中,如果放入失败,就将这个task再次放入scheduledTaskQueue中并结束循环。然后依次从taskQueue中取出task,每执行64(因为执行nanosTime()方法相对昂贵)个task后会判断是否已经超出执行任务的时间,如果超出了会结束循环,最后依次将tailTasks中的task执行完
  72. ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
  73. }
  74. } else {
  75. //参数为0的话就是最多执行64个task,也就是最小的task数量就返回
  76. ranTasks = runAllTasks(0); // This will run the minimum number of tasks
  77. }
  78. if (ranTasks || strategy > 0) {
  79. // 如果一次select操作后有任务被执行或者selcet的结果大于0,那么selectCnt会被重置
  80. if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
  81. logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
  82. selectCnt - 1, selector);
  83. }
  84. selectCnt = 0;
  85. }
  86. // unexpectedSelectorWakeup会根据select的次数是否大于SELECTOR_AUTO_REBUILD_THRESHOLD,如果大于SELECTOR_AUTO_REBUILD_THRESHOLD则会进行rebuildSelector,rebuildSelector 方法会重新构造一个Seletor并将现有的channel迁移到新的Selector上
  87. // 这是为了处理linux下的epoll bug,nio中是通过selector轮询io事件,selector的select方法会一直阻塞或者超时,linux有时候会出现问题,有时候即使没有io事件到达或超时selector也会返回,会导致线程进入死循环从而cup load 100%,netty定义了一个selector空轮询的次数阈值SELECTOR_AUTO_REBUILD_THRESHOLD默认为512,当selectCnt超过这个值会rebuildSelector
  88. else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
  89. selectCnt = 0;
  90. }
  91. } catch (CancelledKeyException e) {
  92. //省略部分代码
  93. } catch (Throwable t) {
  94. handleLoopException(t);
  95. }
  96. //省略部分代码
  97. }
  98. }

SingleThreadEventLoop

SingleThreadEventLoop是一个抽象类,继承了SingleThreadEventExecutor类,同时是NioEventLoop的父类,它也实现了实现了EventLoop接口。顾名思义,SingleThreadEventLoop使用单线程处理被提交的任务。

SingThreadEventLoop中有一个成员属性tailTasks:

  1. private final Queue<Runnable> tailTasks;

tailTasks是一个存储了Runnable类型对象的一个队列,从名称也可以看出来tailTasks是尾部的任务,tailTasks中的task会在当前或者下一次时间循环中最后别执行(从上文的代码分析中也可以看出)。

提供了一个public修饰的方法executeAfterEventLoopIteration,这个方法是用来添加一个task在当前或者下一次事件循环的最后执行的,因为这个方法的权限修饰符是public,所以在使用netty的时候就可以调用这个方法来添加当前或者下一次事件循环之后执行的任务。而一般情况下tailTasks是不会被使用到的。

  1. /**
  2. * Adds a task to be run once at the end of next (or current) {@code eventloop} iteration.
  3. *
  4. * @param task to be added.
  5. */
  6. @UnstableApi
  7. public final void executeAfterEventLoopIteration(Runnable task) {
  8. ObjectUtil.checkNotNull(task, "task");
  9. if (isShutdown()) {
  10. reject();
  11. }
  12. if (!tailTasks.offer(task)) {
  13. reject(task);
  14. }
  15. if (!(task instanceof LazyRunnable) && wakesUpForTask(task)) {
  16. wakeup(inEventLoop());
  17. }
  18. }

同样还提供了删除task的方法:

  1. /**
  2. * Removes a task that was added previously via {@link #executeAfterEventLoopIteration(Runnable)}.
  3. *
  4. * @param task to be removed.
  5. *
  6. * @return {@code true} if the task was removed as a result of this call.
  7. */
  8. @UnstableApi
  9. final boolean removeAfterEventLoopIterationTask(Runnable task) {
  10. return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
  11. }

SingleThreadEventLoop还重写了父类SingleThreadEventExecutor的hasTasks方法,如果父类中的taskQueue不为空或自身的tailTasks不为空则会返回true:

  1. @Override
  2. protected boolean hasTasks() {
  3. return super.hasTasks() || !tailTasks.isEmpty();
  4. }

SingleThreadEventExecutor

SingleThreadEventExecutor是SingleThreadEventLoop的父类,这是一个抽象类,继承自AbstractScheduledEventExecutor,实现了OrderedEventExecutor接口(SingleThreadEventExecutor没有实现EventLoop接口,所以SingleThreadEventLoop相比SingleThreadEventExecutor而言其特点就是实现了EventLoop接口,也就是说SingleThreadEventLoop处理事件是循环去处理的,不过EventLoop接口并没有定义循环处理的方法,而只是语义上规定进行循环处理事件。因为SingleThreadEventLoop是循环的,所以SingleThreadEventLoop也就可以定于在循环的结尾执行tailTasks中的任务)。

SingleThreadEventExecutor中含有以下一些重要的成员变量:

  1. // 任务队列
  2. private final Queue<Runnable> taskQueue;
  3. // 执行任务的线程,会使用下面的executor来启动,并将thread设置为executor中执行的线程
  4. private volatile Thread thread;
  5. //线程的一些属性
  6. @SuppressWarnings("unused")
  7. private volatile ThreadProperties threadProperties;
  8. //执行器,用来启动
  9. private final Executor executor;
  10. //上面的thread是否被interrupte了
  11. private volatile boolean interrupted;
  12. //最大的任务数量
  13. private final int maxPendingTasks;
  14. //达到最大任务数量之后的拒绝策略
  15. private final RejectedExecutionHandler rejectedExecutionHandler;
  16. //上一次执行的时间
  17. private long lastExecutionTime;
  18. //这个SingleThreadEventExecutor执行器的状态,刚初始化的时候是未启动
  19. @SuppressWarnings({ "FieldMayBeFinal", "unused" })
  20. private volatile int state = ST_NOT_STARTED;

SingleThreadEventExecutor执行任务的方法是execute方法和lazyExecute方法,这两个方法最终都调用了另一个私有的重载的execute方法:

  1. @Override
  2. public void execute(Runnable task) {
  3. ObjectUtil.checkNotNull(task, "task");
  4. execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
  5. }
  6. @Override
  7. public void lazyExecute(Runnable task) {
  8. execute(ObjectUtil.checkNotNull(task, "task"), false);
  9. }
  10. private void execute(Runnable task, boolean immediate) {
  11. //判断当前线程是否是执行任务的线程(成员变量thread)
  12. boolean inEventLoop = inEventLoop();
  13. //将task加入任务队列中
  14. addTask(task);
  15. if (!inEventLoop) {
  16. // 如果当前线程不是执行任务的线程,就执行startThread方法,做这个判断是因为在执行任务的线程中也会使用execute提交任务
  17. startThread();
  18. if (isShutdown()) {
  19. boolean reject = false;
  20. try {
  21. if (removeTask(task)) {
  22. reject = true;
  23. }
  24. } catch (UnsupportedOperationException e) {
  25. // The task queue does not support removal so the best thing we can do is to just move on and
  26. // hope we will be able to pick-up the task before its completely terminated.
  27. // In worst case we will log on termination.
  28. }
  29. if (reject) {
  30. reject();
  31. }
  32. }
  33. }
  34. if (!addTaskWakesUp && immediate) {
  35. wakeup(inEventLoop);
  36. }
  37. }

在execute(Runnable task, boolean immediate)这个方法中会调用startThread方法,下面看下startThread方法:

  1. private void startThread() {
  2. if (state == ST_NOT_STARTED) {
  3. //如果线程状态没有启动就会将状态设为已经启动,并执行doStartThread方法
  4. //如果线程已经启动了,就什么都不会做,上面的execute(Runnable task, boolean immediate)方法也仅仅只是将task放入任务队列中去
  5. if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
  6. boolean success = false;
  7. try {
  8. doStartThread();
  9. success = true;
  10. } finally {
  11. if (!success) {
  12. //没有成功的话就将状态设为未启动
  13. STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
  14. }
  15. }
  16. }
  17. }
  18. }

线程状态没有启动的话,就调用doStartThread方法,通常来说doStratThread方法只会被调用一次:

  1. private void doStartThread() {
  2. assert thread == null;
  3. // 通过executor这个线程池来执行一个Runnable对象,这个线程池中执行这个Runnable的线程就是工作线程
  4. executor.execute(new Runnable() {
  5. @Override
  6. public void run() {
  7. //将成员变量thread设置为当前线程
  8. thread = Thread.currentThread();
  9. if (interrupted) {
  10. thread.interrupt();
  11. }
  12. boolean success = false;
  13. updateLastExecutionTime();
  14. try {
  15. //这里最终会调用子类中的run方法,在NioEventLoop中就是循环select获取事件,并执行任务,可参考上文介绍的NioEventLoop中run方法
  16. SingleThreadEventExecutor.this.run();
  17. success = true;
  18. } catch (Throwable t) {
  19. logger.warn("Unexpected exception from an event executor: ", t);
  20. } finally {
  21. //省略部分代码
  22. }
  23. }
  24. });
  25. }

根据以上的代码可以发现,当使用NioEventloop执行任务时,会将任务先放在队列中,如果是第一次启动,会先启动线程,在这个线程中会不断地循环获取事件封装任务放入队列中,并从中任务队列中获取任务执行。这就是NioEventLoop的执行的一系列流程。

AbstractScheduledEventExecutor

AbstractScheduledEventExecutor是一个抽象类,是SingleThreadEventExecutor的父类,是一个支持定时任务的执行器。AbstractScheduledEventExecutor中维护了一个优先队列scheduledTaskQueue用来存放定时任务,以及一个long类型的nextTaskId表示下一个定时任务的id:

  1. PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
  2. long nextTaskId;

scheduledTaskQueue中存放的类型为ScheduledFutureTask

上面NioEventLoop的run方法中中调用nextScheduledTaskDeadlineNanos方法来获取下一个定时任务的时间作为select操作的超时时间,其定义如下:

  1. /**
  2. * Return the deadline (in nanoseconds) when the next scheduled task is ready to be run or {@code -1}
  3. * if no task is scheduled.
  4. */
  5. protected final long nextScheduledTaskDeadlineNanos() {
  6. //获取下一次定时任务
  7. ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
  8. //返回下一次定时任务的deadlineNanos
  9. return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
  10. }

AbstractScheduledEventExecutor主要是实现了父类中定义的一些schedule方法:

  1. @Override
  2. public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
  3. ObjectUtil.checkNotNull(command, "command");
  4. ObjectUtil.checkNotNull(unit, "unit");
  5. if (delay < 0) {
  6. delay = 0;
  7. }
  8. validateScheduled0(delay, unit);
  9. return schedule(new ScheduledFutureTask<Void>(
  10. this,
  11. command,
  12. deadlineNanos(unit.toNanos(delay))));
  13. }
  14. @Override
  15. public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
  16. ObjectUtil.checkNotNull(callable, "callable");
  17. ObjectUtil.checkNotNull(unit, "unit");
  18. if (delay < 0) {
  19. delay = 0;
  20. }
  21. validateScheduled0(delay, unit);
  22. return schedule(new ScheduledFutureTask<V>(this, callable, deadlineNanos(unit.toNanos(delay))));
  23. }
  24. @Override
  25. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
  26. ObjectUtil.checkNotNull(command, "command");
  27. ObjectUtil.checkNotNull(unit, "unit");
  28. if (initialDelay < 0) {
  29. throw new IllegalArgumentException(
  30. String.format("initialDelay: %d (expected: >= 0)", initialDelay));
  31. }
  32. if (period <= 0) {
  33. throw new IllegalArgumentException(
  34. String.format("period: %d (expected: > 0)", period));
  35. }
  36. validateScheduled0(initialDelay, unit);
  37. validateScheduled0(period, unit);
  38. return schedule(new ScheduledFutureTask<Void>(
  39. this, command, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
  40. }
  41. @Override
  42. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
  43. ObjectUtil.checkNotNull(command, "command");
  44. ObjectUtil.checkNotNull(unit, "unit");
  45. if (initialDelay < 0) {
  46. throw new IllegalArgumentException(
  47. String.format("initialDelay: %d (expected: >= 0)", initialDelay));
  48. }
  49. if (delay <= 0) {
  50. throw new IllegalArgumentException(
  51. String.format("delay: %d (expected: > 0)", delay));
  52. }
  53. validateScheduled0(initialDelay, unit);
  54. validateScheduled0(delay, unit);
  55. return schedule(new ScheduledFutureTask<Void>(
  56. this, command, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
  57. }

这些方法都最终构造一个ScheduledFutureTask对象,并且调用private修饰的schedule(final ScheduledFutureTask task)方法:

  1. private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
  2. if (inEventLoop()) {
  3. scheduleFromEventLoop(task);
  4. } else {
  5. final long deadlineNanos = task.deadlineNanos();
  6. // task will add itself to scheduled task queue when run if not expired
  7. if (beforeScheduledTaskSubmitted(deadlineNanos)) {
  8. execute(task);
  9. } else {
  10. lazyExecute(task);
  11. // Second hook after scheduling to facilitate race-avoidance
  12. if (afterScheduledTaskSubmitted(deadlineNanos)) {
  13. execute(WAKEUP_TASK);
  14. }
  15. }
  16. }
  17. return task;
  18. }

在schedule(final ScheduledFutureTask task)方法中会将这个ScheduledFutureTask对象执行execute方法,execute方法依赖于子类的实现,可以参照上文中SingleThreadEventExecutor中的实现。ScheduledFutureTask类实现了Runable接口,并且其中有以下两个成员变量:

  1. private long deadlineNanos;
  2. /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
  3. private final long periodNanos;
  • deadlineNanos,task的截止时间(纳秒)
  • periodNanos,任务循环的时间段(纳秒),如果等于0则不循环,如果大于0则按照固定的频率循环,如果小于0则按照固定的延迟循环(固定频率下一次执行的时间是执行前的时间加上时间段的时长,固定延迟是执行完之后的时间加上时间段的时长)
    ScheduledFutureTask父类中还有EventExecutor类型的executor和Runnable类型的task两个成员变量,分别是执行任务的执行器和需要被定时执行的任务本身。

ScheduledFutureTask 的run方法中体现了是如何实现循环的:

  1. public void run() {
  2. assert executor().inEventLoop();
  3. try {
  4. if (delayNanos() > 0L) {
  5. // Not yet expired, need to add or remove from queue
  6. if (isCancelled()) {
  7. scheduledExecutor().scheduledTaskQueue().removeTyped(this);
  8. } else {
  9. scheduledExecutor().scheduleFromEventLoop(this);
  10. }
  11. return;
  12. }
  13. //不循环,直接调用runTask,runTask方法会执行task中的run方法
  14. if (periodNanos == 0) {
  15. if (setUncancellableInternal()) {
  16. V result = runTask();
  17. setSuccessInternal(result);
  18. }
  19. } else {
  20. // check if is done as it may was cancelled
  21. if (!isCancelled()) {
  22. //执行task
  23. runTask();
  24. if (!executor().isShutdown()) {
  25. if (periodNanos > 0) {
  26. //如果按照固定频率,那么这次循环的deadline加上时间段的时长就是下次循环的deadline
  27. deadlineNanos += periodNanos;
  28. } else {
  29. //如果按照固定延迟,就是现在的时间加上时间段的时长,因为这边periodNanos是小于0的,所以这里是减去
  30. deadlineNanos = nanoTime() - periodNanos;
  31. }
  32. //最后因为要循环执行,所以在这里再次将this对象添加到taskQueue中
  33. if (!isCancelled()) {
  34. scheduledExecutor().scheduledTaskQueue().add(this);
  35. }
  36. }
  37. }
  38. }
  39. } catch (Throwable cause) {
  40. setFailureInternal(cause);
  41. }
  42. }

ScheduledFutureTask类型的对象本身就是用来作为任务被执行,ScheduledFutureTask中也包含了一个定时执行的任务(都是Runnable对象)。不管是定时的还是非定时的任务都会放在taskQueue中,它们的run方法都是在NioEventLoop的run方法中调用runAllTasks时循环获取taskQueue中的任务然后被调用。

最后补充一个NioEventLoop执行流程的图

点击查看【processon】