1.1 阻塞io与非阻塞io
**阻塞io的缺点:**

  • 想处理多个socket的连接的话,需要创建多个线程,一个线程对应一个。
  • 连接太多,占用大量的空间和内存,还有线程之间的切换
  • 大量的线程处于等待数据状态,会造成资源浪费

image.png
非阻塞线程nio(Non-Blocking io)
通过多路复用让一个线程去处理多个socket。
image.png
只需要少量的线程就能搞多个socket,线程只需要通过selector去查一下它所管理的socket集合,就去处理哪个socket。

nio由三个核心组件组成:

  • Buffer
  • Channel
  • Selector

缓存区和通道是NIO的核心对象。
通道Channel对源IO中流模拟,所有数据都要通过通道进行传输;
Buffer实质上是一个容器对象,发送给通道的所有对象都必须首先放到一个缓冲区中。

Buffer包含要写入或者刚读出的数据。java nio中,任何时候访问NIO的数据,都需要通过缓冲区(Buffer)进行操作。读取数据时,直接从缓冲区中读取,写入数据时,写入至缓冲区。

缓冲区体统了对数据的结构化访问,而且还可以跟踪系统的读/写进程。

Buffer:一块连续的内存块,是NIO数据读或写的中转地。(最常用的缓冲区是 Byte Buffer)
image.png
Channel就是数据的源头或者数据的目的地,用于向buffer提供数据或者读取buffer数据,并且对I/O提供异步支持。
我们永远不会将字节直接写入通道中,只是将数据写入包含一个或多个字节的缓冲区。读取同理

image.png

Channel为顶层接口,所有子Channel都实现了该接口,主要用于I/O操作的连接。

  1. public interface Channel extends Closeable {
  2. /**
  3. * 判断此通道是否处于打开状态。
  4. */
  5. public boolean isOpen();
  6. /**
  7. *关闭此通道。
  8. */
  9. public void close() throws IOException;
  10. }

最为重要的Channel实现类为:

  • FileChannel:一个用来写、读、映射和操作文件的通道

    FileInputStream.getChannel()
    FileOutputStream.getChannel()
    RandomAccessFile.getChannel()
    
  • DatagramChannel:能通过UDP读写网络中的数据

  • SocketChannel: 能通过TCP读写网络中的数据
  • ServerSocketChannel:可以监听新进来的 TCP 连接,像 Web 服务器那样。对每一个新进来的连接都会创建一个SocketChannel

FileChannel:

/**
 * 读取一串数据到缓冲区
 */
public long read(ByteBuffer[] dsts)
/**
 * 将缓冲区中指定位置的一串数据写入到通道
 */
public long write(ByteBuffer[] srcs)

多路复用Selector,提供了选择已经就绪的任务的能力。
selector提供了询问通道是否已经准备好执行每个I/O操作的能力。selector会不断地轮询注册在其上的Channel,如果某个Channel上面发生了读或写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel集合,进行后续的I/O操作。

Selector 就是你注册对各种 I/O 事件的地方,而且当那些事件发生时,就是这个对象告诉你所发生的事件。Selector 允许一个线程处理多个 Channel ,也就是说只要一个线程复杂 Selector 的轮询,就可以处理成千上万个 Channel ,相比于多线程来处理势必会减少线程的上下文切换问题。

/**
 * 第一步:创建一个Selector
 */
Selector selector = Selector.open();
/**
 * 第二步:打开一个远程连接
 */
InetSocketAddress socketAddress = new InetSocketAddress("www.baidu.com", 80);
SocketChannel sc = SocketChannel.open(socketAddress);
sc.configureBlocking(false);
/**
 * 第三步:选择键,注册
 */
SelectionKey key = sc.register(selector, SelectionKey.OP_CONNECT);
/**
 * 注册时第一个参数总是当前的这个selector。
 * 注册读事件
 * 注册写事件
 */
SelectionKey key = sc.register(selector, SelectionKey.OP_READ);
SelectionKey key = sc.register(selector, SelectionKey.OP_WRITE);
/**
 * 第四步:内部循环处理
 */
