一 netty 概述
1.1 什么是netty
- Netty是由JBOSS提供的一个Java开源框架,现为Github上的独立项目
- Netty是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络IO程序
Netty主要针对在TCP协议下,面向Clients端的高并发应用,或者Peer-to-Peer场景下的大量数据持续传输的应用。
1.2 Netty的应用场景
互联网行业
阿里分布式服务框架Dubbo的RPC框架使用Dubbo协议进行节点间通信,Dubbo协议进行节点间通信,Dubbo协议默认使用Netty作为基础通信组件,用于实现各进程节点之间的内部通信
游戏行业
- Netty 作为高性能的基础通信组件,提供了 TCP/UDP 和 HTTP 协议栈,方便定制和开发私有协议栈,账号登录服务器
- 地图服务器之间可以方便的通过 Netty 进行高性能的通信
大数据领域
- 经典的 Hadoop的高性能通信和序列化组件Avro的 RPC 框架,默认采用 Netty 进行跨界点通信
它的 Netty Service 基于 Netty 框架二次封装实现。
二 Java IO模型介绍
2.1 I/O模型简单说明
I/O模型简单的理解:就是用什么样的通道进行数据的发送和接收,很大程度上决定了程序通信的性能。Java共支持3种网络编程模型IO模式:BIO、NIO、AIO。
Java BIO : 同步并阻塞(传统阻塞型),服务器实现模式为:一个连接对应一个线程,即客户端有连接请求时,服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。

- Java NIO : 同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。

Java AIO(NIO.2) : 异步非阻塞,AIO引入异步通道的概念,采用了Proactor模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。
2.2 I/O模型使用场景
BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择。
- NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4开始支持。
AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。
2.3 BIO 原理
同步并阻塞(传统阻塞型),服务器实现模式为:一个连接对应一个线程,即客户端有连接请求时,服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。可以通过线程池机制改善(实现多个客户端连接服务器),但是不能解决性能问题。
2.3.1 BIO 工作机制

服务器端启动一个ServerSocket
- 客户端启动Socket对服务器进行通信,默认情况下服务器端需要对每个客户建立一个线程与之通讯
- 客户端发出请求后, 先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝
-
2.3.2 BIO demo
使用BIO模型编写一个服务器端,监听6666端口,当有客户端连接时,就启动一个线程与之通讯。
- 要求使用线程池机制改善,可以连接多个客户端.
服务器端可以接收客户端发送的数据(通过cmd的telnet方式即可)。 ```java public class BIOServer { public static void main(String[] args) throws IOException {
//1、创建一个线程池//2、如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)ExecutorService executorService = Executors.newCachedThreadPool();//创建ServerSocketServerSocket serverSocket = new ServerSocket(6666);System.out.println("服务器启动了");while (true) {System.out.println("线程信息:id= "+ Thread.currentThread().getId() + "; 线程名字:" + Thread.currentThread().getName());//监听,等待客户端连接System.out.println("等待连接");final Socket socket = serverSocket.accept();System.out.println("连接到一个客户端");//创建一个线程,与之通讯executorService.execute(() -> {//重写Runnable方法,与客户端进行通讯handler(socket);});}
}
//编写一个Handler方法,和客户端通讯 public static void handler(Socket socket) {
try {System.out.println("线程信息:id= "+ Thread.currentThread().getId() + "; 线程名字:" + Thread.currentThread().getName());byte[] bytes = new byte[1024];//通过socket获取输入流InputStream inputStream = socket.getInputStream();//循环的读取客户端发送的数据while (true){System.out.println("线程信息:id= "+ Thread.currentThread().getId() + "; 线程名字:" + Thread.currentThread().getName());System.out.println("read....");int read = inputStream.read(bytes);if (read != -1){System.out.println(new String(bytes, 0, read));//输出客户端发送的数据} else {break;}}} catch (IOException e) {e.printStackTrace();} finally {System.out.println("关闭和client的连接");try {socket.close();} catch (IOException e) {e.printStackTrace();}}
} }
**问题分析:**1. 服务端主线程会阻塞在 serverSocket.accept() 这个方法处,当有新的客户端发起请求时,主线程通过线程池调用新线程与其通信,每个通信线程会阻塞在socket.getInputStream() 这个方法处。1. 每个请求都需要创建独立的线程,与对应的客户端进行数据 Read,业务处理,数据 Write 。1. 当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大。1. 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费<a name="dB6qB"></a>## 2.4 NIO 原理1. Java NIO 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO),是同步非阻塞的1. NIO 有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)1. NIO是 面向缓冲区 ,或者面向块编程的。数据读取后放到缓冲区中,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络1. Java NIO的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直到数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某个通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。1. 通俗理解:NIO是可以做到用一个线程来处理多个操作的。假设有10000个请求过来,根据实际情况,可以分配50或者100个线程来处理。不像之前的阻塞IO那样,非得分配10000个。PS: HTTP2.0使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求的数量比HTTP1.1大了好几个数量级。<a name="cQn8U"></a>### 2.4.1 NIO 工作机制1. 每个channel 都对应一个Buffer1. Selector 对应一个线程,一个线程对应多个channel(连接)1. 假设三个channel 注册到selector1. Event(事件)决定程序切换到哪个channel1. Selector根据不同的事件,在各个通道上切换1. Buffer是一块内存,底层是一个数组1. 数据的读取写入是通过Buffer, 这个和BIO 一样, 而BIO中要么是输入流,或者是输出流, 不能双向,但是NIO的Buffer 是可以读也可以写, 需要flip方法切换。<a name="LT38A"></a>### 2.4.2 Buffer 介绍缓冲区本质上是一个可读可写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由Buffer。<br /><br />**主要种类**1. ByteBuffer,存储字节数据到缓冲区1. ShortBuffer,存储字符串数据到缓冲区1. CharBuffer,存储字符数据到缓冲区1. IntBuffer,存储整数数据到缓冲区1. LongBuffer,存储长整型数据到缓冲区1. DoubleBuffer,存储小数到缓冲区1. FloatBuffer,存储小数到缓冲区**主要属性**| **属性** | **描述** || --- | --- || Capacity | 容量,即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变 || Limit | 表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的 || Position | 位置,下一个要被读或写的元素的索引,每次读写缓冲区数据时都会改变改值,为下次读写作准备 || Mark | 标记 ,一般不会主动修改,在`flip()`<br />被调用后,mark就作废了。 |**类关系图**<br /><br />MappedByteBuffer 可以直接操作系统内存<br />**主要方法**1. public final int capacity();1. 直接返回了此缓冲区的容量,capacity2. public final int position();1. 直接返回了此缓冲区指针的当前位置3. public final Buffer position(int newPosition);1. 设置此缓冲区的位置,设置position4. public final int limit();1. 返回此缓冲区的限制5. public final Buffer limit(int newLimit);1. 设置此缓冲区的限制,设置limit6. public final Buffer clear();1. 清除此缓冲区,即将各个标记恢复到初识状态, position = 0;limit = capacity; mark = -1,但是并没有删除数据。7. public final Buffer flip();1. 反转此缓冲区, limit = position;position = 0;mark = -1。1. 当指定数据存放在缓冲区中后,position所指向的即为此缓冲区数据最后的位置。只有当数据大小和此缓冲区大小相同时,position才和limit的指向相同。1. flip()方法将limit置向position, position置0,那么从position读取数据到limit即为此缓冲区中所有的数据。8. public final boolean hasRemaining();1. 告知当前位置和限制之间是否有元素。return position < limit;9. public abstract boolean isReadOnly();1. 此方法为抽象方法,告知此缓冲区是否为只读缓冲区,具体实现在各个实现类中。10. public abstract boolean hasArray();1. 告知此缓冲区是否具有可访问的底层实现数组11. public abstract Object array();1. 返回此缓冲区的底层实现数组**ByteBuffer 方法**1. public static ByteBuffer allocateDirect(int capacity);1. 创建直接缓冲区2. public static ByteBuffer allocate(int capacity) ;1. 设置缓冲区的初识容量3. public abstract byte get();1. 从当前位置position上get数据,获取之后,position会自动加14. public abstract byte get(int index);1. 通过绝对位置获取数据。5. public abstract ByteBuffer put(byte b);1. 从当前位置上添加,put之后,position会自动加16. public abstract ByteBuffer put(int index, byte b);1. 从绝对位置上添加数据7. public abstract ByteBuffer putXxx(Xxx value [, int index]);1. 从position当前位置插入元素。Xxx表示基本数据类型1. 此方法时类型化的 put 和 get,put放入的是什么数据类型,get就应该使用相应的数据类型来取出,否则可能有 BufferUnderflowException 异常。<a name="dWYMv"></a>### 2.4.3 Buffer demo```java//创建一个Buffer,大小为5,即可以存放5个intIntBuffer intBuffer = IntBuffer.allocate(5);//向buffer中存放数据for (int i = 0; i < intBuffer.capacity(); i++) {intBuffer.put(i * 2);}//如何从buffer中读取数据//将buffer转换,读写切换intBuffer.flip();while (intBuffer.hasRemaining()) {System.out.println(intBuffer.get());}
Buffer 执行原理
- Buffer 刚创建时,capacity = 5 ,固定不变。limit指针指向5,position指向0,mark指向-1

