概念和体系结构
Netty 核心组件
Channel (通道)
是Java Nio 的基本构造, 可以把 Channel 看作是传入(入站)或者传出(出站)数据的载体。因此,它可以 被打开或者被关闭,连接或者断开连接
回调
其实就是方法,指向被提供给另一个方法的方法的引用. 这使得后 者可以在适当的时候调用前者 .
Future
Future 提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操 作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。
事件和ChannelHandle
Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经 发生的事件来触发适当的动作
事件可以分类为入站事件与出站事件:
入站事件:
- 连接已被激活或者连接失活;
- 数据读取
- 用户事件
- 错误事件
出站事件:
- 打开或者关闭到远程节点的连接;
将数据写到或者冲刷到套接字。
每个事件都可以被分发给 ChannelHandler 类中的某个用户实现的方法。 这是一个很好的 将事件驱动范式直接转换为应用程序构件块的例子
Netty的组件和设计
Cannel 接口
基本的 I/O 操作(bind()、connect()、read()和 write())依赖于底层网络传输所提 供的原语。在基于 Java 的网络编程中,其基本的构造是 class Socket。Netty 的 Channel 接 口所提供的 API,大大地降低了直接使用 Socket 类的复杂性。
Socket: 套接字”,是进程间进行网络通信的抽象。
EventLoop
EventLoop 定义了 Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。
Channel、EventLoop、Thread 以及 EventLoopGroup 之间的关系。
- 一个 EventLoopGroup 包含一个或者多个 EventLoop;
- 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
- 所有由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;
- 一个 Channel 在它的生命周期内只注册于一个 EventLoop;
- 一个 EventLoop 可能会被分配给一个或多个 Channel。
ChannelFuture
ChannelFuture 是将来要执行的操作的结果的 占位符
Netty 中所有的 I/O 操作都是异步的。因为一个操作可能不会 立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。 Netty 提供了 ChannelFuture 接口,其 addListener()方法注册了一个 ChannelFutureListener,以 便在某个操作完成时(无论是否成功)得到通知。
ChannelHandler
处理数据 据的应用程序逻辑的容器 ,Netty 针对应用实现了不同的实现类供使用
例如:
Adapter 是实现了接口中默认的方法,便于我们调用使用
ChannelPipeLine
ChannelPipeline 为 ChannelHandler 链提供了容器,并定义了用于在该链上传播入站 和出站事件流的 API。当 Channel 被创建时,它会被自动地分配到它专属的 ChannelPipeline。 ChannelHandler 安装到 ChannelPipeline 中的过程如下所示:
- 一个 ChannelInitializer 的实现被注册到了 ServerBootstrap 中①
- 当 ChannelInitializer.initChannel()方法被调用时,ChannelInitializer 将在 ChannelPipeline 中安装一组自定义的 ChannelHandler;
- ChannelInitializer 将它自己从 ChannelPipeline 中移除。
流动顺序
InHandler 是从头部开始流动,OutHandler 是从尾部开始流动,后添加的先执行.
练习:启动客户与服务端
传输
流经网络的数据总是具有相同的类型:字节。这些字节是如何流动的主要取决于我们所说的 网络传输—一个帮助我们抽象底层数据传输机制的概念。
Netty 为它所有的传输实现提供了一个通用 API,这使得这种转换比你直接使用 JDK 所能够达到的简单得多。
传输API
传输 API 的核心是 interface Channel,它被用于所有的 I/O 操作。
每个Channel 都会被分配一个ChannelPipeLine 和 ChannelConfig
ChannelConfig 包含了该 Channel 的所有配置设置,并且支持热更新。
ChannelPipeline 持有所有将应用于入站和出站数据以及事件的 ChannelHandler 实 例,这些 ChannelHandler 实现了应用程序用于处理状态变化以及数据处理的逻辑。
Channel 的方法
方法名 | 描述 |
---|---|
eventLoop | 返回分配给 Channel 的 EventLoop |
pipeline | 返回分配给 Channel 的 ChannelPipeline |
isActive | 如果 Channel 是活动的,则返回 true。活动的意义可能依赖于底层的传输。例如, 一个 Socket 传输一旦连接到了远程节点便是活动的,而一个 Datagram 传输一旦 被打开便是活动的 |
localAddress | 返回本地的 SocketAddress |
remoteAddress | 返回远程的 SocketAddress |
write | 将数据写到远程节点。这个数据将被传递给 ChannelPipeline,并且排队直到它被 冲刷 |
flush | 将之前已写的数据冲刷到底层传输,如一个 Socket |
writeAndFlush | 一个简便的方法,等同于调用 write()并接着调用 flush() |
传输类型选择
//选择传输类型
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(group).channel(NioServerSocketChannel.class)
Netty 所提供的传输类型
名称 | 包 | 描述 |
---|---|---|
NIO | io.netty.channel.socket.nio | 使用 java.nio.channels 包作为基础——基于 选择器的方式 |
Epoll | io.netty.channel.epoll | 由 JNI 驱动的 epoll()和非阻塞 IO。这个传输支持 只有在Linux上可用的多种特性,如SO_REUSEPORT, 比 NIO 传输更快,而且是完全非阻塞的 |
OIO | io.netty.channel.socket.oio | 使用 java.net 包作为基础——使用阻塞流 |
Local | io.netty.channel.local | 可以在 VM 内部通过管道进行通信的本地传输 |
Embedded | io.netty.channel.embedded | Embedded 传输,允许使用 ChannelHandler 而又 不需要一个真正的基于网络的传输。这在测试你的 ChannelHandler 实现时非常有用 |
传输模式推荐
应用程序的需求 | 推荐的传输 |
---|---|
非阻塞代码库或者一个常规的起点 | NIO(或者在 Linux 上使用 epoll ) |
阻塞代码库 | OIO |
在同一个 JVM 内部的通信 | Local |
测试 ChannelHandler 的实现 | Embedded |
ByteBuf
正如前面所提到的,网络数据的基本单位总是字节。Java NIO 提供了 ByteBuffer 作为它 的字节容器,但是这个类使用起来过于复杂,而且也有些烦琐。 Netty 的 ByteBuffer 替代品是 ByteBuf,一个强大的实现,既解决了 JDK API 的局限性, 又为网络应用程序的开发者提供了更好的 API。
Netty 的数据处理 API 通过两个组件暴露——abstract class ByteBuf 和 interface ByteBufHolder。
ByteBuf 类
ByteBuf 类维护了两个不同的索引,一个用于读取,一个用于写入.这两个索引初始都在0位置,随你的读写操作而变化
名称以 read 或者 write 开头的 ByteBuf 方法,将会推进其对应的索引,而名称以 set 或 者 get 开头的操作则不会。后面的这些方法将在作为一个参数传入的一个相对索引上执行操作。 可以指定 ByteBuf 的最大容量。试图移动写索引(即 writerIndex)超过这个值将会触 发一个异常IndexOutOfBoundsException。
使用模式
- 堆缓冲区
将数据存储在JVM 的堆空间中,它能在没有使用池化的情况下提供快速的分配和释放
ByteBuf heapBuf = ...;
if(heapBuf.hasArray()){ //检查ByteByuf是否有一个支撑数组.是否初始化
byte[] array = heapBuf.array();
int length = heapBuf.readableBytes(); //获取可读字节
}
- 直接缓冲区
数据将驻留在会被垃圾回收的堆之外,可以避免每次调用I/O 操作之前(或之后)将缓冲区的内容复制到一个中间缓冲区
缺点是: 数据的分配和释放都会较为昂贵,如果在理遗留代码,也可能会遇到另外一个缺点:因为数据不是在堆上,所以你不得不进行一 次复制
ByteBuf heapBuf = ...;
if(heapBuf.hasArray()){ //检查 ByteBuf 是否由数组支撑。如果不是,则这是一个直接缓冲区
int length = directBuf.readableBytes(); //获取可读字节数
byte[] array = new byte[length]; //分配一个新的数组来保存具有该长度的字节数据
directBuf.getBytes(directBuf.readerIndex(), array);//将字节复制到该数组
}
- 复合缓冲区
netty通过ByteBuf 子类——CompositeByteBuf——实现了为多个ByteBuf 提供一个聚合视图(将多个缓冲区表示为单个合并缓冲区的虚拟表示),按需添加和删除.
//使用复合缓冲区模式
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = ...; // can be backing or direct
ByteBuf bodyBuf = ...; // can be backing or direct
messageBuf.addComponents(headerBuf, bodyBuf);//将多个缓冲区追加到复合缓冲区
for (ByteBuf buf : messageBuf) {
System.out.println(buf.toString());
}
//访问CompositeByteBuf 中的数据
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
int length = compBuf.readableBytes(); //获取可读字节
byte[] array = new byte[length]; //分配一个具有可读字节数长度的新数组
compBuf.getBytes(compBuf.readerIndex(), array); //将字节读到该数组中
字节级操作
ByteBuf 提供了许多超出基本读,写操作的方法用于修改数据
1. 随机访问索引
与普通的字节数组一样,ByteBuf 索引是从零开始,最后一个索引总是capacity() - 1;
ByteBuf buffer = ...;
for (int i = 0; i < buffer.capacity(); i++) {
byte b = buffer.getByte(i);
System.out.println((char)b);
}
//get 操作不会影响读写索引, 用 readerIndex(index)或者 writerIndex(index)
//会影响读写索引
2.顺序访问索引
虽然 ByteBuf 同时具有读索引和写索引,但是 JDK 的 ByteBuffer 却只有一个索引,这 也就是为什么必须调用 flip()方法来在读模式和写模式之间进行切换的原因。图 展示了 ByteBuf 是如何被它的两个索引划分成 3 个区域的。
3.可丢弃索引
调用discardReadBytes() 方法,可以丢弃读过的字节并回收空间,但不建议如此做,因为可读字节(图中标记为 CONTENT 的部分)必须被移 动到缓冲区的开始位置.这极有可能会导致内存复制(因为只是移动了可以读取的字节以及 writerIndex,而没有对所有可写入的字节进行擦除写。).
4.可读字节
ByteBuf 的可读字节分段存储了实际数据。新分配的、包装的或者复制的缓冲区的默认的 readerIndex 值为 0。任何名称以 read 或者 skip 开头的操作都将检索或者跳过位于当前 readerIndex 的数据,并且将它增加已读字节数。
ByteBuf buffer = ...;
while (buffer.isReadable()) {
5.可写字节
可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域。新分配的缓冲区的 writerIndex 的默认值为 0。任何名称以 write 开头的操作都将从当前的 writerIndex 处 开始写数据,并将它增加已经写入的字节数。如果写操作的目标也是 ByteBuf,并且没有指定 源索引的值,则源缓冲区的 readerIndex 也同样会被增加相同的大小。
writeBytes(ByteBuf dest);
// Fills the writable bytes of a buffer with random integers.
ByteBuf buffer = ...;
while (buffer.writableBytes() >= 4) { //确认是否还有足够空间
buffer.writeInt(random.nextInt());
}
6.索引管理
- mark(int readlimit)和 reset()方法,分别 被用来将流中的当前位置标记为指定的值,以及将流重置到该位置
- markReaderIndex()、markWriterIndex()、resetWriterIndex() 和 resetReaderIndex()来标记和重置 ByteBuf 的 readerIndex 和 writerIndex。这些和 InputStream 上的调用类似,只是没有 readlimit 参数来指定标记什么时候失效
- readerIndex(int)或者 writerIndex(int)来将索引移动到指定位置。试 图将任何一个索引设置到一个无效的位置都将导致一个 IndexOutOfBoundsException
clear()方法来将 readerIndex 和 writerIndex 都设置为 0。注意,这 并不会清除内存中的内容。(clear()比调用 discardReadBytes()轻量得多,因为它将只是重置索引而不会复 制任何的内存。)
7.查找操作
使用 indexOf() 来简单查找
通过那些需要一个ByteProcessor 作为参数的方法来查找
8. 派生缓冲区
为 ByteBuf 提供了以专门的方式来呈现其内容的视图。这类视图是通过以下方 法被创建的:
duplicate()
- slice()
- slice(int, int);
- Unpooled.unmodifiableBuffer(…);
- order(ByteOrder);
- readSlice(int)
每个这些方法都将返回一个新的 ByteBuf 实例,它具有自己的读索引、写索引和标记 索引。其内部存储和 JDK 的 ByteBuffer 一样也是共享的。这使得派生缓冲区的创建成本 是很低廉的,但是这也意味着,如果你修改了它的内容,也同时修改了其对应的源实例,所 以要小心
ByteBuf 复制 如果需要一个现有缓冲区的真实副本,请使用 copy()或者 copy(int, int)方 法。不同于派生缓冲区,由这个调用所返回的 ByteBuf 拥有独立的数据副本
9. 读写操作
读写操作根据是否会影响索引分为两种
- get()和 set()操作,从给定的索引开始,并且保持索引不变;
- read()和 write()操作,从给定的索引开始,并且会根据已经访问过的字节数对索 引进行调整。
get()操作
名称 | 描述 |
---|---|
getBoolean(int) | 返回给定索引处的 Boolean 值 |
getByte(int) | 返回给定索引处的字节 |
getUnsignedByte(int) | 将给定索引处的无符号字节值作为 short 返回0 |
getMedium(int) | 返回给定索引处的 24 位的中等 int 值 |
getUnsignedMedium(int) | 返回给定索引处的无符号的 24 位的中等 int 值 |
getInt(int) | 返回给定索引处的 int 值 |
getUnsignedInt(int) | 将给定索引处的无符号 int 值作为 long 返回 |
getLong(int | 返回给定索引处的 long 值 |
getShort(int) | 返回给定索引处的 short 值 |
getUnsignedShort(int) | 将给定索引处的无符号 short 值作为 int 返回 |
getBytes(int, …) | 将该缓冲区中从给定索引开始的数据传送到指定的目的地 |
set()操作
名称 | 描述 |
---|---|
setBoolean(int, boolean) | 设定给定索引处的 Boolean 值 |
setByte(int index, int value) | 设定给定索引处的字节值 |
setMedium(int index, int value) | 设定给定索引处的 24 位的中等 int 值 |
setInt(int index, int value) | 设定给定索引处的 int 值 |
setLong(int index, long value) | 设定给定索引处的 long 值 |
setShort(int index, int value) | 设定给定索引处的 short 值 |
read()操作
名称 | 描述 |
---|---|
readBoolean( | 返回当前 readerIndex 处的 Boolean,并将 readerIndex 增加 1 |
readByte() | 返回当前 readerIndex 处的字节,并将 readerIndex 增加1 |
readUnsignedByte() | 将当前 readerIndex 处的无符号字节值作为 short 返回,并将 readerIndex 增加1 |
readMedium( | 返回当前 readerIndex 处的 24 位的中等 int 值,并将 readerIndex 增加 3 |
readUnsignedMedium() | 返回当前 readerIndex 处的 24 位的无符号的中等 int 值,并将 readerIndex 增加 3 |
readInt() | 返回当前 readerIndex 的 int 值,并将 readerIndex 增加 4 |
readUnsignedInt() | 将当前 readerIndex 处的无符号的 int 值作为 long 值返回,并将 readerIndex 增加 4 |
readLong() | 返回当前 readerIndex 处的 long 值,并将 readerIndex 增加 8 |
readShort() | 返回当前 readerIndex 处的 short 值,并将 readerIndex 增加 2 |
readUnsignedShort() | 将当前 readerIndex 处的无符号 short 值作为 int 值返回,并将 readerIndex 增加 2 |
readBytes(ByteBuf | byte[] destination, int dstIndex [,int length]) | 将当前 ByteBuf 中从当前 readerIndex 处开始的(如果设置了, length 长度的字节)数据传送到一个目标 ByteBuf 或者 byte[],从 目标的 dstIndex 开始的位置。本地的 readerIndex 将被增加已经传 输的字节数 |
write()操作
名称 | 描述 |
---|---|
writeBoolean(boolean) | 在当前 writerIndex 处写入一个 Boolean,并将 writerIndex 增加 1 |
writeByte(int) | 在当前 writerIndex 处写入一个字节值,并将 writerIndex 增加 1 |
writeMedium(int) | 在当前 writerIndex 处写入一个中等的 int 值,并将 writerIndex 增加 3 |
writeInt(int) | 在当前 writerIndex 处写入一个 int 值,并将 writerIndex 增加 4 |
writeLong(long) | 在当前 writerIndex 处写入一个 long 值,并将 writerIndex 增加 8 |
writeShort(int) | 在当前 writerIndex 处写入一个 short 值,并将 writerIndex 增加 2 |
writeBytes(source ByteBuf |byte[] [,int srcIndex ,int length]) | 从当前 writerIndex 开始,传输来自于指定源(ByteBuf 或者 byte[]) 的数据。如果提供了 srcIndex 和 length,则从 srcIndex 开始读取, 并且处理长度为 length 的字节。当前 writerIndex 将会被增加所写入 的字节数 |
更多操作
名称 | 描述 |
---|---|
isReadable() | 如果至少有一个字节可供读取,则返回 true |
isWritable() | 如果至少有一个字节可被写入,则返回 true |
readableBytes() | 返回可被读取的字节数 |
writableBytes() | 返回可被写入的字节数 |
capacity( | 返回 ByteBuf 可容纳的字节数。在此之后,它会尝试再次扩展直 到达到 maxCapacity() |
maxCapacity( | 返回 ByteBuf 可以容纳的最大字节数 |
hasArray( | 如果 ByteBuf 由一个字节数组支撑,则返回 true |
array( | 如果 ByteBuf 由一个字节数组支撑则返回该数组;否则,它将抛出一个 UnsupportedOperationException 异常 |
ByteBufHolder 接口
如果想要实现一个将其有效负载存储在 ByteBuf 中的消息对象,那么 ByteBufHolder 将 是个不错的选择
ByteBufHolder 只有几种用于访问底层数据和引用计数的方法。表 列出了它们(这里 不包括它继承自 ReferenceCounted 的那些方法)
名称 | 描述 |
---|---|
content() | 返回由这个 ByteBufHolder 所持有的 ByteBuf |
copy() | 返回这个 ByteBufHolder 的一个深拷贝,包括一个其所包含的 ByteBuf 的非共享副本 |
duplicate() | 返回这个 ByteBufHolder 的一个浅拷贝,包括一个其所包含的 ByteBuf 的共享副本 |
ByteBuf 管理
除了实际的数据负载之外,我们还需要存储各种属性值。ByteBufHolder 就是为了处理这种常见的情况,
管理 ByteBuf 实例的不同方式
1.按需分配:ByteBufAllocator 接口
为了降低分配和释放内存的开销,,Netty 通过 interface ByteBufAllocator 实现了 (ByteBuf 的)池化,它可以用来分配我们所描述过的任意类型的 ByteBuf 实例。
名 称 | 描述 |
---|---|
buffer() buffer(int initialCapacity); buffer(int initialCapacity, int maxCapacity); | 返回一个基于堆或者直接内存 存储的 ByteBuf |
heapBuffer() heapBuffer(int initialCapacity) heapBuffer(int initialCapacity, int maxCapacity) | 返回一个基于堆内存存储的 ByteBuf |
directBuffer() directBuffer(int initialCapacity) directBuffer(int initialCapacity, int maxCapacity) | 返回一个基于直接内存存储的 ByteBuf |
compositeBuffer() compositeBuffer(int maxNumComponents) compositeDirectBuffer() compositeDirectBuffer(int maxNumComponents); compositeHeapBuffer() compositeHeapBuffer(int maxNumComponents); | 返回一个可以通过添加最大到 指定数目的基于堆的或者直接 内存存储的缓冲区来扩展的 CompositeByteBuf |
ioBuffer() | 返回一个用于套接字的 I/O 操 作的 ByteBuf |
- 可以通过 Channel(每个都可以有一个不同的 ByteBufAllocator 实例)
- 绑定到 ChannelHandler 的 ChannelHandlerContext 获取一个到 ByteBufAllocator 的引用
Netty 提供了两种 ByteBufAllocator 的实现:PooledByteBufAllocator 和 UnpooledByteBufAllocator。前者池化了 ByteBuf 的实例以提高性能并最大限度地减少内存碎片,后者的实池化ByteBuf实例,并且在每次它被调用时都会返回一个新的实例。//1
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
//2
ChannelHandlerContext ctx = ...;
ByteBufAllocator allocator2 = ctx.alloc();
2. Unpooled 缓冲区
未能获取一个到 ByteBufAllocator 的引用。对于这种情况,Netty 提 供了一个简单的称为 Unpooled 的工具类,它提供了静态的辅助方法来创建未池化的 ByteBuf
名称 | 描述 |
---|---|
buffer() buffer(int initialCapacity) buffer(int initialCapacity, int maxCapacity) | 返回一个未池化的基于堆内存存储的 ByteBuf |
directBuffer() directBuffer(int initialCapacity) directBuffer(int initialCapacity, int maxCapacity) | 返回一个未池化的基于直接内存存储 的 ByteBuf |
wrappedBuffer() | 返回一个包装了给定数据的 ByteBuf |
copiedBuffer() | 返回一个复制了给定数据的 ByteBuf |
3.ByteBufUtil 类
ByteBufUtil 提供了用于操作 ByteBuf 的静态的辅助方法。因为这个 API 是通用的,并 且和池化无关,所以这些方法已然在分配类的外部实现。 这些静态方法中最有价值的可能就是 hexdump()方法,它以十六进制的表示形式打印 ByteBuf 的内容。这在各种情况下都很有用,例如,出于调试的目的记录 ByteBuf 的内容。十 六进制的表示通常会提供一个比字节值的直接表示形式更加有用的日志条目,此外,十六进制的 版本还可以很容易地转换回实际的字节表示。 另一个有用的方法是 boolean equals(ByteBuf, ByteBuf),它被用来判断两个 ByteBuf 实例的相等性。如果你实现自己的 ByteBuf 子类,你可能会发现 ByteBufUtil 的其他有用方法。
引用计数
通过在某个对象所持有的资源不再被其他对象引用时释放该对象所持有的资源来优化内存使用和性能的技术。Netty 在第 4 版中为 ByteBuf 和 ByteBufHolder 引入了 引用计数技术,它们都实现了 interface ReferenceCounted。
ChannelHandler 和 ChannelPipeline
ChannelHandler家族
Channel 生命周期
Interface Channel 定义了一组和 ChannelInboundHandler API 密切相关的简单但 功能强大的状态模型,
状态模型 | 描述 |
---|---|
ChannelUnregistered | Channel 已经被创建,但还未注册到 EventLoop |
ChannelRegistered | Channel 已经被注册到了 EventLoop |
ChannelActive | Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了 |
ChannelInactive | Channel 没有连接到远程节点 |
ChannelHandler 生命周期
interface ChannelHandler 定义的生命周期操作,在 ChannelHandler 被添加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时会调用这些操作。这些 方法中的每一个都接受一个 ChannelHandlerContext 参数。
类型 | 描述 |
---|---|
handlerAdded | 当把 ChannelHandler 添加到 ChannelPipeline 中时被调用 |
handlerRemoved | 当从 ChannelPipeline 中移除 ChannelHandler 时被调用 |
exceptionCaught | 当处理过程中在 ChannelPipeline 中有错误产生时被调用 |
Netty 定义了下面两个重要的 ChannelHandler 子接口:
- ChannelInboundHandler——处理入站数据以及各种状态变化;
- ChannelOutboundHandler——处理出站数据并且允许拦截所有的操作
ChannelInboundHandler 接口