int num = selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
    SelectionKey key = (SelectionKey)it.next();
    SelectionKey selectionKey = iterator.next();
    iterator.remove();
    //handleKey(selectionKey);
    // ... deal with I/O event ...
}
  1. 首先调用Selector的select()方法,这个方法会阻塞,直到至少有一个已注册的事件发生,当一个或者更多的事件发生时, select()方法将返回所发生的事件的数量。该方法必须首先执行。
  2. 调用 Selector 的 selectedKeys() 方法,它返回发生了事件的 SelectionKey 对象的一个集合。SelectionKey中共定义了四种事件,OP_ACCEPT(socket accept)、OP_CONNECT(socket connect)、OP_READ(read)、OP_WRITE(write)。

通过迭代 SelectionKeys 并依次处理每个 SelectionKey 来处理事件。对于每一个 SelectionKey,您必须确定发 生的是什么 I/O 事件,以及这个事件影响哪些 I/O 对象。

  1. 处理 SelectionKey 之后,我们几乎可以返回主循环了。但是我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。

如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活的键出现,这会导致我们尝试再次处理它。
我们调用迭代器的 remove() 方法来删除处理过的 SelectionKey

首先它是个框架,是个“半成品”,不能开箱即用,你必须得拿过来做点定制,利用它开发出自己的应用程序,
然后才能运行(就像使用Spring那样)。

1.2 Netty框架:

JDK原生的nio问题:

  • NIO 的类库和 API 繁杂,使用麻烦:你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
  • 需要具备其他的额外技能做铺垫:例如熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的 NIO 程序。
  • 可靠性能力补齐,开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等。NIO 编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大。
  • JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。官方声称在 JDK 1.6 版本的 update 18 修复了该问题,但是直到 JDK 1.7 版本该问题仍旧存在,只不过该 Bug 发生概率降低了一些而已,它并没有被根本解决。


Netty特点:

  1. 设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池;真正的无连接数据报套接字支持(自 3.1 起)。
  2. 使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。
  3. 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
  4. 安全:完整的 SSL/TLS 和 StartTLS 支持。
  5. 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入。


Netty 常见使用场景:

  • 互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。
  • 游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。非常方便定制和开发私有协议栈,账号登录服务器,地图服务器之间可以方便的通过 Netty 进行高性能的通信。
  • 大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。


Netty 高性能设计:

是异步事件驱动的网络,高性能之处主要来自于其 I/O 模型和线程处理模型,前者决定如何收发数据,后者决定如何处理数据。
多路复用IO模型:
image.png

netty的selector:
image.png

线程模型:

事件驱动模型

1)事件队列(event queue):接收事件的入口,存储待处理事件;
2)分发器(event mediator):将不同的事件分发到不同的业务逻辑单元;
3)事件通道(event channel):分发器与处理器之间的联系渠道;
4)事件处理器(event processor):实现业务逻辑,处理完成后会发出事件,触发下一步操作。
image.png

Reactor 线程模型:

指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。
1)Reactor:Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做 出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人;
2)Handlers:处理程序执行 I/O 事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官。Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作。
image.png

Netty 线程模型:

在基于主从 Reactors 多线程模型进行修改,主从 Reactor 多线程模型有多个 Reactor:
1)MainReactor 负责客户端的连接请求,并将请求转交给 SubReactor;
2)SubReactor 负责相应通道的 IO 读写请求;
3)非 IO 请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理。
image.png

虽然 Netty 的线程模型基于主从 Reactor 多线程,借用了 MainReactor 和 SubReactor 的结构。但是实际实现上 SubReactor 和 Worker 线程在同一个线程池中:

EventLoopGroup bossGroup = newNioEventLoopGroup();

EventLoopGroup workerGroup = newNioEventLoopGroup();

ServerBootstrap server = newServerBootstrap();

server.group(bossGroup, workerGroup)
      .channel(NioServerSocketChannel.class)

异步处理:
Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。
调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。
当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。