- 之后调用 intBuffer.put方法,向buffer中添加数据,会不断移动position指针,最后position变量会和limit指向相同,即limit=position=5

- 调用 buffer.flip()实际上是重置了position和limit两个变量,将limit放在position的位置,即 position=0、limit=5。

调用 intBuffer.get方法,实际上是不断移动position指针,直到它移动到limit的位置
2.4.4 通道(Channel)
通道可以同时进行读写,而流只能读或者只能写
- 通道可以实现异步读写数据
- 通道可以从缓存读数据,也可以写数据到缓存

- BIO中的stream是单向的,例如:FileInputStream对象只能进行读取数据的操作,而NIO中的Channel是双向的,可读可写操作。
- Channel在 NIO 中是一个接口:public interface Channel extends Closeable{}
- 常用的Channel类有:FileChannel、DatagramChannel、ServerSocketChannel(类似ServerSocket)、SocketChannel(类似Socket)
- FileChannel 用于文件数据的读写,DatagramChannel用于UDP数据的读写,ServerSocketChannel和SocketChannel用于TCP数据读写。
类图关系
FileChannel类 常见方法
- public int read(ByteBuffer dst)
- 从通道读取数据并放到缓冲区中
- 此操作也会移动 Buffer 中的position指针,不断往position中放数据,read完成后position指向limit。
- public int write(ByteBuffer src)
- 把缓冲区的数据写到通道中
- 此操作也会不断移动Buffer中的position位置直到limit,读取到的数据就是position到limit这两个指针之间的数据。
- public long transferFrom(ReadableByteChannel src, long position, long count)
- 从目标通道中复制数据到当前通道
public long transferTo(long position, long count, WritableByteChannel target)
将数据写入到本地文件
public static void main(String[] args) throws IOException {String str = "hello,world";//创建一个输出流 -> ChannelFileOutputStream fileOutputStream = new FileOutputStream("/Users/hezhaoming/Downloads/file01.txt");//通过 FileOutputStream 获取对应的 FileChannel//这个 FileChannel 真实类型是 FileChannelImplFileChannel fileChannel = fileOutputStream.getChannel();//创建一个缓冲区 ByteBufferByteBuffer byteBuffer = ByteBuffer.allocate(1024);//将str放入ByteBufferbyteBuffer.put(str.getBytes());//对ByteBuffer进行反转,开始读取byteBuffer.flip();//将ByteBuffer数据写入到FileChannel//此操作会不断移动 Buffer中的 position到 limit 的位置fileChannel.write(byteBuffer);fileOutputStream.close();}

有代码得知我们java原生的流里面内嵌了通道。从本地文件读取数据
public static void main(String[] args) throws IOException {//创建文件的输入流File file = new File("/Users/hezhaoming/Downloads/file01.txt");FileInputStream fileInputStream = new FileInputStream(file);//通过fileInputStream 获取对应的FileChannel -> 实际类型 FileChannelImplFileChannel fileChannel = fileInputStream.getChannel();//创建缓冲区ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());//将通道的数据读入到bufferfileChannel.read(byteBuffer);//将ByteBuffer 的字节数据转成StringSystem.out.println(new String(byteBuffer.array()));fileInputStream.close();}

3. 文件01输出到文件02public static void main(String[] args) throws IOException {//读取FileInputStream fileInputStream = new FileInputStream("/Users/hezhaoming/Downloads/file01.txt");FileChannel fileChannel1 = fileInputStream.getChannel();//输出FileOutputStream fileOutputStream = new FileOutputStream("/Users/hezhaoming/Downloads/file02.txt");FileChannel fileChannel2 = fileOutputStream.getChannel();//缓冲块ByteBuffer byteBuffer = ByteBuffer.allocate(512);while (true) {//清空buffer,由于循环的最后执行了 write 操作,会将 position 移动到 limit 的位置//清空 Buffer的操作才为上一次的循环重置position的位置// 如果没有重置position,那么上次读取后,position和limit位置一样,读取后read的值永远为0byteBuffer.clear();//将数据存入 ByteBuffer,它会基于 Buffer 此刻的 position 和 limit 的值,// 将数据放入position的位置,然后不断移动position直到其与limit相等;int read = fileChannel1.read(byteBuffer);if (read == -1) { //表示读完break;}//将buffer中的数据写入到 FileChannel02 ---- file02.txtbyteBuffer.flip();fileChannel2.write(byteBuffer);}//关闭相关的流fileInputStream.close();fileOutputStream.close();}

拷贝图片
public static void main(String[] args) throws IOException {//创建相关流FileInputStream fileInputStream = new FileInputStream("/Users/hezhaoming/Downloads/tupian.jpeg");FileOutputStream fileOutputStream = new FileOutputStream("/Users/hezhaoming/Documents/tupian.jpeg");//获取各个流对应的FileChannelFileChannel source = fileInputStream.getChannel();FileChannel dest = fileOutputStream.getChannel();//使用 transferForm 完成拷贝dest.transferFrom(source, 0, source.size());//关闭相关的通道和流source.close();dest.close();fileInputStream.close();fileOutputStream.close();}
2.4.6 通道(ServerSocketChannel)demo
ServerSocketChannel:主要用于在服务器监听新的客户端Socket连接
public static ServerSocketChannel open()
- 得到一个 ServerSocketChannel 通道
- public final ServerSocketChannel bind(SocketAddress local)
- 设置服务器监听端口
- public final SelectableChannel configureBlocking(boolean block)
- 用于设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
- 此方法位于 ServerSocketChannel 和 SocketChannel的共同父类AbstractSelectableChannel类中
- public abstract SocketChannel accept()
- 接受一个连接,返回代表这个连接的通道对象
- public final SelectionKey register(Selector sel, int ops)
- 将Channel注册到选择器并设置监听事件,也可以在绑定的同时注册多个事件,如下所示:
- channel.register(selector,Selectionkey.OP_READ | Selectionkey.OP_CONNECT)
SocketChannel:网络IO通道,具体负责进行读写操作。NIO把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区
- public static SocketChannel open()
- 得到一个SocketChannel通道
- public final SelectableChannel configureBlocking(boolean block)
- 设置阻塞或非阻塞模式,取值 false表示采用非阻塞模式
- 此方法位于 ServerSocketChannel 和 SocketChannel的共同父类AbstractSelectableChannel类中
- public abstract boolean connect(SocketAddress remote)
- 连接服务器
- public boolean finishConnect()
- 如果上面的方法连接失败,接下来就要通过该方法完成连接操作
- public int write(ByteBuffer src)
- 往通道里写数据
- 这里写入的是buffer里面position到limit这个之间的数据
- public int read(ByteBuffer dst)
- 从通道里读数据
- public final SelectionKey register(Selector sel, int ops, Object att)
- 注册Channel到选择器并设置监听事件,最后一个参数可以设置共享数据
public final void close()
关闭通道
/*** Scattering:将数据写入到buffer时,可以采用buffer数组,初次写入 【分散】* Gathering:从buffer读取数据时,也可以采用buffer数组,依次读*/public static void main(String[] args) throws IOException {//使用 ServerSocketChannel 和 SocketChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);//绑定端口到socket,并启动serverSocketChannel.socket().bind(inetSocketAddress);//创建一个Buffer数组ByteBuffer[] byteBuffers = new ByteBuffer[2];byteBuffers[0] = ByteBuffer.allocate(5);byteBuffers[1] = ByteBuffer.allocate(3);//等待客户端的连接(Telnet)SocketChannel socketChannel = serverSocketChannel.accept();int msgLength = 8; //假定从客户端接受8个字节//循环的读取while (true) {int byteRead = 0;while (byteRead < msgLength) {long l = socketChannel.read(byteBuffers);byteRead += l; //累计读取的字节数System.out.println("byteRead= " + byteRead);//使用流打印,看看当前这个buffer的position和limitArrays.stream(byteBuffers).map(buffer -> "position=" + buffer.position() + ", limit = " + buffer.limit()).forEach(System.out::println);}//读书数据后需要将所有的buffer进行flipArrays.asList(byteBuffers).forEach(Buffer::flip);//将数据读出显示到客户端long byteWrite = 0;while (byteWrite < msgLength) {long l = socketChannel.write(byteBuffers);byteWrite += l;}//将所有的 buffer 进行clear操作Arrays.asList(byteBuffers).forEach(Buffer::clear);System.out.println("byteRead=" + byteRead + ", byteWrite=" + byteWrite+ ", msgLength=" + msgLength);}}
2.4.7 Selector(选择器)
NIO是非阻塞的IO方式。可以用一个线程处理多个客户端连接,就会使用到Selector(选择器)
- Selector检测多个“已注册的chanel”的事件发生(多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
- 只有在真正有读写事件发生时,才会进行读写,这就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程
- 避免了多线程之间的上下文切换导致的开销

- Netty的IO线程NioEventLoop 聚合了Selector(选择器,也叫多路复用器),可以同时并发处理成百上千个客户端连接。
- 当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。
- 线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行IO 操作,所以单独的线程可以管理多个输入和输出通道。
- 由于读写操作都是非阻塞的,这就可以充分提升IO线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
- 一个 I/O 线程可以并发处理N个客户端连接和读写操作,这从根本上解决了传统同步阻塞I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
2.4.8 SelectionKey
Selector通过管理SelectionKey的集合从而去监听各个Channel。当Channel注册到Selector上面时,会携带该Channel关注的事件(SelectionKey包含Channel以及与之对应的事件),并会返回一个SelectionKey的对象,Selector将该对象加入到它统一管理的集合中去,从而对Channel进行管理。SelectionKey表示的是Selector和网络通道的注册关系,所以FileChannel是没有办法通过SelectionKey注册到Selector上去的。
2.4.9 (ServerSocketChannel) Demo
- 当客户端连接时,会通过ServerSocketChannel 得到 SocketChannel
- Selector 进行监听 select 方法, 返回有事件发生的通道的个数.
- 将socketChannel注册到Selector上, register(Selector sel, int ops), 一个selector上可以注册多个SocketChannel
- 注册后返回一个 SelectionKey, 会和该Selector 关联(集合)
- 进一步得到各个 SelectionKey (有事件发生)
- 在通过 SelectionKey 反向获取 SocketChannel , 方法 channel()
判断该Channel的事件类型,对不同事件进行不同的业务处理 ```java public static void Server() throws IOException { //创建ServerSocketChannel -> ServerSocket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //得到一个Selector对象 Selector selector = Selector.open(); //绑定一个端口6666 serverSocketChannel.socket().bind(new InetSocketAddress(6666)); //设置非阻塞 serverSocketChannel.configureBlocking(false); //把 serverSocketChannel 注册到 selector ,关心事件为:OP_ACCEPT,有新的客户端连接 SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//循环等待客户端连接 while (true) {
//等待1秒,如果没有事件发生,就返回if (selector.select(1000) == 0) {System.out.println("服务器等待了1秒,无连接");continue;}//如果返回的 > 0,表示已经获取到关注的事件// 就获取到相关的 selectionKey 集合,反向获取通道Set<SelectionKey> selectionKeys = selector.selectedKeys();//遍历 Set<SelectionKey>,使用迭代器遍历Iterator<SelectionKey> keyIterator = selectionKeys.iterator();while (keyIterator.hasNext()) {//获取到SelectionKeySelectionKey key = keyIterator.next();//根据 key 对应的通道发生的事件,做相应的处理if (key.isAcceptable()) {//如果是 OP_ACCEPT,有新的客户端连接//该客户端生成一个 SocketChannelSocketChannel socketChannel = serverSocketChannel.accept();System.out.println("客户端连接成功,生成了一个SocketChannel:" + socketChannel.hashCode());//将SocketChannel设置为非阻塞socketChannel.configureBlocking(false);//将socketChannel注册到selector,关注事件为 OP_READ,同时给SocketChannel关联一个BuffersocketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));}if (key.isReadable()) {//通过key,反向获取到对应的ChannelSocketChannel channel = (SocketChannel) key.channel();//获取到该channel关联的BufferByteBuffer buffer = (ByteBuffer) key.attachment();channel.read(buffer);System.out.println("from 客户端:" + new String(buffer.array()));}//手动从集合中移除当前的 selectionKey,防止重复操作keyIterator.remove();}
} }
public static void Client() throws IOException { //得到一个网络通道 SocketChannel socketChannel = SocketChannel.open(); //设置非阻塞 socketChannel.configureBlocking(false); //提供服务器端的IP和端口 InetSocketAddress socketAddress = new InetSocketAddress(“127.0.0.1”, 6666); //连接服务器 if (!socketChannel.connect(socketAddress)) { //如果不成功 while (!socketChannel.finishConnect()) { System.out.println(“因为连接需要时间,客户端不会阻塞,可以做其他工作。。。”); } } //如果连接成功,就发送数据 String str = “hello, world”; ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes()); //发送数据,实际上就是将buffer数据写入到channel socketChannel.write(byteBuffer); System.in.read(); }
<a name="Cx8qN"></a>## 2.5 NIO 群聊系统demo<a name="mqnCO"></a>### 2.5.1 服务端老套路:1. 初始化服务器1. 新建选择器 **selector**1. **新建服务端通道 **ServerSocketChannel,并设置非阻塞1. 通道注册到selector2. 监听客户端事件1. 轮询一直去获取selector中发生的事件1. 如果发生了事件,那么遍历SelectionKey ,获取各个通道的事件1. 判断事件是连接事件,则处理连接,将客户端通道注册到选择器中,并设置非阻塞1. 判断事件是读事件,则处理读数据,首先获取通道,建立Buffer块,将数据读取到块中,如果读到数据,将数据转发给其他所有的通道1. 遍历**selector**.keys() 获取所有的通道,然后遍历发送,注意,排除自身的通道。```java/********************服务端********************/public class GroupChatServer {//定义属性private Selector selector;private ServerSocketChannel listenChannel;private static final int PORT = 6666;//构造器//初始化工作public GroupChatServer() {try {//得到选择器selector = Selector.open();listenChannel = ServerSocketChannel.open();//绑定端口listenChannel.socket().bind(new InetSocketAddress(PORT));//设置非阻塞模式listenChannel.configureBlocking(false);//将listenChannel注册到selector,绑定监听事件listenChannel.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}//监听public void listen() {try {//循环处理while (true) {int count = selector.select();if (count > 0) { //有事件处理//遍历得到SelectionKey集合Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {//取出SelectionKeySelectionKey key = iterator.next();//监听到accept,连接事件if (key.isAcceptable()) {SocketChannel socketChannel = listenChannel.accept();//将该channel设置非阻塞并注册到selectorsocketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);//提示System.out.println(socketChannel.getRemoteAddress() + " 上线...");}if (key.isReadable()) { //通道可以读取数据,即server端收到客户端的消息,//处理读(专门写方法)readData(key);}iterator.remove();}} else {System.out.println("等待。。。");}}} catch (Exception e) {e.printStackTrace();}}//读取客户端消息private void readData(SelectionKey key) {//定义一个SocketChannelSocketChannel channel = null;try {//取到关联的channelchannel = (SocketChannel) key.channel();//创建缓冲bufferByteBuffer buffer = ByteBuffer.allocate(1024);int count = channel.read(buffer);//根据count值判断是否读取到数据if (count > 0) {//把缓冲区的数据转成字符串String msg = new String(buffer.array());//输出该消息System.out.println("from 客户端:" + msg);//向其他的客户端转发消息(去掉自己),专门写一个方法处理sendInfoToOtherClients(msg, channel);}} catch (IOException e) {try {System.out.println(channel.getRemoteAddress() + "离线了...");//取消注册key.cancel();//关闭通道channel.close();} catch (IOException ex) {ex.printStackTrace();}}}//转发消息给其他客户端(通道)private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {System.out.println("服务器转发消息中。。。");//遍历 所有注册到selector上的SocketChannel,并排除selffor (SelectionKey key : selector.keys()) {//通过key取出对应的SocketChannelChannel targetChannel = key.channel();//排除自己if (targetChannel instanceof SocketChannel && targetChannel != self) {//转型SocketChannel dest = (SocketChannel) targetChannel;//将msg,存储到bufferByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());//将buffer的数据写入通道dest.write(buffer);}}}public static void main(String[] args) {//创建服务器对象GroupChatServer groupChatServer = new GroupChatServer();groupChatServer.listen();}}
2.5.2 客户端
老套路:
- 注册服务器
- 新增选择器
- 创建连接服务器,并设置非阻塞
- 通道注册到选择器上
- 发送消息
- 通道直接写Buffer数据即可
死循环读信息
- 判断是否可用的通道
- 遍历selectedKeys 获取各个通道(客户端也是多通道)的事件
判断事件,如果是读事件,那么获取数据,并打印 ```java /*客户端**/ public class GroupChatClient { //定义相关的属性 private static final String HOST = “127.0.0.1”; //服务器的IP地址 private static final int PORT = 6666; //服务器端口 private Selector selector; private SocketChannel socketChannel; private String username;
//构造器,初始化操作 public GroupChatClient() throws IOException { selector = Selector.open(); //连接服务器 socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT)); //设置非阻塞 socketChannel.configureBlocking(false); //将channel注册到selector socketChannel.register(selector, SelectionKey.OP_READ); //得到username username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username + “ is ok…”); }
//向服务器发送消息 public void sendInfo(String info) { info = username + “ 说:” + info; try {
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
} catch (IOException e) {
e.printStackTrace();
} }
//读取从服务器端回复的消息 public void readInfo() { try {
int readChannels = selector.select();if (readChannels > 0) {//有可用的通道Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();if (key.isReadable()) {//得到相关的通道SocketChannel sc = (SocketChannel) key.channel();//得到一个bufferByteBuffer buf = ByteBuffer.allocate(1024);//读取sc.read(buf);//把缓冲区的数据转成字符串String msg = new String(buf.array());System.out.println(msg.trim());}}} else {System.out.println("没有可以用的通道...");}
} catch (Exception e) { } }
public static void main(String[] args) throws IOException { //启动客户端 GroupChatClient chatClient = new GroupChatClient(); //启动一个线程用于读取服务器的消息 new Thread(() -> {
while (true) {chatClient.readInfo();try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}
}).start(); //主线程用于发送数据给服务器端 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) {
String s = scanner.nextLine();chatClient.sendInfo(s);
} } }
<a name="tEmDD"></a>### 2.5.3 总结使用int read = channel.read(buffer)读取数据时,读取的结果情况:1. 当read=-1时,说明客户端的数据发送完毕,并且主动的关闭socket。所以这种情况下,服务器程序需要关闭socketSocket,并且取消key的注册。注意:这个时候继续使用SocketChannel进行读操作的话,就会抛出:==远程主机强迫关闭一个现有的连接==的IO异常1. 当read=0时:1. 某一时刻SocketChannel中当前没有数据可读。1. 客户端的数据发送完毕。<a name="5rKQN"></a>## 2.6 NIO的零拷贝零拷贝是网络编程的关键,很多性能优化都离不开它。零拷贝是指:从操作系统的角度来看,文件的传输不存在CPU的拷贝,只存在DMA拷贝。在Java程序中,常用的零拷贝有 mmap(内存映射)和 sendFile。零拷贝不仅仅带来更少的数据复制,还能减少线程的上下文切换,减少CPU缓存伪共享以及无CPU校验和计算。<a name="SvXSj"></a>### 2.6.1 传统IO拷贝```javaFile file = new File("test.txt");RandomAccessFile raf = new RandomAccessFile(file, "rw");byte[] arr = new byte[(int) file.length()];raf.read(arr);Socket socket = new ServerSocket(8080).accept();socket.getOutputStream().write(arr);
两次CPU拷贝
DMA: direct memory access 直接内存拷贝(不使用CPU)
2.6.2 mmap优化的IO
直接内存拷贝,需要一次CPU拷贝
2.6.3 sendFile 优化的IO
2.6.4 linux2.4 零拷贝
2.6.5 零拷贝demo
transferTo
public static void main(String[] args) throws Exception {SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress("localhost", 7001));String filename = "protoc-3.6.1-win32.zip";//得到一个文件channelFileChannel fileChannel = new FileInputStream(filename).getChannel();//准备发送long startTime = System.currentTimeMillis();//在linux下一个transferTo 方法就可以完成传输//在windows 下 一次调用 transferTo 只能发送8m , 就需要分段传输文件, 而且要主要//transferTo 底层使用到零拷贝long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);System.out.println("发送的总的字节数 =" + transferCount + " 耗时:" + (System.currentTimeMillis() - startTime));//关闭fileChannel.close();}
2.6.6 总结
- 零拷贝,是从操作系统的角度来说的。因为内核缓冲区之间,没有数据是重复的(只有kernel buffer 有一份数据)。
零拷贝不仅仅带来更少的数据复制,还能带来其他的性能优势,例如更少的上下文切换,更少的CPU 缓存伪共享以及无 CPU 校验和计算。
三 Netty高性能架构设计
3.1 原生IO的问题
NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
- 需要具备其他的额外技能:要熟悉 Java 多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的NIO程序。
- 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。直到 JDK 1.7 版本该问题仍旧存在,没有被根本解决。
3.2 netty的优势
Netty对JDK自带的NIO的API进行了封装,解决了上述问题。
设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池.
- 安全:完整的 SSL/TLS 和 StartTLS 支持
高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
3.3 I/O线程模型
目前存在的线程模型主要有:
- 传统阻塞I/O服务模型
- Reactor模式
- 根据Reactor的数量和处理资源池线程的数量不同,有如下3种典型的实现
- 单Reactor单线程
- 单Reactor多线程
- 主从Reactor多线程
Netty线程模型主要基于主从Reactor多线程模型做了一定的改进,其中主从Reactor多线程模型有多个Reactor。
3.3.1 传统阻塞I/O服务模型
模型特点:
- 采用阻塞IO模式获取输入的数据
- 每个链接都需要独立的线程完成数据的输入,业务处理、数据返回。
- 问题分析:
- 当并发数很大,就会创建大量的线程,占用很大系统资源
- 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费。

图解说明:黄色的框表示对象,蓝色的框表示线程、白色的框表示方法(API)。之后的图相同。
3.3.2 传统阻塞I/O服务模型 demo
由于模型的逻辑主要集中在服务端,所以所有模型代码示例基本上都是服务端的示例
public static void main(String[] args) throws IOException {//1、创建一个线程池//2、如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)ExecutorService executorService = Executors.newCachedThreadPool();//创建ServerSocketServerSocket serverSocket = new ServerSocket(6666);System.out.println("服务器启动了");while (true) {//监听,等待客户端连接final Socket socket = serverSocket.accept();System.out.println("连接到一个客户端");//创建一个线程,与之通讯executorService.execute(() -> {//重写Runnable方法,与客户端进行通讯handler(socket);});}}//编写一个Handler方法,和客户端通讯。主要进行数据的读取和业务处理。public static void handler(Socket socket) {try {byte[] bytes = new byte[1024];//通过socket获取输入流InputStream inputStream = socket.getInputStream();//循环的读取客户端发送的数据while (true){int read = inputStream.read(bytes);if (read != -1){System.out.println(new String(bytes, 0, read));//输出客户端发送的数据} else {break;}}} catch (IOException e) {e.printStackTrace();} finally {System.out.println("关闭和client的连接");try {socket.close();} catch (IOException e) {e.printStackTrace();}}}
3.3.3 Reactor模型
针对传统阻塞I/O服务模型的2个缺点,解决方案如下:
- 基于I/O复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。Reactor对应的叫法: 1. 反应器模式 2. 分发者模式(Dispatcher) 3. 通知者模式(notifier)
- 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
I/O复用结合线程池,就是Reactor模式基本设计思想,如图所示:
- Reactor模式,通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动)
- 服务器端程序处理传入的多个请求,并将它们同步分派到响应的处理线程,因此Reactor模式也叫Dispatcher模式。
Reactor模式使用IO复用监听事件,收到事件后,分发的某个线程(进程),这点就是网络服务高并发处理的关键。
3.3.4 Reactor核心组成部分
Reactor:Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件作出反应。
- 我的理解是将Reactor理解成一个Selector,它可以对建立新的连接,也可以将产生的读写事件交换给Handler进行处理
Handlers:处理程序执行I/O事件要完成的实际事件, Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作。
3.3.5 单Reactor单线程模式

方案说明:Select是前面 I/O复用模型介绍的标准网络编程API,可以实现应用程序通过一个阻塞对象监听多路连接请求
- Reactor对象通过 Select 监控客户端请求事件,收到事件后通过Dispatch 进行分发
- 如果是建立连接请求事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理
- 如果不是建立连接事件,则Reactor会分发调用连接对应的Handler来响应
- Handler会完成Read→业务处理→Send 的完整业务流程
结合实例:
服务器端用一个线程通过多路复用搞定所有的IO操作(包括连接,读、写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑,前面的 NIO 案例就属于这种模型。
模型分析
- 优点:
- 模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
- 缺点:
- 性能问题,只有一个线程,无法完全发挥多核
CPU的性能。Handler在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈 - 可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
- 性能问题,只有一个线程,无法完全发挥多核
使用场景:
客户端的数量有限,业务处理非常快速,比如 Redis在业务处理的时间复杂度 O(1) 的情况
3.3.6 单Reactor单线程demo
```java public class SReactorSThread { private Selector selector; private ServerSocketChannel serverSocketChannel; private int PORT = 6666;
public SReactorSThread() { try {
selector = Selector.open();serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(PORT));serverSocketChannel.configureBlocking(false);serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
} }
//对客户端进行监听 public void listen() { try {
while (true) {int count = selector.select();//表示有客户端产生事件if (count > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys();//取出产生事件的ChannelIterator<SelectionKey> iterator = selectionKeys.iterator();//准备对其进行遍历while (iterator.hasNext()) {SelectionKey key = iterator.next();//将key交给dispatch去处理dispatch(key);iterator.remove();}}}
} catch (Exception e) {
e.printStackTrace();
} }
//dispatch private void dispatch(SelectionKey key) { if (key.isAcceptable()){
accept(key);
}else {
handler(key);
} }
//建立新的连接 private void accept(SelectionKey key) { try {
SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
} }
//对请求进行处理,接收消息—-业务处理—-返回消息 private void handler(SelectionKey key) { SocketChannel channel = null; try {
channel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(3);StringBuilder msg = new StringBuilder();while (channel.read(buffer) > 0) {msg.append(new String(buffer.array()));buffer.clear();}System.out.println("接收到消息:" + msg.toString());//发送消息String ok = "OK";buffer.put(ok.getBytes());//这个flip非常重要哦,是将position置0,limit置于position的位置,以便下面代码进行写入操作能够正确写入buffer中的所有数据buffer.flip();channel.write(buffer);buffer.clear();
} catch (IOException e) {
try {System.out.println(channel.getRemoteAddress() + "离线了");//取消该通道的注册并关闭通道,这里非常重要,没有这一步的话当客户端断开连接就会不断抛出IOException//是因为,select会一直产生该事件。key.cancel();channel.close();} catch (IOException ex) {ex.printStackTrace();}
} } }
/**调用**/
public static void main(String[] args) { SReactorSThread sReactorSThread = new SReactorSThread(); sReactorSThread.listen(); }
<a name="v4HpT"></a>### 3.3.7 单Reactor多线程模式1. Reactor 对象通过select 监控客户端请求事件, 收到事件后,通过dispatch进行分发1. 如果建立连接请求, 则由Acceptor 通过accept 处理连接请求, 然后创建一个Handler对象处理完成连接后的各种事件1. 如果不是连接请求,则由reactor分发调用连接对应的handler 来处理1. handler 只负责响应事件,不做具体的业务处理, 通过read 读取数据后,会分发给后面的worker线程池的某个线程处理业务1. worker 线程池会分配独立线程完成真正的业务,并将结果返回给handler1. handler收到响应后,通过send 将结果返回给client**模型分析**1. **优点**:1. 可以充分的利用多核cpu 的处理能力2. **缺点**:1. 多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈.<a name="DwjnY"></a>### 3.3.8 主从Reactor多线程模式<br />**方案说明:**1. Reactor主线程MainReactor 对象就只注册一个用于监听连接请求的ServerSocketChannel,通过select 监听连接事件, 收到事件后,通过Acceptor 处理连接事件1. 当 Acceptor 处理连接事件后,MainReactor 通过accept获取新的连接,并将连接注册到SubReactor1. subreactor 将连接加入到连接队列进行监听,并创建handler进行各种事件处理1. 当有新事件发生时, subreactor 就会调用对应的handler处理1. handler 通过read 读取数据,分发给后面的worker 线程处理1. worker 线程池分配独立的worker 线程进行业务处理,并返回结果1. handler 收到响应的结果后,再通过send 将结果返回给client1. Reactor 主线程可以对应多个Reactor 子线程, 即MainRecator 可以关联多个SubReactor1. <br />**模型分析**1. **优点**:1. 父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。1. 父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据2. **缺点**:1. 编程复杂度较高3. **结合实例**:1. 这种模型在许多项目中广泛使用,包括 Nginx 主从 Reactor 多进程模型,Memcached 主从多线程,Netty 主从多线程模型的支持<a name="uWB1b"></a>### 3.3.9 总结1. 单 Reactor单线程,前台接待员和服务员是同一个人,全程为顾客服1. 单 Reactor多线程,1 个前台接待员,多个服务员,接待员只负责接待1. 主从 Reactor多线程,多个前台接待员,多个服务生**优点:**1. 响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的1. 可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销1. 扩展性好,可以方便的通过增加Reactor实例个数来充分利用CPU 资源1. 复用性好,Reactor模型本身与具体事件处理逻辑无关,具有很高的复用性<a name="WKz5K"></a>## 3.4 netty 模型<a name="yoD4s"></a>### 3.4.1 netty基础版Netty主要是基于主从Reactor多线程模式做了一定的改进,其中主从Reactor都有单一的一个变成了多个。下面是简单的改进图。<br />1. 增加了BossGroup来维护多个主Reactor,主Reactor还是只关注连接的Accept;增加了WorkGroup来维护多个从Reactor,从Reactor将接收到的请求交给Handler进行处理。1. 在主Reactor中接收到Accept事件,获取到对应的SocketChannel,Netty会将它进一步封装成NIOSocketChannel对象,这个封装后的对象还包含了该Channel对应的SelectionKey、通信地址等详细信息1. Netty会将装个封装后的Channel对象注册到WorkerGroup中的从Reactor中。1. 当WorkerGroup中的从Reactor监听到事件后,就会将之交给与此Reactor对应的Handler进行处理<a name="kY8Zn"></a>### 3.4.2 netty进阶版Netty主要基于主从Reactors多线程模型(如图)做了一定的改进,其中主从 Reactor多线程模型有多个Reactor<br /><a name="jwx3r"></a>### 3.4.3 netty最终版1. Netty抽象出两组线程池,BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写1. BossGroup和WorkerGroup类型的本质都是NioEventLoopGroup类型。1. NioEventLoopGroup相当于一个线程管理器(类似于ExecutorServevice),它下面维护很多个NioEventLoop线程。1. 在初始化这两个Group线程组时,默认会在每个Group中生成CPU*2个NioEventLoop线程1. 当n个连接来了,Group默认会按照连接请求的顺序分别将这些连接分给各个NioEventLoop去处理。1. 同时Group还负责管理EventLoop的生命周期。4. NioEventLoop表示一个不断循环的执行处理任务的线程1. 它维护了一个线程和任务队列。1. 每个NioEventLoop都包含一个Selector,用于监听绑定在它上面的socket通讯。1. 每个NioEventLoop相当于Selector,负责处理多个Channel上的事件1. 每增加一个请求连接,NioEventLoopGroup就将这个请求依次分发给它下面的NioEventLoop处理。5. 每个Boss NioEventLoop循环执行的步骤有3步:1. 轮询accept事件1. 处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个Worker NioEventLoop的selector上。1. 处理任务队列到任务,即runAllTasks6. 每个Worker NioEventLoop循环执行的步骤:1. 轮询read,write事件1. 处理I/O事件,即read,write事件,在对应的NioSocketChannel中进行处理1. 处理任务队列的任务,即runAllTasks7. 每个 Worker NioEventLoop处理业务时,会使用pipeline(管道),pipeline中维护了一个ChannelHandlerContext链表,而ChannelHandlerContext则保存了Channel相关的所有上下文信息,同时关联一个ChannelHandler对象。如图所示,Channel和pipeline一一对应,ChannelHandler和ChannelHandlerContext一一对应。<a name="JIYaQ"></a>### 3.4.4 netty demo```javapublic class NettyServer {public static void main(String[] args) throws InterruptedException {//创建BossGroup 和 WorkerGroup//1、创建两个线程组,bossGroup 和 workerGroup//2、bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给 workerGroup 完成//3、两个都是无限循环//4、bossGroup 和 workerGroup 含有的子线程(NioEventLoop)个数为实际 cpu 核数 * 2EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup worderGroup = new NioEventLoopGroup();try {//创建服务器端的启动对象,配置参数ServerBootstrap bootstrap = new ServerBootstrap();//使用链式编程来进行设置,配置bootstrap.group(bossGroup, worderGroup) //设置两个线程组.channel(NioServerSocketChannel.class) //使用 NioServerSocketChannel 作为服务器的通道实现.option(ChannelOption.SO_BACKLOG, 128) //设置线程队列得到连接个数.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态.childHandler(new ChannelInitializer<SocketChannel>() { //为accept channel的pipeline预添加的handler//给 pipeline 添加处理器,每当有连接accept时,就会运行到此处。@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new NettyServerHandler());}}); //给我们的 workerGroup 的 EventLoop 对应的管道设置处理器System.out.println("........服务器 is ready...");//绑定一个端口并且同步,生成了一个ChannelFuture 对象//启动服务器(并绑定端口)ChannelFuture future = bootstrap.bind(6668).sync();//对关闭通道进行监听future.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();worderGroup.shutdownGracefully();}}}
服务端Handler处理
public class NettyServerHandler extends ChannelInboundHandlerAdapter {/***读取客户端发送过来的消息* @param ctx 上下文对象,含有 管道pipeline,通道channel,地址* @param msg 就是客户端发送的数据,默认Object* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("服务器读取线程:" + Thread.currentThread().getName());System.out.println("server ctx = " + ctx);//看看Channel和Pipeline的关系Channel channel = ctx.channel();ChannelPipeline pipeline = ctx.pipeline(); //本质是个双向链表,出栈入栈//将msg转成一个ByteBuf,比NIO的ByteBuffer性能更高ByteBuf buf = (ByteBuf)msg;System.out.println("客户端发送的消息是:" + buf.toString(CharsetUtil.UTF_8));System.out.println("客户端地址:" + ctx.channel().remoteAddress());}//数据读取完毕@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//它是 write + flush,将数据写入到缓存buffer,并将buffer中的数据flush进通道//一般讲,我们对这个发送的数据进行编码ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~", CharsetUtil.UTF_8));}//处理异常,一般是关闭通道@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}
客户端
public class NettyClient {public static void main(String[] args) throws InterruptedException {//客户端需要一个事件循环组EventLoopGroup group = new NioEventLoopGroup();try {//创建客户端启动对象//注意:客户端使用的不是 ServerBootStrap 而是 BootstrapBootstrap bootstrap = new Bootstrap();//设置相关参数bootstrap.group(group) //设置线程组.channel(NioSocketChannel.class) //设置客户端通道的实现类(使用反射).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器}});System.out.println("客户端 OK...");//启动客户端去连接服务器端//关于 channelFuture 涉及到 netty 的异步模型ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();//给关闭通道进行监听channelFuture.channel().closeFuture().sync();} finally {group.shutdownGracefully();}}}
客户端Handler处理:
public class NettyClientHandler extends ChannelInboundHandlerAdapter {/*** 当通道就绪就会触发* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client: " + ctx);ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server", CharsetUtil.UTF_8));}/*** 当通道有读取事件时,会触发* @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf)msg;System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));System.out.println("服务器的地址:" + ctx.channel().remoteAddress());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}}
3.4.5 总结
- Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。
- NioEventLoop 表示一个不断循环执行处理任务的线程,每个NioEventLoop 都有一个 selector,用于监听绑定在其上的socket网络通道。
- NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由IO 线程 NioEventLoop 负责
- NioEventLoopGroup 下包含多个NioEventLoop
- 每个 NioEventLoop 中包含有一个Selector,一个 taskQueue
- 每个 NioEventLoop 的 Selector上可以注册监听多个NioChannel
- 每个 NioChannel 只会绑定在唯一的NioEventLoop 上
- 每个 NioChannel 都绑定有一个自己的ChannelPipeline
3.5 任务队列
任务队列由NioEventLoop维护并不断执行。当我们就收到请求之后,在当前channel对应的pipeline中的各个Handler里面进行业务处理和请求过滤。当某些业务需要耗费大量时间的时候,我们可以将任务提交到由NioEventLoop维护的taskQueue或scheduleTaskQueue中,让当前的NioEventLoop线程在空闲时间去执行这些任务。下面将介绍提交任务的3种方式3.5.1 用户程序自定义的普通任务
该方式会将任务提交到taskQueue队列中。提交到该队列中的任务会按照提交顺序依次执行。 ```java channelHandlerContext.channel().eventLoop().execute(new Runnable(){ @Override public void run() { //… } });
<a name="mX8Mp"></a>### 3.5.2 **用户自定义的定时任务:**该方式会将任务提交到scheduleTaskQueue定时任务队列中。该队列是底层是优先队列PriorityQueue实现的,故该队列中的任务会按照时间的先后顺序定时执行。```javachannelHandlerContext.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {}}, 60, TimeUnit.SECONDS);
3.5.3 为其他EventLoop线程对应的Channel添加任务
可以在ChannelInitializer中,将刚创建的各个Channel以及对应的标识加入到统一的集合中去,然后可以根据标识获取Channel以及其对应的NioEventLoop,然后就课程调用execute()或者schedule()方法。
3.6 异步模型
基本介绍
- 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
- Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。
- 调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果
- Netty 的异步模型是建立在 future 和 callback 的之上的。callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future去监控方法 fun 的处理过程(即 : Future-Listener 机制)
Future说明
- 表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,比如检索计算等等.
- ChannelFuture 是一个接口 : public interface ChannelFuture extends Future
。我们可以添加监听器,当监听的事件发生时,就会通知到监听器
工作原理
- 在使用 Netty 进行编程时,拦截操作和转换出入站数据只需要您提供 callback 或利用future 即可。这使得链式操作简单、高效, 并有利于编写可重用的、通用的代码。
- Netty 框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来
3.6.1 Future-Listener机制
当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。
常用方法如下:
| 方法名称 | 方法作用 |
|---|---|
| isDone() | 判断当前操作是否完成 |
| isSuccess() | 判断已完成的当前操作是否成功 |
| getCause() | 获取已完成当前操作失败的原因 |
| isCancelled() | 判断已完成的当前操作是否被取消 |
| addListener() | 注册监听器,当前操作(Future)已完成,将会通知指定的监听器 |
3.6.2 Future-Listener demo
绑定端口操作时异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑。
serverBootstrap.bind(port).addListener(future -> {if(future.isSuccess()) {System.out.println(newDate() + ": 端口["+ port + "]绑定成功!");} else{System.err.println("端口["+ port + "]绑定失败!");}});
3.7 http demo
public static void main(String[] args) throws InterruptedException {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new TestServerInitializer());ChannelFuture channelFuture = bootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
定义ChannelInitializer
用于给Channel对应的pipeline添加handler。该ChannelInitializer中的代码在SocketChannel被创建时都会执行
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//向管道加入处理器//得到管道ChannelPipeline pipeline = socketChannel.pipeline();//加入一个 netty 提供的 httpServerCodec:codec => [coder - decoder]//1、HttpServerCodec 是 netty 提供的处理http的编解码器pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());//2、增加自定义的Handlerpipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());}}
自定义Handler
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {/*** 读取客户端数据。** @param channelHandlerContext* @param httpObject 客户端和服务器端互相通讯的数据被封装成 HttpObject* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {//判断 msg 是不是 HTTPRequest 请求if (httpObject instanceof HttpRequest) {System.out.println("msg 类型 = " + httpObject.getClass());System.out.println("客户端地址:" + channelHandlerContext.channel().remoteAddress());//获取HttpRequest request = (HttpRequest) httpObject;//获取uri,进行路径过滤URI uri = new URI(request.uri());if ("/favicon.ico".equals(uri.getPath())) {System.out.println("请求了 favicon.ico,不做响应");}//回复信息给浏览器[http协议]ByteBuf content = Unpooled.copiedBuffer("HelloWorld", CharsetUtil.UTF_8);//构造一个http的响应,即HTTPResponseDefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());//将构建好的 response 返回channelHandlerContext.writeAndFlush(response);}}}

