关键词:Buffer、Channel、SelectionKey、Selector、事件
Buffer
缓存数组,就是一个内存块,底层用数组实现
与Channel进行数据的读写。
数据的读取写入是通过Buffer, 这个和BIO 一样, 而BIO 中要么是输入流,或者是输出流, 不能双向,但是NIO的Buffer 是可以读也可以写, 需要 flip (读写切换)方法切换。
Channel
通信通道,每个客户端连接都会建立一个Channel通道
我的理解是:客户端直接与Channel进行通信,当客户端发送消息时,消息就流通到Channel里面,本地程序需要将Channel里面的数据存放在Buffer里面,才可以查看;当本地需要发送消息时,先把消息存在Buffer里面,再将Buffer里面的数据放入Channel,数据就流通到了客户端
总而言之:Buffer就是本地程序与Channel数据交换的一个中间媒介。
SelectionKey、Selector
NIO之所以是非阻塞的,关键在于它一个线程可以同时处理多个客户端的通信。而Selector就是它一个线程如何处理多个客户端通信的关键,一个Selector就对应一个线程
首先在创建与客户端连接的Channel时,应该调用 Channel.register()方法,将Channel注册到一个Selector上面。调用该方法后,会返回一个SelectionKey对象,该对象与Channel是一一对应的。而Selector则通过管理SelectionKey的集合间接的去管理各个Channel。示例图如下:
Selector具体是如何管理这么多个通信的呢?这就引出了事件。
事件、以及NIO的工作流程介绍
- 事件:当将Channel绑定到Selector上面时,必须同时为该Channel声明一个监听该Channel的事件(由Channel和该Channel的事件一起组成了SelectionKey),并将SelectionKey加入到Selector的Set集合中去
- 当有客户端建立连接或者进行通信,会在对应的各个Channel中产生不同的事件。
Selector会一直监听所有的事件,当他监听到某个SelectionKey中有事件产生时,会将所有产生事件的SelectionKey统一加入到一个集合中去
而我们则需要获取到这个集合,首先对集合中的各个SelectionKey进行判断,判断它产生的是什么事件,再根据不同的事件进行不同的处理。
在操作这个SelectionKey集合的时候,其实我们就是在一个线程里面对几个不同客户端的连接进行操作。具体的关系图如下:
缓冲区(Buffer)
基本介绍
缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由Buffer。
Buffer类介绍
基类是Buffer抽象类
基类派生出基于基本数据类型的7个xxxBuffer 抽象类,没有boolean相关的buffer类。
除了ByteBuffer外,每个基本数据的抽象类 xxxBuffer 类下面都派生出转向 ByteBuffer 的类 ByteBufferXxxAsBufferL 和 ByteBufferAsXxxBufferB实现类;以及 DirectXxxBufferU 和 DirectXxxBufferS 和 HeapXxxBuffer==(具体实例对象类)==这五个类。
就只有抽象类CharBuffer 派生出了第六个类StringCharBuffer。
ByteBuffer只派生出了 HeapByteBuffer 和 MappedByteBufferR 两个类
类图如下:
Buffer类主要属性
属性 描述
- Capacity 容量:即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变
- Limit:表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的
- Position 位置:下一个要被读或写的元素的索引,每次读写缓冲区数据时都会改变改值,为下次读写作准备
- Mark 标记 :一般不会主动修改,在flip()被调用后,mark就作废了。
mark <= position <= limit <= capacity
Buffer类使用示例
//创建一个Buffer,大小为5,即可以存放5个int
IntBuffer intBuffer = IntBuffer.allocate(5);
//向buffer中存放数据
for (int i = 0; i < intBuffer.capacity(); i++) {
intBuffer.put(i * 2);
}
//如何从buffer中读取数据
//将buffer转换,读写切换
intBuffer.flip();
while (intBuffer.hasRemaining()) {
System.out.println(intBuffer.get());
}
- Buffer 刚创建时,capacity = 5 ,固定不变。limit指针指向5,position指向0,mark指向-1,
- 之后调用 intBuffer.put方法,向buffer中添加数据,会不断移动position指针,最后position变量会和limit指向相同。
- 调用 buffer.flip()实际上是重置了position和limit两个变量,将limit放在position的位置,position放在0的位置。这里只是最后的position和limit位置相同,所以flip后limit位置没变。
- 调用 intBuffer.get()实际上是不断移动position指针,直到它移动到limit的位置
Buffer类主要方法
Buffer基类(抽象类)
public final int capacity();
直接返回了此缓冲区的容量,capacity
public final int position();
直接返回了此缓冲区指针的当前位置
public final Buffer position(int newPosition);
设置此缓冲区的位置,设置position
public final int limit();
返回此缓冲区的限制
public final Buffer limit(int newLimit);
设置此缓冲区的限制,设置limit
public final Buffer clear();
清除此缓冲区,即将各个标记恢复到初识状态, position = 0;limit = capacity; mark = -1,但是并没有删除数据。
public final Buffer flip();
反转此缓冲区, limit = position;position = 0;mark = -1。
当指定数据存放在缓冲区中后,position所指向的即为此缓冲区数据最后的位置。只有当数据大小和此缓冲区大小相同时,position才和limit的指向相同。
flip()方法将limit置向position, position置0,那么从position读取数据到limit即为此缓冲区中所有的数据。
public final boolean hasRemaining();
告知当前位置和限制之间是否有元素。return position < limit;
//此方法为抽象方法,告知此缓冲区是否为只读缓冲区,具体实现在各个实现类中。
public abstract boolean isReadOnly();
//告知此缓冲区是否具有可访问的底层实现数组
public abstract boolean hasArray();
public abstract Object array();
返回此缓冲区的底层实现数组
Buffer具体实现类(ByteBuffer为例)
从前面可以看出来对于Java中的基本数据类型(boolean除外),都有一个Buffer类型与之对应,最常用的自然是ByteBuffer类(二进制数据),该类的主要方法如下:
public static ByteBuffer allocateDirect(int capacity);
创建直接缓冲区
public static ByteBuffer allocate(int capacity) ;
设置缓冲区的初识容量
从当前位置position上get数据,获取之后,position会自动加1
public abstract byte get();
通过绝对位置获取数据。
public abstract byte get(int index);
从当前位置上添加,put之后,position会自动加1
public abstract ByteBuffer put(byte b);
从绝对位置上添加数据
public abstract ByteBuffer put(int index, byte b);
从position当前位置插入元素。Xxx表示基本数据类型
public abstract ByteBuffer putXxx(Xxx value [, int index]);
put放入的是什么数据类型,get就应该使用相应的数据类型来取出,否则可能有 BufferUnderflowException 异常。
Buffer具体实现类ByteBuffer
从前面可以看出来对于Java中的基本数据类型(boolean除外),都有一个Buffer类型与之对应,
最常用的自然是ByteBuffer类(二进制数据),该类的主要方法如下:
public static ByteBuffer allocateDirect(int capacity);
创建直接缓冲区
public static ByteBuffer allocate(int capacity) ;
设置缓冲区的初识容量
public abstract byte get();
从当前位置position上get数据,获取之后,position会自动加1
public abstract byte get(int index);
通过绝对位置获取数据。
public abstract ByteBuffer put(byte b);
从当前位置上添加,put之后,position会自动加1
public abstract ByteBuffer put(int index, byte b);
从绝对位置上添加数据
public abstract ByteBuffer putXxx(Xxx value [, int index]);
从position当前位置插入元素。Xxx表示基本数据类型
此方法时类型化的 put 和 get,put放入的是什么数据类型,get就应该使用相应的数据类型来取出,
否则可能有 BufferUnderflowException 异常。
示例如下:
ByteBuffer buf = ByteBuffer.allocate(64);
//类型化方式放入数据
buf.putInt(100);
buf.putLong(20);
buf.putChar('上');
buf.putShort((short)44);
//取出,当取出的顺序和上面插入的数据类型的顺序不对时,就会抛出BufferUnderflowException异常
buf.flip();
System.out.println(buf.getInt());
System.out.println(buf.getLong());
System.out.println(buf.getChar());
System.out.println(buf.getShort());
可以将一个普通的Buffer转成只读的Buffer
//创建一个Buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(64);
for (int i = 0; i < 64; i++) {
byteBuffer.put((byte)i);
}
//读取
byteBuffer.flip();
//得到一个只读的Buffer
ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
System.out.println(readOnlyBuffer.getClass());
//读取
while (readOnlyBuffer.hasRemaining()){
System.out.println(readOnlyBuffer.get());
}
readOnlyBuffer.put((byte)100); //会抛出 ReadOnlyBufferException
Buffer具体实现类MappedByteBuffer
MappedByteBuffer可以让文件直接在内存(堆外内存)中进行修改,而如何同步到文件由NIO来完成
/**
* MappedByteBuffer可以让文件直接在内存中(堆外内存)修改,操作系统不需要拷贝一次
*/
@Test
public void test() throws IOException {
RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt", "rw");
//获取对应的文件通道
FileChannel channel = randomAccessFile.getChannel();
/**
* 参数1: FileChannel.MapMode.READ_WRITE,使用的读写模式
* 参数2: 0,可以直接修改的起始位置
* 参数3: 5,是映射到内存的大小(不是文件中字母的索引位置),
* --> 即将 1.txt 的多少个字节映射到内存,也就是可以直接修改的范围就是 [0, 5)
* 实际的实例化类型:DirectByteBuffer
*/
MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
mappedByteBuffer.put(0,(byte)'N');
mappedByteBuffer.put(3, (byte)'M');
mappedByteBuffer.put(5, (byte)'Y'); //会抛出 IndexOutOfBoundsException
randomAccessFile.close();
System.out.println("修改成功~");
}
通道(Channel)
基本介绍
NIO的通道类似于流,但有些区别
- 通道可以同时进行读写,而流只能读或者只能写
- 通道可以实现异步读写数据
- 通道可以从缓存读数据,也可以写数据到缓存
BIO 中的 stream 是单向的,例如:FileInputStream对象只能进行读取数据的操作,而NIO中的通道(Channel)是双向的,可以读操作,也可以写操作。
Channel 在 NIO 中是一个接口:public interface Channel extends Closeable{}
常用的Channel类有:FileChannel、DatagramChannel、ServerSocketChannel(类似ServerSocket)、SocketChannel(类似Socket)
FileChannel 用于文件数据的读写,DatagramChannel用于UDP数据的读写,ServerSocketChannel和SocketChannel用于TCP数据读写
类关系图:
FileChannel类
常见方法
从通道读取数据并放到缓冲区中
此操作也会移动Buffer中的position指针,不断往position中放数据,read完成后position指向limit。
public int read(ByteBuffer dst)
把缓冲区的数据写到通道中
此操作也会不断移动Buffer中的position位置直到limit,读取到的数据就是position到limit这两个指针之间的数据。
public int write(ByteBuffer src)
从目标通道中复制数据到当前通道
public long transferFrom(ReadableByteChannel src, long position, long count)
把数据从当前通道复制给目标通道
public long transferTo(long position, long count, WritableByteChannel target)
该方法拷贝数据使用了零拷贝,通常用来在网络IO传输中,将FileChannel里面的文件数据直接拷贝到与客户端或者服务端连接的Channel里面从而达到文件传输。
应用实例
实例1:将数据写入到本地文件
String str = "hello,尚硅谷";
//创建一个输出流 -> Channel
FileOutputStream fileOutputStream = new FileOutputStream("d:\\file01.txt");
//通过 FileOutputStream 获取对应的 FileChannel
//这个 FileChannel 真实类型是 FileChannelImpl
FileChannel fileChannel = fileOutputStream.getChannel();
//创建一个缓冲区 ByteBuffer //将str放入ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(str.getBytes());
//对ByteBuffer进行反转,开始读取
byteBuffer.flip();
//将ByteBuffer数据写入到FileChannel, 此操作会不断移动Buffer中的position到limit的位置
fileChannel.write(byteBuffer);
fileOutputStream.close();
实例1、2的示例图:
实例2:从本地文件读取数据
//创建文件的输入流
File file = new File("d:\\file01.txt");
FileInputStream fileInputStream = new FileInputStream(file);
//通过fileInputStream 获取对应的FileChannel -> 实际类型 FileChannelImpl
FileChannel fileChannel = fileInputStream.getChannel();
//创建缓冲区 //将通道的数据读入到buffer
ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
fileChannel.read(byteBuffer);
//将ByteBuffer 的字节数据转成String
System.out.println(new String(byteBuffer.array()));
fileInputStream.close();
实例3:使用一个Buffer完成文件的读取
FileInputStream fileInputStream = new FileInputStream("1.txt");
FileChannel fileChannel1 = fileInputStream.getChannel();
FileOutputStream fileOutputStream = new FileOutputStream("2.txt");
FileChannel fileChannel2 = fileOutputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
while (true){
//清空buffer,由于循环的最后执行了 write 操作,会将 position 移动到 limit 的位置
//清空 Buffer的操作才为上一次的循环重置position的位置
// 如果没有重置position,那么上次读取后,position和limit位置一样,读取后read的值永远为0
byteBuffer.clear();
//将数据存入 ByteBuffer,它会基于 Buffer 此刻的 position 和 limit 的值,
// 将数据放入position的位置,然后不断移动position直到其与limit相等;
int read = fileChannel1.read(byteBuffer);
System.out.println("read=" + read);
if (read == -1) { //表示读完
break;
}
//将buffer中的数据写入到 FileChannel02 ---- 2.txt
byteBuffer.flip();
fileChannel2.write(byteBuffer);
}
//关闭相关的流
fileInputStream.close();
fileOutputStream.close();
实例4:拷贝文件 transferFrom 方法
//创建相关流
FileInputStream fileInputStream = new FileInputStream("d:\\a.gif");
FileOutputStream fileOutputStream = new FileOutputStream("d:\\a2.gif");
//获取各个流对应的FileChannel
FileChannel source = fileInputStream.getChannel();
FileChannel dest = fileOutputStream.getChannel();
//使用 transferForm 完成拷贝
dest.transferFrom(source, 0, source.size());
//关闭相关的通道和流
source.close();
dest.close();
fileInputStream.close();
fileOutputStream.close();
ServerSocketChannel 和 SocketChannel 类
常见方法
ServerSocketChannel:主要用于在服务器监听新的客户端Socket连接
得到一个 ServerSocketChannel 通道
public static ServerSocketChannel open()
设置服务器监听端口
public final ServerSocketChannel bind(SocketAddress local)
用于设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
此方法位于 ServerSocketChannel 和 SocketChannel的共同父类AbstractSelectableChannel类中
public final SelectableChannel configureBlocking(boolean block)
接受一个连接,返回代表这个连接的通道对象
public abstract SocketChannel accept()
将Channel注册到选择器并设置监听事件,也可以在绑定的同时注册多个事件,如下所示:
channel.register(selector,Selectionkey.OP_READ | Selectionkey.OP_CONNECT)
SocketChannel:网络IO通道,具体负责进行读写操作。NIO把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区
public final SelectionKey register(Selector sel, int ops)
得到一个SocketChannel通道
public static SocketChannel open()
public final SelectableChannel configureBlocking(boolean block)
设置阻塞或非阻塞模式,取值 false表示采用非阻塞模式
此方法位于 ServerSocketChannel 和 SocketChannel的共同父类AbstractSelectableChannel类中
public abstract boolean connect(SocketAddress remote)
连接服务器
public boolean finishConnect()
如果上面的方法连接失败,接下来就要通过该方法完成连接操作
往通道里写数据
这里写入的是buffer里面position到limit这个之间的数据
public int write(ByteBuffer src)
从通道里读数据
public int read(ByteBuffer dst)
注册Channel到选择器并设置监听事件,最后一个参数可以设置共享数据
public final SelectionKey register(Selector sel, int ops, Object att)
关闭通道
public final void close()
应用实例
通过Buffer数组来完成读写操作,即Scattering和Gathering
/**
* Scattering:将数据写入到buffer时,可以采用buffer数组,初次写入 【分散】
* Gathering:从buffer读取数据时,也可以采用buffer数组,依次读
*/
@Test
public void test() throws IOException {
//使用 ServerSocketChannel 和 SocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);
//绑定端口到socket,并启动
serverSocketChannel.socket().bind(inetSocketAddress);
//创建一个Buffer数组
ByteBuffer[] byteBuffers = new ByteBuffer[2];
byteBuffers[0] = ByteBuffer.allocate(5);
byteBuffers[1] = ByteBuffer.allocate(3);
//等待客户端的连接(Telnet)
SocketChannel socketChannel = serverSocketChannel.accept();
int msgLength = 8; //假定从客户端接受8个字节
//循环的读取
while (true) {
int byteRead = 0;
while (byteRead < msgLength) {
long l = socketChannel.read(byteBuffers);
byteRead += l; //累计读取的字节数
System.out.println("byteRead= " + byteRead);
//使用流打印,看看当前这个buffer的position和limit
Arrays.stream(byteBuffers)
.map(buffer -> "position=" + buffer.position() + ", limit = " + buffer.limit())
.forEach(System.out::println);
}
//读取数据后需要将所有的buffer进行flip
Arrays.asList(byteBuffers).forEach(Buffer::flip);
//将数据读出显示到客户端
long byteWrite = 0;
while (byteWrite < msgLength) {
long l = socketChannel.write(byteBuffers);
byteWrite += l;
}
//将所有的 buffer 进行clear操作
Arrays.asList(byteBuffers).forEach(Buffer::clear);
System.out.println("byteRead=" + byteRead + ", byteWrite=" + byteWrite
+ ", msgLength=" + msgLength);
}
}
Selector(选择器)
基本介绍
Java 的 NIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到Selector(选择器)
Selector能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程,避免了多线程之间的上下文切换导致的开销
Netty的IO线程NioEventLoop聚合了Selector(选择器,也叫多路复用器),可以同时并发处理成百上千个客户端连接。当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
Selector常见方法
得到一个选择器对象,实例化出 WindowsSelectorImpl对象。
public static Selector open();
监控所有注册的通道,当其中有IO操作可以进行时,将对应的SelectionKey加入到内部集合中并返回,
--> 返回的结果为Channel响应的事件总和,当结果为0时,表示本Selector监听的所有Channel中没有Channel产生事件。
如果不传入timeout值,就会阻塞线程,传入值则为阻塞多少毫秒,通过它设置超时时间。
之所以需要传入时间,是为了让它等待几秒钟再看有没有Channel会产生事件,从而获取一段时间内产生事件的Channel的总集合再一起处理。
public int select(long timeout)
selector.selectNow();
不会阻塞,立马返回冒泡的事件数
从内部集合中得到所有的SelectionKey
public Set<SelectionKey> selectedKeys()
SelectionKey介绍
主要作用:
Selector通过管理SelectionKey的集合从而去监听各个Channel。当Channel注册到Selector上面时,会携带该Channel关注的事件(SelectionKey包含Channel以及与之对应的事件),并会返回一个SelectionKey的对象,Selector将该对象加入到它统一管理的集合中去,从而对Channel进行管理。SelectionKey表示的是Selector和网络通道的注册关系,固FileChannel是没有办法通过SelectionKey注册到Selector上去的。
四大事件
值为1,表示读操作,
代表本Channel已经接受到其他客户端传过来的消息,需要将Channel中的数据读取到Buffer中去
public static final int OP_READ = 1 << 0
值为4,表示写操作
一般临时将Channel的事件修改为它,在处理完后又修改回去。我暂时也没明白具体的作用。
public static final int OP_WRITE = 1 << 2
值为8,代表建立连接。
一般在ServerSocketChannel上绑定该事件,结合 channel.finishConnect()在连接建立异常时进行异常处理
public static final int OP_CONNECT = 1 << 3
值为16,表示由新的网络连接可以accept。
与ServerSocketChannel进行绑定,用于创建新的SocketChannel,并把其注册到Selector上去
public static final int OP_ACCEPT = 1 << 4
相关方法
public abstract Selector selector()
得到该SelectionKey具体是属于哪个Selector对象的
public abstract SelectableChannel channel()
通过SelectionKey的到对应的Channel
public final Object attachment()
得到与之关联的共享数据,一般用于获取buffer
在使用register注册通道时,也可以为该Channel绑定一个Buffer,可以通过本方法获取这个Buffer。
通过selectionKey.attach(Object ob)绑定的数据,也是通过该方法获取
public abstract SelectionKey interestOps()
获取该SelectionKey下面的事件
public abstract SelectionKey interestOps(int ops)
用于设置或改变某个Channel关联的事件
增加事件:key.interestOps(key.interestOps | SelectionKey.OP_WRITE)
减少事件:key.interestOps(key.interestOps & ~SelectionKey.OP_WRITE)
public final boolean isAcceptable(),isReadable(),isWritable(),isConnectable()
用于判断这个SelectionKey产生的是什么事件,与上面的事件类型一一对应