1)通过 isDone 方法来判断当前操作是否完成;
2)通过 isSuccess 方法来判断已完成的当前操作是否成功;
3)通过 getCause 方法来获取已完成的当前操作失败的原因;
4)通过 isCancelled 方法来判断已完成的当前操作是否被取消;
5)通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则理解通知指定的监听器。

serverBootstrap.bind(port).addListener(future -> {
  if(future.isSuccess()) {
     System.out.println(newDate() + ": 端口["+ port + "]绑定成功!");
  } else{
     System.err.println("端口["+ port + "]绑定失败!");
  }
});

异步处理的好处是不会造成线程阻塞,线程在 I/O 操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量。

功能特性:

1)传输服务:支持 BIO 和 NIO;
2)容器集成:支持 OSGI、JBossMC、Spring、Guice 容器;
3)协议支持:HTTP、Protobuf、二进制、文本、WebSocket 等一系列常见协议都支持。还支持通过实行编码解码逻辑来实现自定义协议;
4)Core 核心:可扩展事件模型、通用通信 API、支持零拷贝的 ByteBuf 缓冲对象。
image.png
image.png

模块组件:

【Bootstrap、ServerBootstrap】:
Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。
【Future、ChannelFuture】:
正如前面介绍,在 Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。
但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。
【Channel】:
Netty 网络通信的组件,能够用于执行网络 I/O 操作。Channel 为用户提供:
1)当前网络连接的通道的状态(例如是否打开?是否已连接?)
2)网络连接的配置参数 (例如接收缓冲区大小)
3)提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成。
4)调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方。
5)支持关联 I/O 操作与对应的处理程序。
不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应。

NioSocketChannel,异步的客户端 TCP Socket 连接。
NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
NioDatagramChannel,异步的 UDP 连接。
NioSctpChannel,异步的客户端 Sctp 连接。
NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。

【Selector】:
Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel 。
【NioEventLoop】:
NioEventLoop 中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务:
I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发。
非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。
两种任务的执行时间比由变量 ioRatio 控制,默认为 50,则表示允许非 IO 任务执行的时间与 IO 任务的执行时间相等。
【NioEventLoopGroup】:
NioEventLoopGroup,主要管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个线程。
【ChannelHandler】:
ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。
ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:

ChannelInboundHandler // 用于处理入站 I/O 事件。
ChannelOutboundHandler // 用于处理出站 I/O 操作。
// 适配器类
ChannelInboundHandlerAdapter // 用于处理入站 I/O 事件。
ChannelOutboundHandlerAdapter // 用于处理出站 I/O 操作。
ChannelDuplexHandler // 用于处理入站和出站事件。

【ChannelHandlerContext】:
保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。
【ChannelPipline】:
保存 ChannelHandler 的 List,用于处理或拦截 Channel 的入站事件和出站操作。
ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互。
下图引用 Netty 的 Javadoc 4.1 中 ChannelPipeline 的说明,描述了 ChannelPipeline 中 ChannelHandler 通常如何处理 I/O 事件。
I/O 事件由 ChannelInboundHandler 或 ChannelOutboundHandler 处理,并通过调用 ChannelHandlerContext 中定义的事件传播方法。
image.png

Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应
image.png
一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。

入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰。

Netty框架的工作原理:

初始化并启动 Netty 服务端的过程代码
1)初始化创建 2 个 NioEventLoopGroup:其中 boosGroup 用于 Accetpt 连接建立事件并分发请求,workerGroup 用于处理 I/O 读写事件和业务逻辑。
2)基于 ServerBootstrap(服务端启动引导类):配置 EventLoopGroup、Channel 类型,连接参数、配置入站、出站事件 handler。
3)绑定端口:开始工作。

