在早期的Java语言中,我们使用最多的线程处理的主要方式无非是按需创建和启动新的 Thread 来执行并发的任务单元(即为每个任务单独创建一条线程处理),这种方式在高并发的场景下工作效率不高且资源浪费严重。随后引入了线程池技术,通过缓存和重用Thread 极大地提高了性能。

虽然池化和重用线程相对于简单地为每个任务都创建和销毁线程是一种进步,但是它并不能消除由上下文切换所带来的开销,其随着线程数量的增加很快变得明显。我们来看看Netty是如何解决这些问题来把每个线程的性能都压榨到极致。

NioEventLoop

Netty的 EventLoop 是协同设计的一部分,它采用了两个基本的 API:并发和网络编程。
微信截图_20210824142824.png
在 Netty 中,一个 NioEventLoop 将由一个永远都不会改变的 Thread 驱动。

驱动线程的创建及绑定

还记得在创建 NioEventLoopGroup 时创建了一个 ThreadPerTaskExcutor 类型的 excutor吗?NioEventLoop 的驱动线程就是由该 excutor 创建的。接下来让我们看看它是如何创建的:

  1. private void doStartThread() {
  2. assert this.thread == null;
  3. //ThreadPerTaskExcutor来创建
  4. this.executor.execute(new Runnable() {
  5. public void run(){
  6. ......
  7. }
  8. });
  9. }

调用 ThreadPerTaskExcutor 的 excutor() 方法,直接创建一条线程并调用 start() 方法立即开启线程

  1. public void execute(Runnable command) {
  2. this.threadFactory.newThread(command).start();
  3. }

线程开启后,会调用匿名内部类中在传进去 run() 方法的逻辑,在这里进行线程的绑定

  1. public void run() {
  2. //this是SingleThreadEventExecutor,即NioEventLoop的父类
  3. //把thread属性赋值为当前线程,即通过 ThreadPerTaskExcutor 新创建的线程
  4. SingleThreadEventExecutor.this.thread = Thread.currentThread();
  5. if (SingleThreadEventExecutor.this.interrupted) {
  6. SingleThreadEventExecutor.this.thread.interrupt();
  7. }
  8. boolean success = false;
  9. SingleThreadEventExecutor.this.updateLastExecutionTime();
  10. boolean var112 = false;
  11. int oldState;
  12. label1907: {
  13. try {
  14. var112 = true;
  15. //调用 NioEventLoop的 run()方法开始循环处理事件
  16. SingleThreadEventExecutor.this.run();
  17. success = true;
  18. var112 = false;
  19. break label1907;
  20. } catch (Throwable var119) {
  21. .....
  22. }finally {
  23. .....
  24. }
  25. }

NioEventLoop任务执行方式

Netty中所采用的线程模型,是通过 NioEventLoop 所绑定的线程来处理该 NioEventLoop 中产生的所有事件。Netty 卓越的性能正是因为当前执行的 Thread 身份的确定,把该线程的性能压榨到极致,相比于线程池,省去了线程切换的时间和排队从阻塞队列获取任务的时间。

接下来我们来看一看其任务调度的具体实现,以 ChannelHandlerContext 中触发Handler的执行 channelRead() 的 fireChannelRead()方法为例:

  1. static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
  2. final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
  3. EventExecutor executor = next.executor();
  4. //判断当前调用线程是否是NioEventLoop绑定的线程
  5. if (executor.inEventLoop()) {
  6. //如果是则立即执行
  7. next.invokeChannelRead(m);
  8. } else {
  9. //如果不是则调用NioEventLoop的excute()方法放入taskQueue
  10. executor.execute(new Runnable() {
  11. public void run() {
  12. next.invokeChannelRead(m);
  13. }
  14. });
  15. }
  16. }

如果当前调用的线程正是 NioEventLoop 绑定的线程,那么所提交的代码块将立即执行,否则将该任务放入其内部的 taskQueue 中
微信截图_20210824190947.png
判断是否为绑定的线程代码如下

  1. public boolean inEventLoop() {
  2. return this.inEventLoop(Thread.currentThread());
  3. }
  4. ---------------------------------
  5. public boolean inEventLoop(Thread thread) {
  6. return thread == this.thread;
  7. }

将任务放入 taskQueue 的代码如下:

  1. public void execute(Runnable task) {
  2. if (task == null) {
  3. throw new NullPointerException("task");
  4. } else {
  5. boolean inEventLoop = this.inEventLoop();
  6. //放入taskQueue
  7. this.addTask(task);
  8. if (!inEventLoop) {
  9. //如果当前绑定的线程还没创建,则利用ThreadPerTaskExcutor创建线程
  10. this.startThread();
  11. ....
  12. }
  13. ....
  14. }
  15. }