一 netty 概述
1.1 什么是netty
- Netty是由JBOSS提供的一个Java开源框架,现为Github上的独立项目
- Netty是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络IO程序
Netty主要针对在TCP协议下,面向Clients端的高并发应用,或者Peer-to-Peer场景下的大量数据持续传输的应用。
1.2 Netty的应用场景
互联网行业
阿里分布式服务框架Dubbo的RPC框架使用Dubbo协议进行节点间通信,Dubbo协议进行节点间通信,Dubbo协议默认使用Netty作为基础通信组件,用于实现各进程节点之间的内部通信
游戏行业
- Netty 作为高性能的基础通信组件,提供了 TCP/UDP 和 HTTP 协议栈,方便定制和开发私有协议栈,账号登录服务器
- 地图服务器之间可以方便的通过 Netty 进行高性能的通信
大数据领域
- 经典的 Hadoop的高性能通信和序列化组件Avro的 RPC 框架,默认采用 Netty 进行跨界点通信
它的 Netty Service 基于 Netty 框架二次封装实现。
二 Java IO模型介绍
2.1 I/O模型简单说明
I/O模型简单的理解:就是用什么样的通道进行数据的发送和接收,很大程度上决定了程序通信的性能。Java共支持3种网络编程模型IO模式:BIO、NIO、AIO。
Java BIO : 同步并阻塞(传统阻塞型),服务器实现模式为:一个连接对应一个线程,即客户端有连接请求时,服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。
- Java NIO : 同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。
Java AIO(NIO.2) : 异步非阻塞,AIO引入异步通道的概念,采用了Proactor模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。
2.2 I/O模型使用场景
BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择。
- NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4开始支持。
AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。
2.3 BIO 原理
同步并阻塞(传统阻塞型),服务器实现模式为:一个连接对应一个线程,即客户端有连接请求时,服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。可以通过线程池机制改善(实现多个客户端连接服务器),但是不能解决性能问题。
2.3.1 BIO 工作机制
服务器端启动一个ServerSocket
- 客户端启动Socket对服务器进行通信,默认情况下服务器端需要对每个客户建立一个线程与之通讯
- 客户端发出请求后, 先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝
-
2.3.2 BIO demo
使用BIO模型编写一个服务器端,监听6666端口,当有客户端连接时,就启动一个线程与之通讯。
- 要求使用线程池机制改善,可以连接多个客户端.
服务器端可以接收客户端发送的数据(通过cmd的telnet方式即可)。 ```java public class BIOServer { public static void main(String[] args) throws IOException {
//1、创建一个线程池
//2、如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)
ExecutorService executorService = Executors.newCachedThreadPool();
//创建ServerSocket
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服务器启动了");
while (true) {
System.out.println("线程信息:id= "+ Thread.currentThread().getId() + "; 线程名字:" + Thread.currentThread().getName());
//监听,等待客户端连接
System.out.println("等待连接");
final Socket socket = serverSocket.accept();
System.out.println("连接到一个客户端");
//创建一个线程,与之通讯
executorService.execute(() -> {
//重写Runnable方法,与客户端进行通讯
handler(socket);
});
}
}
//编写一个Handler方法,和客户端通讯 public static void handler(Socket socket) {
try {
System.out.println("线程信息:id= "+ Thread.currentThread().getId() + "; 线程名字:" + Thread.currentThread().getName());
byte[] bytes = new byte[1024];
//通过socket获取输入流
InputStream inputStream = socket.getInputStream();
//循环的读取客户端发送的数据
while (true){
System.out.println("线程信息:id= "+ Thread.currentThread().getId() + "; 线程名字:" + Thread.currentThread().getName());
System.out.println("read....");
int read = inputStream.read(bytes);
if (read != -1){
System.out.println(new String(bytes, 0, read));//输出客户端发送的数据
} else {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
System.out.println("关闭和client的连接");
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
} }
**问题分析:**
1. 服务端主线程会阻塞在 serverSocket.accept() 这个方法处,当有新的客户端发起请求时,主线程通过线程池调用新线程与其通信,每个通信线程会阻塞在socket.getInputStream() 这个方法处。
1. 每个请求都需要创建独立的线程,与对应的客户端进行数据 Read,业务处理,数据 Write 。
1. 当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大。
1. 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费
<a name="dB6qB"></a>
## 2.4 NIO 原理
1. Java NIO 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO),是同步非阻塞的
1. NIO 有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)
1. NIO是 面向缓冲区 ,或者面向块编程的。数据读取后放到缓冲区中,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络
1. Java NIO的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直到数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某个通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。
1. 通俗理解:NIO是可以做到用一个线程来处理多个操作的。假设有10000个请求过来,根据实际情况,可以分配50或者100个线程来处理。不像之前的阻塞IO那样,非得分配10000个。
PS: HTTP2.0使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求的数量比HTTP1.1大了好几个数量级。
<a name="cQn8U"></a>
### 2.4.1 NIO 工作机制
![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1600915148822-966978bd-a87e-4130-963f-3a31dec04f28.png#height=454&id=HuOJM&name=image.png&originHeight=454&originWidth=362&originalType=binary&ratio=1&size=110074&status=done&style=none&width=362)
1. 每个channel 都对应一个Buffer
1. Selector 对应一个线程,一个线程对应多个channel(连接)
1. 假设三个channel 注册到selector
1. Event(事件)决定程序切换到哪个channel
1. Selector根据不同的事件,在各个通道上切换
1. Buffer是一块内存,底层是一个数组
1. 数据的读取写入是通过Buffer, 这个和BIO 一样, 而BIO中要么是输入流,或者是输出流, 不能双向,但是NIO的Buffer 是可以读也可以写, 需要flip方法切换。
<a name="LT38A"></a>
### 2.4.2 Buffer 介绍
缓冲区本质上是一个可读可写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由Buffer。<br />![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1600917191630-ede60874-5a34-42a0-9b41-80258b30c563.png#height=114&id=kJ5fH&margin=%5Bobject%20Object%5D&name=image.png&originHeight=272&originWidth=1264&originalType=binary&ratio=1&size=54122&status=done&style=none&width=529)<br />**主要种类**
1. ByteBuffer,存储字节数据到缓冲区
1. ShortBuffer,存储字符串数据到缓冲区
1. CharBuffer,存储字符数据到缓冲区
1. IntBuffer,存储整数数据到缓冲区
1. LongBuffer,存储长整型数据到缓冲区
1. DoubleBuffer,存储小数到缓冲区
1. FloatBuffer,存储小数到缓冲区
**主要属性**
| **属性** | **描述** |
| --- | --- |
| Capacity | 容量,即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变 |
| Limit | 表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的 |
| Position | 位置,下一个要被读或写的元素的索引,每次读写缓冲区数据时都会改变改值,为下次读写作准备 |
| Mark | 标记 ,一般不会主动修改,在`flip()`<br />被调用后,mark就作废了。 |
**类关系图**<br />![WX20200924-123555@2x.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1600922201481-86213458-141a-4ce9-8cd7-3f7fde3975f3.png#height=466&id=gNfFV&margin=%5Bobject%20Object%5D&name=WX20200924-123555%402x.png&originHeight=466&originWidth=1648&originalType=binary&ratio=1&size=109600&status=done&style=none&width=1648)<br />MappedByteBuffer 可以直接操作系统内存<br />**主要方法**
1. public final int capacity();
1. 直接返回了此缓冲区的容量,capacity
2. public final int position();
1. 直接返回了此缓冲区指针的当前位置
3. public final Buffer position(int newPosition);
1. 设置此缓冲区的位置,设置position
4. public final int limit();
1. 返回此缓冲区的限制
5. public final Buffer limit(int newLimit);
1. 设置此缓冲区的限制,设置limit
6. public final Buffer clear();
1. 清除此缓冲区,即将各个标记恢复到初识状态, position = 0;limit = capacity; mark = -1,但是并没有删除数据。
7. public final Buffer flip();
1. 反转此缓冲区, limit = position;position = 0;mark = -1。
1. 当指定数据存放在缓冲区中后,position所指向的即为此缓冲区数据最后的位置。只有当数据大小和此缓冲区大小相同时,position才和limit的指向相同。
1. flip()方法将limit置向position, position置0,那么从position读取数据到limit即为此缓冲区中所有的数据。
8. public final boolean hasRemaining();
1. 告知当前位置和限制之间是否有元素。return position < limit;
9. public abstract boolean isReadOnly();
1. 此方法为抽象方法,告知此缓冲区是否为只读缓冲区,具体实现在各个实现类中。
10. public abstract boolean hasArray();
1. 告知此缓冲区是否具有可访问的底层实现数组
11. public abstract Object array();
1. 返回此缓冲区的底层实现数组
**ByteBuffer 方法**
1. public static ByteBuffer allocateDirect(int capacity);
1. 创建直接缓冲区
2. public static ByteBuffer allocate(int capacity) ;
1. 设置缓冲区的初识容量
3. public abstract byte get();
1. 从当前位置position上get数据,获取之后,position会自动加1
4. public abstract byte get(int index);
1. 通过绝对位置获取数据。
5. public abstract ByteBuffer put(byte b);
1. 从当前位置上添加,put之后,position会自动加1
6. public abstract ByteBuffer put(int index, byte b);
1. 从绝对位置上添加数据
7. public abstract ByteBuffer putXxx(Xxx value [, int index]);
1. 从position当前位置插入元素。Xxx表示基本数据类型
1. 此方法时类型化的 put 和 get,put放入的是什么数据类型,get就应该使用相应的数据类型来取出,否则可能有 BufferUnderflowException 异常。
<a name="dWYMv"></a>
### 2.4.3 Buffer demo
```java
//创建一个Buffer,大小为5,即可以存放5个int
IntBuffer intBuffer = IntBuffer.allocate(5);
//向buffer中存放数据
for (int i = 0; i < intBuffer.capacity(); i++) {
intBuffer.put(i * 2);
}
//如何从buffer中读取数据
//将buffer转换,读写切换
intBuffer.flip();
while (intBuffer.hasRemaining()) {
System.out.println(intBuffer.get());
}
Buffer 执行原理
- Buffer 刚创建时,capacity = 5 ,固定不变。limit指针指向5,position指向0,mark指向-1
- 之后调用 intBuffer.put方法,向buffer中添加数据,会不断移动position指针,最后position变量会和limit指向相同,即limit=position=5
- 调用 buffer.flip()实际上是重置了position和limit两个变量,将limit放在position的位置,即 position=0、limit=5。
调用 intBuffer.get方法,实际上是不断移动position指针,直到它移动到limit的位置
2.4.4 通道(Channel)
通道可以同时进行读写,而流只能读或者只能写
- 通道可以实现异步读写数据
- 通道可以从缓存读数据,也可以写数据到缓存
- BIO中的stream是单向的,例如:FileInputStream对象只能进行读取数据的操作,而NIO中的Channel是双向的,可读可写操作。
- Channel在 NIO 中是一个接口:public interface Channel extends Closeable{}
- 常用的Channel类有:FileChannel、DatagramChannel、ServerSocketChannel(类似ServerSocket)、SocketChannel(类似Socket)
- FileChannel 用于文件数据的读写,DatagramChannel用于UDP数据的读写,ServerSocketChannel和SocketChannel用于TCP数据读写。
类图关系
FileChannel类 常见方法
- public int read(ByteBuffer dst)
- 从通道读取数据并放到缓冲区中
- 此操作也会移动 Buffer 中的position指针,不断往position中放数据,read完成后position指向limit。
- public int write(ByteBuffer src)
- 把缓冲区的数据写到通道中
- 此操作也会不断移动Buffer中的position位置直到limit,读取到的数据就是position到limit这两个指针之间的数据。
- public long transferFrom(ReadableByteChannel src, long position, long count)
- 从目标通道中复制数据到当前通道
public long transferTo(long position, long count, WritableByteChannel target)
将数据写入到本地文件
public static void main(String[] args) throws IOException {
String str = "hello,world";
//创建一个输出流 -> Channel
FileOutputStream fileOutputStream = new FileOutputStream("/Users/hezhaoming/Downloads/file01.txt");
//通过 FileOutputStream 获取对应的 FileChannel
//这个 FileChannel 真实类型是 FileChannelImpl
FileChannel fileChannel = fileOutputStream.getChannel();
//创建一个缓冲区 ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//将str放入ByteBuffer
byteBuffer.put(str.getBytes());
//对ByteBuffer进行反转,开始读取
byteBuffer.flip();
//将ByteBuffer数据写入到FileChannel
//此操作会不断移动 Buffer中的 position到 limit 的位置
fileChannel.write(byteBuffer);
fileOutputStream.close();
}
有代码得知我们java原生的流里面内嵌了通道。从本地文件读取数据
public static void main(String[] args) throws IOException {
//创建文件的输入流
File file = new File("/Users/hezhaoming/Downloads/file01.txt");
FileInputStream fileInputStream = new FileInputStream(file);
//通过fileInputStream 获取对应的FileChannel -> 实际类型 FileChannelImpl
FileChannel fileChannel = fileInputStream.getChannel();
//创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
//将通道的数据读入到buffer
fileChannel.read(byteBuffer);
//将ByteBuffer 的字节数据转成String
System.out.println(new String(byteBuffer.array()));
fileInputStream.close();
}
3. 文件01输出到文件02public static void main(String[] args) throws IOException {
//读取
FileInputStream fileInputStream = new FileInputStream("/Users/hezhaoming/Downloads/file01.txt");
FileChannel fileChannel1 = fileInputStream.getChannel();
//输出
FileOutputStream fileOutputStream = new FileOutputStream("/Users/hezhaoming/Downloads/file02.txt");
FileChannel fileChannel2 = fileOutputStream.getChannel();
//缓冲块
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
while (true) {
//清空buffer,由于循环的最后执行了 write 操作,会将 position 移动到 limit 的位置
//清空 Buffer的操作才为上一次的循环重置position的位置
// 如果没有重置position,那么上次读取后,position和limit位置一样,读取后read的值永远为0
byteBuffer.clear();
//将数据存入 ByteBuffer,它会基于 Buffer 此刻的 position 和 limit 的值,
// 将数据放入position的位置,然后不断移动position直到其与limit相等;
int read = fileChannel1.read(byteBuffer);
if (read == -1) { //表示读完
break;
}
//将buffer中的数据写入到 FileChannel02 ---- file02.txt
byteBuffer.flip();
fileChannel2.write(byteBuffer);
}
//关闭相关的流
fileInputStream.close();
fileOutputStream.close();
}
拷贝图片
public static void main(String[] args) throws IOException {
//创建相关流
FileInputStream fileInputStream = new FileInputStream("/Users/hezhaoming/Downloads/tupian.jpeg");
FileOutputStream fileOutputStream = new FileOutputStream("/Users/hezhaoming/Documents/tupian.jpeg");
//获取各个流对应的FileChannel
FileChannel source = fileInputStream.getChannel();
FileChannel dest = fileOutputStream.getChannel();
//使用 transferForm 完成拷贝
dest.transferFrom(source, 0, source.size());
//关闭相关的通道和流
source.close();
dest.close();
fileInputStream.close();
fileOutputStream.close();
}
2.4.6 通道(ServerSocketChannel)demo
ServerSocketChannel:主要用于在服务器监听新的客户端Socket连接
public static ServerSocketChannel open()
- 得到一个 ServerSocketChannel 通道
- public final ServerSocketChannel bind(SocketAddress local)
- 设置服务器监听端口
- public final SelectableChannel configureBlocking(boolean block)
- 用于设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
- 此方法位于 ServerSocketChannel 和 SocketChannel的共同父类AbstractSelectableChannel类中
- public abstract SocketChannel accept()
- 接受一个连接,返回代表这个连接的通道对象
- public final SelectionKey register(Selector sel, int ops)
- 将Channel注册到选择器并设置监听事件,也可以在绑定的同时注册多个事件,如下所示:
- channel.register(selector,Selectionkey.OP_READ | Selectionkey.OP_CONNECT)
SocketChannel:网络IO通道,具体负责进行读写操作。NIO把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区
- public static SocketChannel open()
- 得到一个SocketChannel通道
- public final SelectableChannel configureBlocking(boolean block)
- 设置阻塞或非阻塞模式,取值 false表示采用非阻塞模式
- 此方法位于 ServerSocketChannel 和 SocketChannel的共同父类AbstractSelectableChannel类中
- public abstract boolean connect(SocketAddress remote)
- 连接服务器
- public boolean finishConnect()
- 如果上面的方法连接失败,接下来就要通过该方法完成连接操作
- public int write(ByteBuffer src)
- 往通道里写数据
- 这里写入的是buffer里面position到limit这个之间的数据
- public int read(ByteBuffer dst)
- 从通道里读数据
- public final SelectionKey register(Selector sel, int ops, Object att)
- 注册Channel到选择器并设置监听事件,最后一个参数可以设置共享数据
public final void close()
关闭通道
/**
* Scattering:将数据写入到buffer时,可以采用buffer数组,初次写入 【分散】
* Gathering:从buffer读取数据时,也可以采用buffer数组,依次读
*/
public static void main(String[] args) throws IOException {
//使用 ServerSocketChannel 和 SocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);
//绑定端口到socket,并启动
serverSocketChannel.socket().bind(inetSocketAddress);
//创建一个Buffer数组
ByteBuffer[] byteBuffers = new ByteBuffer[2];
byteBuffers[0] = ByteBuffer.allocate(5);
byteBuffers[1] = ByteBuffer.allocate(3);
//等待客户端的连接(Telnet)
SocketChannel socketChannel = serverSocketChannel.accept();
int msgLength = 8; //假定从客户端接受8个字节
//循环的读取
while (true) {
int byteRead = 0;
while (byteRead < msgLength) {
long l = socketChannel.read(byteBuffers);
byteRead += l; //累计读取的字节数
System.out.println("byteRead= " + byteRead);
//使用流打印,看看当前这个buffer的position和limit
Arrays.stream(byteBuffers)
.map(buffer -> "position=" + buffer.position() + ", limit = " + buffer.limit())
.forEach(System.out::println);
}
//读书数据后需要将所有的buffer进行flip
Arrays.asList(byteBuffers).forEach(Buffer::flip);
//将数据读出显示到客户端
long byteWrite = 0;
while (byteWrite < msgLength) {
long l = socketChannel.write(byteBuffers);
byteWrite += l;
}
//将所有的 buffer 进行clear操作
Arrays.asList(byteBuffers).forEach(Buffer::clear);
System.out.println("byteRead=" + byteRead + ", byteWrite=" + byteWrite
+ ", msgLength=" + msgLength);
}
}
2.4.7 Selector(选择器)
NIO是非阻塞的IO方式。可以用一个线程处理多个客户端连接,就会使用到Selector(选择器)
- Selector检测多个“已注册的chanel”的事件发生(多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
- 只有在真正有读写事件发生时,才会进行读写,这就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程
- 避免了多线程之间的上下文切换导致的开销
- Netty的IO线程NioEventLoop 聚合了Selector(选择器,也叫多路复用器),可以同时并发处理成百上千个客户端连接。
- 当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。
- 线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行IO 操作,所以单独的线程可以管理多个输入和输出通道。
- 由于读写操作都是非阻塞的,这就可以充分提升IO线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
- 一个 I/O 线程可以并发处理N个客户端连接和读写操作,这从根本上解决了传统同步阻塞I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
2.4.8 SelectionKey
Selector通过管理SelectionKey的集合从而去监听各个Channel。当Channel注册到Selector上面时,会携带该Channel关注的事件(SelectionKey包含Channel以及与之对应的事件),并会返回一个SelectionKey的对象,Selector将该对象加入到它统一管理的集合中去,从而对Channel进行管理。SelectionKey表示的是Selector和网络通道的注册关系,所以FileChannel是没有办法通过SelectionKey注册到Selector上去的。
2.4.9 (ServerSocketChannel) Demo
- 当客户端连接时,会通过ServerSocketChannel 得到 SocketChannel
- Selector 进行监听 select 方法, 返回有事件发生的通道的个数.
- 将socketChannel注册到Selector上, register(Selector sel, int ops), 一个selector上可以注册多个SocketChannel
- 注册后返回一个 SelectionKey, 会和该Selector 关联(集合)
- 进一步得到各个 SelectionKey (有事件发生)
- 在通过 SelectionKey 反向获取 SocketChannel , 方法 channel()
判断该Channel的事件类型,对不同事件进行不同的业务处理 ```java public static void Server() throws IOException { //创建ServerSocketChannel -> ServerSocket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //得到一个Selector对象 Selector selector = Selector.open(); //绑定一个端口6666 serverSocketChannel.socket().bind(new InetSocketAddress(6666)); //设置非阻塞 serverSocketChannel.configureBlocking(false); //把 serverSocketChannel 注册到 selector ,关心事件为:OP_ACCEPT,有新的客户端连接 SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//循环等待客户端连接 while (true) {
//等待1秒,如果没有事件发生,就返回
if (selector.select(1000) == 0) {
System.out.println("服务器等待了1秒,无连接");
continue;
}
//如果返回的 > 0,表示已经获取到关注的事件
// 就获取到相关的 selectionKey 集合,反向获取通道
Set<SelectionKey> selectionKeys = selector.selectedKeys();
//遍历 Set<SelectionKey>,使用迭代器遍历
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
//获取到SelectionKey
SelectionKey key = keyIterator.next();
//根据 key 对应的通道发生的事件,做相应的处理
if (key.isAcceptable()) {//如果是 OP_ACCEPT,有新的客户端连接
//该客户端生成一个 SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端连接成功,生成了一个SocketChannel:" + socketChannel.hashCode());
//将SocketChannel设置为非阻塞
socketChannel.configureBlocking(false);
//将socketChannel注册到selector,关注事件为 OP_READ,同时给SocketChannel关联一个Buffer
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if (key.isReadable()) {
//通过key,反向获取到对应的Channel
SocketChannel channel = (SocketChannel) key.channel();
//获取到该channel关联的Buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
channel.read(buffer);
System.out.println("from 客户端:" + new String(buffer.array()));
}
//手动从集合中移除当前的 selectionKey,防止重复操作
keyIterator.remove();
}
} }
public static void Client() throws IOException { //得到一个网络通道 SocketChannel socketChannel = SocketChannel.open(); //设置非阻塞 socketChannel.configureBlocking(false); //提供服务器端的IP和端口 InetSocketAddress socketAddress = new InetSocketAddress(“127.0.0.1”, 6666); //连接服务器 if (!socketChannel.connect(socketAddress)) { //如果不成功 while (!socketChannel.finishConnect()) { System.out.println(“因为连接需要时间,客户端不会阻塞,可以做其他工作。。。”); } } //如果连接成功,就发送数据 String str = “hello, world”; ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes()); //发送数据,实际上就是将buffer数据写入到channel socketChannel.write(byteBuffer); System.in.read(); }
<a name="Cx8qN"></a>
## 2.5 NIO 群聊系统demo
<a name="mqnCO"></a>
### 2.5.1 服务端
老套路:
1. 初始化服务器
1. 新建选择器 **selector**
1. **新建服务端通道 **ServerSocketChannel,并设置非阻塞
1. 通道注册到selector
2. 监听客户端事件
1. 轮询一直去获取selector中发生的事件
1. 如果发生了事件,那么遍历SelectionKey ,获取各个通道的事件
1. 判断事件是连接事件,则处理连接,将客户端通道注册到选择器中,并设置非阻塞
1. 判断事件是读事件,则处理读数据,首先获取通道,建立Buffer块,将数据读取到块中,如果读到数据,将数据转发给其他所有的通道
1. 遍历**selector**.keys() 获取所有的通道,然后遍历发送,注意,排除自身的通道。
```java
/********************服务端********************/
public class GroupChatServer {
//定义属性
private Selector selector;
private ServerSocketChannel listenChannel;
private static final int PORT = 6666;
//构造器
//初始化工作
public GroupChatServer() {
try {
//得到选择器
selector = Selector.open();
listenChannel = ServerSocketChannel.open();
//绑定端口
listenChannel.socket().bind(new InetSocketAddress(PORT));
//设置非阻塞模式
listenChannel.configureBlocking(false);
//将listenChannel注册到selector,绑定监听事件
listenChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
//监听
public void listen() {
try {
//循环处理
while (true) {
int count = selector.select();
if (count > 0) { //有事件处理
//遍历得到SelectionKey集合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
//取出SelectionKey
SelectionKey key = iterator.next();
//监听到accept,连接事件
if (key.isAcceptable()) {
SocketChannel socketChannel = listenChannel.accept();
//将该channel设置非阻塞并注册到selector
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
//提示
System.out.println(socketChannel.getRemoteAddress() + " 上线...");
}
if (key.isReadable()) { //通道可以读取数据,即server端收到客户端的消息,
//处理读(专门写方法)
readData(key);
}
iterator.remove();
}
} else {
System.out.println("等待。。。");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
//读取客户端消息
private void readData(SelectionKey key) {
//定义一个SocketChannel
SocketChannel channel = null;
try {
//取到关联的channel
channel = (SocketChannel) key.channel();
//创建缓冲buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = channel.read(buffer);
//根据count值判断是否读取到数据
if (count > 0) {
//把缓冲区的数据转成字符串
String msg = new String(buffer.array());
//输出该消息
System.out.println("from 客户端:" + msg);
//向其他的客户端转发消息(去掉自己),专门写一个方法处理
sendInfoToOtherClients(msg, channel);
}
} catch (IOException e) {
try {
System.out.println(channel.getRemoteAddress() + "离线了...");
//取消注册
key.cancel();
//关闭通道
channel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
//转发消息给其他客户端(通道)
private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {
System.out.println("服务器转发消息中。。。");
//遍历 所有注册到selector上的SocketChannel,并排除self
for (SelectionKey key : selector.keys()) {
//通过key取出对应的SocketChannel
Channel targetChannel = key.channel();
//排除自己
if (targetChannel instanceof SocketChannel && targetChannel != self) {
//转型
SocketChannel dest = (SocketChannel) targetChannel;
//将msg,存储到buffer
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
//将buffer的数据写入通道
dest.write(buffer);
}
}
}
public static void main(String[] args) {
//创建服务器对象
GroupChatServer groupChatServer = new GroupChatServer();
groupChatServer.listen();
}
}
2.5.2 客户端
老套路:
- 注册服务器
- 新增选择器
- 创建连接服务器,并设置非阻塞
- 通道注册到选择器上
- 发送消息
- 通道直接写Buffer数据即可
死循环读信息
- 判断是否可用的通道
- 遍历selectedKeys 获取各个通道(客户端也是多通道)的事件
判断事件,如果是读事件,那么获取数据,并打印 ```java /*客户端**/ public class GroupChatClient { //定义相关的属性 private static final String HOST = “127.0.0.1”; //服务器的IP地址 private static final int PORT = 6666; //服务器端口 private Selector selector; private SocketChannel socketChannel; private String username;
//构造器,初始化操作 public GroupChatClient() throws IOException { selector = Selector.open(); //连接服务器 socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT)); //设置非阻塞 socketChannel.configureBlocking(false); //将channel注册到selector socketChannel.register(selector, SelectionKey.OP_READ); //得到username username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username + “ is ok…”); }
//向服务器发送消息 public void sendInfo(String info) { info = username + “ 说:” + info; try {
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
} catch (IOException e) {
e.printStackTrace();
} }
//读取从服务器端回复的消息 public void readInfo() { try {
int readChannels = selector.select();
if (readChannels > 0) {//有可用的通道
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) {
//得到相关的通道
SocketChannel sc = (SocketChannel) key.channel();
//得到一个buffer
ByteBuffer buf = ByteBuffer.allocate(1024);
//读取
sc.read(buf);
//把缓冲区的数据转成字符串
String msg = new String(buf.array());
System.out.println(msg.trim());
}
}
} else {
System.out.println("没有可以用的通道...");
}
} catch (Exception e) { } }
public static void main(String[] args) throws IOException { //启动客户端 GroupChatClient chatClient = new GroupChatClient(); //启动一个线程用于读取服务器的消息 new Thread(() -> {
while (true) {
chatClient.readInfo();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start(); //主线程用于发送数据给服务器端 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) {
String s = scanner.nextLine();
chatClient.sendInfo(s);
} } }
<a name="tEmDD"></a>
### 2.5.3 总结
使用int read = channel.read(buffer)读取数据时,读取的结果情况:
1. 当read=-1时,说明客户端的数据发送完毕,并且主动的关闭socket。所以这种情况下,服务器程序需要关闭socketSocket,并且取消key的注册。注意:这个时候继续使用SocketChannel进行读操作的话,就会抛出:==远程主机强迫关闭一个现有的连接==的IO异常
1. 当read=0时:
1. 某一时刻SocketChannel中当前没有数据可读。
1. 客户端的数据发送完毕。
<a name="5rKQN"></a>
## 2.6 NIO的零拷贝
零拷贝是网络编程的关键,很多性能优化都离不开它。零拷贝是指:从操作系统的角度来看,文件的传输不存在CPU的拷贝,只存在DMA拷贝。在Java程序中,常用的零拷贝有 mmap(内存映射)和 sendFile。零拷贝不仅仅带来更少的数据复制,还能减少线程的上下文切换,减少CPU缓存伪共享以及无CPU校验和计算。
<a name="SvXSj"></a>
### 2.6.1 传统IO拷贝
```java
File file = new File("test.txt");
RandomAccessFile raf = new RandomAccessFile(file, "rw");
byte[] arr = new byte[(int) file.length()];
raf.read(arr);
Socket socket = new ServerSocket(8080).accept();
socket.getOutputStream().write(arr);
两次CPU拷贝
DMA: direct memory access 直接内存拷贝(不使用CPU)
2.6.2 mmap优化的IO
直接内存拷贝,需要一次CPU拷贝
2.6.3 sendFile 优化的IO
2.6.4 linux2.4 零拷贝
2.6.5 零拷贝demo
transferTo
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 7001));
String filename = "protoc-3.6.1-win32.zip";
//得到一个文件channel
FileChannel fileChannel = new FileInputStream(filename).getChannel();
//准备发送
long startTime = System.currentTimeMillis();
//在linux下一个transferTo 方法就可以完成传输
//在windows 下 一次调用 transferTo 只能发送8m , 就需要分段传输文件, 而且要主要
//transferTo 底层使用到零拷贝
long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);
System.out.println("发送的总的字节数 =" + transferCount + " 耗时:" + (System.currentTimeMillis() - startTime));
//关闭
fileChannel.close();
}
2.6.6 总结
- 零拷贝,是从操作系统的角度来说的。因为内核缓冲区之间,没有数据是重复的(只有kernel buffer 有一份数据)。
零拷贝不仅仅带来更少的数据复制,还能带来其他的性能优势,例如更少的上下文切换,更少的CPU 缓存伪共享以及无 CPU 校验和计算。
三 Netty高性能架构设计
3.1 原生IO的问题
NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
- 需要具备其他的额外技能:要熟悉 Java 多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的NIO程序。
- 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。直到 JDK 1.7 版本该问题仍旧存在,没有被根本解决。
3.2 netty的优势
Netty对JDK自带的NIO的API进行了封装,解决了上述问题。
设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池.
- 安全:完整的 SSL/TLS 和 StartTLS 支持
高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
3.3 I/O线程模型
目前存在的线程模型主要有:
- 传统阻塞I/O服务模型
- Reactor模式
- 根据Reactor的数量和处理资源池线程的数量不同,有如下3种典型的实现
- 单Reactor单线程
- 单Reactor多线程
- 主从Reactor多线程
Netty线程模型主要基于主从Reactor多线程模型做了一定的改进,其中主从Reactor多线程模型有多个Reactor。
3.3.1 传统阻塞I/O服务模型
模型特点:
- 采用阻塞IO模式获取输入的数据
- 每个链接都需要独立的线程完成数据的输入,业务处理、数据返回。
- 问题分析:
- 当并发数很大,就会创建大量的线程,占用很大系统资源
- 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费。
图解说明:黄色的框表示对象,蓝色的框表示线程、白色的框表示方法(API)。之后的图相同。
3.3.2 传统阻塞I/O服务模型 demo
由于模型的逻辑主要集中在服务端,所以所有模型代码示例基本上都是服务端的示例
public static void main(String[] args) throws IOException {
//1、创建一个线程池
//2、如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)
ExecutorService executorService = Executors.newCachedThreadPool();
//创建ServerSocket
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服务器启动了");
while (true) {
//监听,等待客户端连接
final Socket socket = serverSocket.accept();
System.out.println("连接到一个客户端");
//创建一个线程,与之通讯
executorService.execute(() -> {
//重写Runnable方法,与客户端进行通讯
handler(socket);
});
}
}
//编写一个Handler方法,和客户端通讯。主要进行数据的读取和业务处理。
public static void handler(Socket socket) {
try {
byte[] bytes = new byte[1024];
//通过socket获取输入流
InputStream inputStream = socket.getInputStream();
//循环的读取客户端发送的数据
while (true){
int read = inputStream.read(bytes);
if (read != -1){
System.out.println(new String(bytes, 0, read));//输出客户端发送的数据
} else {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
System.out.println("关闭和client的连接");
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.3.3 Reactor模型
针对传统阻塞I/O服务模型的2个缺点,解决方案如下:
- 基于I/O复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。Reactor对应的叫法: 1. 反应器模式 2. 分发者模式(Dispatcher) 3. 通知者模式(notifier)
- 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
I/O复用结合线程池,就是Reactor模式基本设计思想,如图所示:
- Reactor模式,通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动)
- 服务器端程序处理传入的多个请求,并将它们同步分派到响应的处理线程,因此Reactor模式也叫Dispatcher模式。
Reactor模式使用IO复用监听事件,收到事件后,分发的某个线程(进程),这点就是网络服务高并发处理的关键。
3.3.4 Reactor核心组成部分
Reactor:Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件作出反应。
- 我的理解是将Reactor理解成一个Selector,它可以对建立新的连接,也可以将产生的读写事件交换给Handler进行处理
Handlers:处理程序执行I/O事件要完成的实际事件, Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作。
3.3.5 单Reactor单线程模式
方案说明:Select是前面 I/O复用模型介绍的标准网络编程API,可以实现应用程序通过一个阻塞对象监听多路连接请求
- Reactor对象通过 Select 监控客户端请求事件,收到事件后通过Dispatch 进行分发
- 如果是建立连接请求事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理
- 如果不是建立连接事件,则Reactor会分发调用连接对应的Handler来响应
- Handler会完成Read→业务处理→Send 的完整业务流程
结合实例:
服务器端用一个线程通过多路复用搞定所有的IO操作(包括连接,读、写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑,前面的 NIO 案例就属于这种模型。
模型分析
- 优点:
- 模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
- 缺点:
- 性能问题,只有一个线程,无法完全发挥多核
CPU
的性能。Handler
在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈 - 可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
- 性能问题,只有一个线程,无法完全发挥多核
使用场景:
客户端的数量有限,业务处理非常快速,比如 Redis在业务处理的时间复杂度 O(1) 的情况
3.3.6 单Reactor单线程demo
```java public class SReactorSThread { private Selector selector; private ServerSocketChannel serverSocketChannel; private int PORT = 6666;
public SReactorSThread() { try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(PORT));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
} }
//对客户端进行监听 public void listen() { try {
while (true) {
int count = selector.select();
//表示有客户端产生事件
if (count > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();//取出产生事件的Channel
Iterator<SelectionKey> iterator = selectionKeys.iterator();//准备对其进行遍历
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//将key交给dispatch去处理
dispatch(key);
iterator.remove();
}
}
}
} catch (Exception e) {
e.printStackTrace();
} }
//dispatch private void dispatch(SelectionKey key) { if (key.isAcceptable()){
accept(key);
}else {
handler(key);
} }
//建立新的连接 private void accept(SelectionKey key) { try {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
} }
//对请求进行处理,接收消息—-业务处理—-返回消息 private void handler(SelectionKey key) { SocketChannel channel = null; try {
channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(3);
StringBuilder msg = new StringBuilder();
while (channel.read(buffer) > 0) {
msg.append(new String(buffer.array()));
buffer.clear();
}
System.out.println("接收到消息:" + msg.toString());
//发送消息
String ok = "OK";
buffer.put(ok.getBytes());
//这个flip非常重要哦,是将position置0,limit置于position的位置,以便下面代码进行写入操作能够正确写入buffer中的所有数据
buffer.flip();
channel.write(buffer);
buffer.clear();
} catch (IOException e) {
try {
System.out.println(channel.getRemoteAddress() + "离线了");
//取消该通道的注册并关闭通道,这里非常重要,没有这一步的话当客户端断开连接就会不断抛出IOException
//是因为,select会一直产生该事件。
key.cancel();
channel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
} } }
/**调用**/
public static void main(String[] args) { SReactorSThread sReactorSThread = new SReactorSThread(); sReactorSThread.listen(); }
<a name="v4HpT"></a>
### 3.3.7 单Reactor多线程模式
![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1601003334463-6bdc8bdd-000e-4560-846e-a87fe2752056.png#height=586&id=yPWLE&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1172&originWidth=1416&originalType=binary&ratio=1&size=1610871&status=done&style=none&width=708)
1. Reactor 对象通过select 监控客户端请求事件, 收到事件后,通过dispatch进行分发
1. 如果建立连接请求, 则由Acceptor 通过accept 处理连接请求, 然后创建一个Handler对象处理完成连接后的各种事件
1. 如果不是连接请求,则由reactor分发调用连接对应的handler 来处理
1. handler 只负责响应事件,不做具体的业务处理, 通过read 读取数据后,会分发给后面的worker线程池的某个线程处理业务
1. worker 线程池会分配独立线程完成真正的业务,并将结果返回给handler
1. handler收到响应后,通过send 将结果返回给client
**模型分析**
1. **优点**:
1. 可以充分的利用多核cpu 的处理能力
2. **缺点**:
1. 多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈.
<a name="DwjnY"></a>
### 3.3.8 主从Reactor多线程模式
![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1601003821019-c9bee28e-3f43-4ed5-a7ff-83b131849ceb.png#height=594&id=SR9fV&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1188&originWidth=1224&originalType=binary&ratio=1&size=1308895&status=done&style=none&width=612)<br />**方案说明:**
1. Reactor主线程MainReactor 对象就只注册一个用于监听连接请求的ServerSocketChannel,通过select 监听连接事件, 收到事件后,通过Acceptor 处理连接事件
1. 当 Acceptor 处理连接事件后,MainReactor 通过accept获取新的连接,并将连接注册到SubReactor
1. subreactor 将连接加入到连接队列进行监听,并创建handler进行各种事件处理
1. 当有新事件发生时, subreactor 就会调用对应的handler处理
1. handler 通过read 读取数据,分发给后面的worker 线程处理
1. worker 线程池分配独立的worker 线程进行业务处理,并返回结果
1. handler 收到响应的结果后,再通过send 将结果返回给client
1. Reactor 主线程可以对应多个Reactor 子线程, 即MainRecator 可以关联多个SubReactor
1. <br />
**模型分析**
1. **优点**:
1. 父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。
1. 父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据
2. **缺点**:
1. 编程复杂度较高
3. **结合实例**:
1. 这种模型在许多项目中广泛使用,包括 Nginx 主从 Reactor 多进程模型,Memcached 主从多线程,Netty 主从多线程模型的支持
<a name="uWB1b"></a>
### 3.3.9 总结
1. 单 Reactor单线程,前台接待员和服务员是同一个人,全程为顾客服
1. 单 Reactor多线程,1 个前台接待员,多个服务员,接待员只负责接待
1. 主从 Reactor多线程,多个前台接待员,多个服务生
**优点:**
1. 响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的
1. 可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销
1. 扩展性好,可以方便的通过增加Reactor实例个数来充分利用CPU 资源
1. 复用性好,Reactor模型本身与具体事件处理逻辑无关,具有很高的复用性
<a name="WKz5K"></a>
## 3.4 netty 模型
<a name="yoD4s"></a>
### 3.4.1 netty基础版
Netty主要是基于主从Reactor多线程模式做了一定的改进,其中主从Reactor都有单一的一个变成了多个。下面是简单的改进图。<br />![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1601008652707-5e11e71c-3830-4c11-b5df-e3a37744bd8b.png#height=379&id=biLrn&margin=%5Bobject%20Object%5D&name=image.png&originHeight=758&originWidth=2162&originalType=binary&ratio=1&size=166402&status=done&style=none&width=1081)
1. 增加了BossGroup来维护多个主Reactor,主Reactor还是只关注连接的Accept;增加了WorkGroup来维护多个从Reactor,从Reactor将接收到的请求交给Handler进行处理。
1. 在主Reactor中接收到Accept事件,获取到对应的SocketChannel,Netty会将它进一步封装成NIOSocketChannel对象,这个封装后的对象还包含了该Channel对应的SelectionKey、通信地址等详细信息
1. Netty会将装个封装后的Channel对象注册到WorkerGroup中的从Reactor中。
1. 当WorkerGroup中的从Reactor监听到事件后,就会将之交给与此Reactor对应的Handler进行处理
<a name="kY8Zn"></a>
### 3.4.2 netty进阶版
Netty主要基于主从Reactors多线程模型(如图)做了一定的改进,其中主从 Reactor多线程模型有多个Reactor<br />![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1601008887502-43d77c16-4163-419d-9884-232bd5851daa.png#height=425&id=gqyyD&margin=%5Bobject%20Object%5D&name=image.png&originHeight=850&originWidth=1528&originalType=binary&ratio=1&size=1253248&status=done&style=none&width=764)
<a name="jwx3r"></a>
### 3.4.3 netty最终版
![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1601009009399-d4c993d9-cfab-401b-9699-d1a2573eb350.png#height=571&id=kvblf&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1142&originWidth=1442&originalType=binary&ratio=1&size=1242826&status=done&style=none&width=721)
1. Netty抽象出两组线程池,BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写
1. BossGroup和WorkerGroup类型的本质都是NioEventLoopGroup类型。
1. NioEventLoopGroup相当于一个线程管理器(类似于ExecutorServevice),它下面维护很多个NioEventLoop线程。
1. 在初始化这两个Group线程组时,默认会在每个Group中生成CPU*2个NioEventLoop线程
1. 当n个连接来了,Group默认会按照连接请求的顺序分别将这些连接分给各个NioEventLoop去处理。
1. 同时Group还负责管理EventLoop的生命周期。
4. NioEventLoop表示一个不断循环的执行处理任务的线程
1. 它维护了一个线程和任务队列。
1. 每个NioEventLoop都包含一个Selector,用于监听绑定在它上面的socket通讯。
1. 每个NioEventLoop相当于Selector,负责处理多个Channel上的事件
1. 每增加一个请求连接,NioEventLoopGroup就将这个请求依次分发给它下面的NioEventLoop处理。
5. 每个Boss NioEventLoop循环执行的步骤有3步:
1. 轮询accept事件
1. 处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个Worker NioEventLoop的selector上。
1. 处理任务队列到任务,即runAllTasks
6. 每个Worker NioEventLoop循环执行的步骤:
1. 轮询read,write事件
1. 处理I/O事件,即read,write事件,在对应的NioSocketChannel中进行处理
1. 处理任务队列的任务,即runAllTasks
7. 每个 Worker NioEventLoop处理业务时,会使用pipeline(管道),pipeline中维护了一个ChannelHandlerContext链表,而ChannelHandlerContext则保存了Channel相关的所有上下文信息,同时关联一个ChannelHandler对象。如图所示,Channel和pipeline一一对应,ChannelHandler和ChannelHandlerContext一一对应。
<a name="JIYaQ"></a>
### 3.4.4 netty demo
```java
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//创建BossGroup 和 WorkerGroup
//1、创建两个线程组,bossGroup 和 workerGroup
//2、bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给 workerGroup 完成
//3、两个都是无限循环
//4、bossGroup 和 workerGroup 含有的子线程(NioEventLoop)个数为实际 cpu 核数 * 2
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup worderGroup = new NioEventLoopGroup();
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来进行设置,配置
bootstrap.group(bossGroup, worderGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //使用 NioServerSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) //设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() { //为accept channel的pipeline预添加的handler
//给 pipeline 添加处理器,每当有连接accept时,就会运行到此处。
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyServerHandler());
}
}); //给我们的 workerGroup 的 EventLoop 对应的管道设置处理器
System.out.println("........服务器 is ready...");
//绑定一个端口并且同步,生成了一个ChannelFuture 对象
//启动服务器(并绑定端口)
ChannelFuture future = bootstrap.bind(6668).sync();
//对关闭通道进行监听
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
worderGroup.shutdownGracefully();
}
}
}
服务端Handler处理
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
*读取客户端发送过来的消息
* @param ctx 上下文对象,含有 管道pipeline,通道channel,地址
* @param msg 就是客户端发送的数据,默认Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程:" + Thread.currentThread().getName());
System.out.println("server ctx = " + ctx);
//看看Channel和Pipeline的关系
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline(); //本质是个双向链表,出栈入栈
//将msg转成一个ByteBuf,比NIO的ByteBuffer性能更高
ByteBuf buf = (ByteBuf)msg;
System.out.println("客户端发送的消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + ctx.channel().remoteAddress());
}
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//它是 write + flush,将数据写入到缓存buffer,并将buffer中的数据flush进通道
//一般讲,我们对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~", CharsetUtil.UTF_8));
}
//处理异常,一般是关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意:客户端使用的不是 ServerBootStrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) //设置客户端通道的实现类(使用反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
}
});
System.out.println("客户端 OK...");
//启动客户端去连接服务器端
//关于 channelFuture 涉及到 netty 的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
客户端Handler处理:
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道就绪就会触发
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client: " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server", CharsetUtil.UTF_8));
}
/**
* 当通道有读取事件时,会触发
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
3.4.5 总结
- Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。
- NioEventLoop 表示一个不断循环执行处理任务的线程,每个NioEventLoop 都有一个 selector,用于监听绑定在其上的socket网络通道。
- NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由IO 线程 NioEventLoop 负责
- NioEventLoopGroup 下包含多个NioEventLoop
- 每个 NioEventLoop 中包含有一个Selector,一个 taskQueue
- 每个 NioEventLoop 的 Selector上可以注册监听多个NioChannel
- 每个 NioChannel 只会绑定在唯一的NioEventLoop 上
- 每个 NioChannel 都绑定有一个自己的ChannelPipeline
3.5 任务队列
任务队列由NioEventLoop维护并不断执行。当我们就收到请求之后,在当前channel对应的pipeline中的各个Handler里面进行业务处理和请求过滤。当某些业务需要耗费大量时间的时候,我们可以将任务提交到由NioEventLoop维护的taskQueue或scheduleTaskQueue中,让当前的NioEventLoop线程在空闲时间去执行这些任务。下面将介绍提交任务的3种方式3.5.1 用户程序自定义的普通任务
该方式会将任务提交到taskQueue队列中。提交到该队列中的任务会按照提交顺序依次执行。 ```java channelHandlerContext.channel().eventLoop().execute(new Runnable(){ @Override public void run() { //… } });
<a name="mX8Mp"></a>
### 3.5.2 **用户自定义的定时任务:**
该方式会将任务提交到scheduleTaskQueue定时任务队列中。该队列是底层是优先队列PriorityQueue实现的,故该队列中的任务会按照时间的先后顺序定时执行。
```java
channelHandlerContext.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
}
}, 60, TimeUnit.SECONDS);
3.5.3 为其他EventLoop线程对应的Channel添加任务
可以在ChannelInitializer中,将刚创建的各个Channel以及对应的标识加入到统一的集合中去,然后可以根据标识获取Channel以及其对应的NioEventLoop,然后就课程调用execute()或者schedule()方法。
3.6 异步模型
基本介绍
- 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
- Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。
- 调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果
- Netty 的异步模型是建立在 future 和 callback 的之上的。callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future去监控方法 fun 的处理过程(即 : Future-Listener 机制)
Future说明
- 表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,比如检索计算等等.
- ChannelFuture 是一个接口 : public interface ChannelFuture extends Future
。我们可以添加监听器,当监听的事件发生时,就会通知到监听器
工作原理
- 在使用 Netty 进行编程时,拦截操作和转换出入站数据只需要您提供 callback 或利用future 即可。这使得链式操作简单、高效, 并有利于编写可重用的、通用的代码。
- Netty 框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来
3.6.1 Future-Listener机制
当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。
常用方法如下:
方法名称 | 方法作用 |
---|---|
isDone() | 判断当前操作是否完成 |
isSuccess() | 判断已完成的当前操作是否成功 |
getCause() | 获取已完成当前操作失败的原因 |
isCancelled() | 判断已完成的当前操作是否被取消 |
addListener() | 注册监听器,当前操作(Future)已完成,将会通知指定的监听器 |
3.6.2 Future-Listener demo
绑定端口操作时异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑。
serverBootstrap.bind(port).addListener(future -> {
if(future.isSuccess()) {
System.out.println(newDate() + ": 端口["+ port + "]绑定成功!");
} else{
System.err.println("端口["+ port + "]绑定失败!");
}
});
3.7 http demo
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer());
ChannelFuture channelFuture = bootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
定义ChannelInitializer
用于给Channel对应的pipeline添加handler。该ChannelInitializer中的代码在SocketChannel被创建时都会执行
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//向管道加入处理器
//得到管道
ChannelPipeline pipeline = socketChannel.pipeline();
//加入一个 netty 提供的 httpServerCodec:codec => [coder - decoder]
//1、HttpServerCodec 是 netty 提供的处理http的编解码器
pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());
//2、增加自定义的Handler
pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
}
}
自定义Handler
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
/**
* 读取客户端数据。
*
* @param channelHandlerContext
* @param httpObject 客户端和服务器端互相通讯的数据被封装成 HttpObject
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
//判断 msg 是不是 HTTPRequest 请求
if (httpObject instanceof HttpRequest) {
System.out.println("msg 类型 = " + httpObject.getClass());
System.out.println("客户端地址:" + channelHandlerContext.channel().remoteAddress());
//获取
HttpRequest request = (HttpRequest) httpObject;
//获取uri,进行路径过滤
URI uri = new URI(request.uri());
if ("/favicon.ico".equals(uri.getPath())) {
System.out.println("请求了 favicon.ico,不做响应");
}
//回复信息给浏览器[http协议]
ByteBuf content = Unpooled.copiedBuffer("HelloWorld", CharsetUtil.UTF_8);
//构造一个http的响应,即HTTPResponse
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
//将构建好的 response 返回
channelHandlerContext.writeAndFlush(response);
}
}
}