public static void main(String[] args) {
       // 创建mainReactor
       NioEventLoopGroup boosGroup = newNioEventLoopGroup();
       // 创建工作线程组
       NioEventLoopGroup workerGroup = newNioEventLoopGroup();

       finalServerBootstrap serverBootstrap = newServerBootstrap();

       serverBootstrap
                // 组装NioEventLoopGroup
               .group(boosGroup, workerGroup)
                // 设置channel类型为NIO类型
               .channel(NioServerSocketChannel.class)
               // 设置连接配置参数
               .option(ChannelOption.SO_BACKLOG, 1024)
               .childOption(ChannelOption.SO_KEEPALIVE, true)
               .childOption(ChannelOption.TCP_NODELAY, true)
               // 配置入站、出站事件handler
               .childHandler(newChannelInitializer<NioSocketChannel> () {
                   @Override
                   protectedvoidinitChannel(NioSocketChannel ch) {
                       // 配置入站、出站事件channel
                       ch.pipeline().addLast(...);
                       ch.pipeline().addLast(...);
               }
   });

       // 绑定端口
       int port = 8080;
       serverBootstrap.bind(port).addListener(future -> {
           if(future.isSuccess()) {
               System.out.println(newDate() + ": 端口["+ port + "]绑定成功!");
           } else{
               System.err.println("端口["+ port + "]绑定失败!");
           }
       });
}

image.png

Server 端包含 1 个 Boss NioEventLoopGroup 和 1 个 Worker NioEventLoopGroup。

NioEventLoopGroup 相当于 1 个事件循环组,这个组里包含多个事件循环 NioEventLoop,每个 NioEventLoop 包含 1 个 Selector 和 1 个事件循环线程。

每个 Boss NioEventLoop 循环执行的任务包含 3 步:
1)轮询 Accept 事件;
2)处理 Accept I/O 事件,与 Client 建立连接,生成 NioSocketChannel,并将 NioSocketChannel 注册到某个 Worker NioEventLoop 的 Selector 上;
3)处理任务队列中的任务,runAllTasks。任务队列中的任务包括用户调用 eventloop.execute 或 schedule 执行的任务,或者其他线程提交到该 eventloop 的任务。

每个 Worker NioEventLoop 循环执行的任务包含 3 步:
1)轮询 Read、Write 事件;
2)处理 I/O 事件,即 Read、Write 事件,在 NioSocketChannel 可读、可写事件发生时进行处理;
3)处理任务队列中的任务,runAllTasks。

任务队列中的 Task 有 3 种典型使用场景:
① 用户程序自定义的普通任务:

ctx.channel().eventLoop().execute(newRunnable() {
   @Override
   publicvoidrun() {
       //...
   }
});

② 非当前 Reactor 线程调用 Channel 的各种方法:
例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到任务队列中后被异步消费。
③ 用户自定义定时任务:

ctx.channel().eventLoop().schedule(newRunnable() {
   @Override
   publicvoidrun() {

   }
}, 60, TimeUnit.SECONDS);

推荐使用的主流稳定版本还是 Netty4,Netty5 中使用了 ForkJoinPool,增加了代码的复杂度,但是对性能的改善却不明显,所以这个版本不推荐使用

Netty服务小例子:

包含一个接收连接的线程池(也有可能是单个线程,boss线程池)以及一个处理连接的线程池(worker线程池)。boss负责接收连接,并进行IO监听;worker负责后续的处理。

package cn.xingoo.book.netty.chap04;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

public class NettyNioServer {
    public void serve(int port) throws InterruptedException {
        final ByteBuf buffer = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi\r\n", Charset.forName("UTF-8")));
        // 第一步,创建线程池
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            // 第二步,创建启动类
            ServerBootstrap b = new ServerBootstrap();
            // 第三步,配置各组件
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    ctx.writeAndFlush(buffer.duplicate()).addListener(ChannelFutureListener.CLOSE);
                                }
                            });
                        }
                    });
            // 第四步,开启监听
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully().sync();
            workerGroup.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        NettyNioServer server = new NettyNioServer();
        server.serve(5555);
    }
}

阻塞io

public class NettyOioServer {
    public void serve(int port) throws InterruptedException {
        final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi\r\b", Charset.forName("UTF-8")));

        EventLoopGroup bossGroup = new OioEventLoopGroup(1);
        EventLoopGroup workerGroup = new OioEventLoopGroup();

        try{
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置boss和worker
                    .channel(OioServerSocketChannel.class) // 使用阻塞的SocketChannel
         ....
public class ByteBufTest {
    public static void main(String[] args) {
        //创建bytebuf
        ByteBuf buf = Unpooled.copiedBuffer("hello".getBytes());
        System.out.println(buf);

        // 读取一个字节
        buf.readByte();
        System.out.println(buf);

        // 读取一个字节
        buf.readByte();
        System.out.println(buf);

        // 丢弃无用数据
        buf.discardReadBytes();
        System.out.println(buf);

        // 清空
        buf.clear();
        System.out.println(buf);

        // 写入
        buf.writeBytes("123".getBytes());
        System.out.println(buf);

        buf.markReaderIndex();
        System.out.println("mark:"+buf);

        buf.readByte();
        buf.readByte();
        System.out.println("read:"+buf);

        buf.resetReaderIndex();
        System.out.println("reset:"+buf);
    }
}


// 输出
UnpooledHeapByteBuf(ridx: 0, widx: 5, cap: 5/5)
UnpooledHeapByteBuf(ridx: 1, widx: 5, cap: 5/5)
UnpooledHeapByteBuf(ridx: 2, widx: 5, cap: 5/5)
UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5)
UnpooledHeapByteBuf(ridx: 0, widx: 0, cap: 5/5)
UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5)
mark:UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5)
read:UnpooledHeapByteBuf(ridx: 2, widx: 3, cap: 5/5)
reset:UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5)

在Netty中通过独立的读写索引维护,避免读写模式的切换。

Netty的Buffer和零拷贝

  1. 通过Composite和slice实现逻辑上的Buffer的组合和拆分,重新维护索引,避免内存拷贝过程。
  2. 通过DirectBuffer申请堆外内存,避免用户空间的拷贝。不过堆外内存的申请和释放都很麻烦,推荐小心使用。
  3. 通过FileRegion包装FileChannel,直接实现channel到channel的传输。

image.png

Handler的使用

image.png
当接收消息的时候,会从链表的表头开始遍历,如果是inbound就调用对应的方法;如果发送消息则从链表的尾巴开始遍历。那么上面途中的例子,接收消息就会输出:

InboundA --> InboundB --> InboundC

输出消息,则会输出:

OutboundC --> OutboundB --> OutboundA
package cn.xingoo.book.netty.pipeline;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;

/**
 * 注意:
 *
 * 1 ChannelOutboundHandler要在最后一个Inbound之前
 *
 */
public class NettyNioServerHandlerTest {

    final static ByteBuf buffer = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi\r\n", Charset.forName("UTF-8")));

    public void serve(int port) throws InterruptedException {


        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("1",new InboundA());
                            pipeline.addLast("2",new OutboundA());
                            pipeline.addLast("3",new InboundB());
                            pipeline.addLast("4",new OutboundB());
                            pipeline.addLast("5",new OutboundC());
                            pipeline.addLast("6",new InboundC());
                        }
                    });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully().sync();
            workerGroup.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        NettyNioServerHandlerTest server = new NettyNioServerHandlerTest();
        server.serve(5555);
    }

    private static class InboundA extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf)msg;
            System.out.println("InboundA read"+buf.toString(Charset.forName("UTF-8")));
            super.channelRead(ctx, msg);
        }
    }

    private static class InboundB extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf)msg;
            System.out.println("InboundB read"+buf.toString(Charset.forName("UTF-8")));
            super.channelRead(ctx, msg);
            // 从pipeline的尾巴开始找outbound
            ctx.channel().writeAndFlush(buffer);
        }
    }

    private static class InboundC extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf)msg;
            System.out.println("InboundC read"+buf.toString(Charset.forName("UTF-8")));
            super.channelRead(ctx, msg);
            // 这样会从当前的handler向前找outbound
            //ctx.writeAndFlush(buffer);
        }
    }

    private static class OutboundA extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("OutboundA write");
            super.write(ctx, msg, promise);
        }
    }

    private static class OutboundB extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("OutboundB write");
            super.write(ctx, msg, promise);
        }
    }

    private static class OutboundC extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("OutboundC write");
            super.write(ctx, msg, promise);
        }
    }
}