1. NIO 模型代码
NIO 的 Selector 解决了连接数很多时需要逐个遍历每一个 Channel 的问题:借助 Selector,只需处理真正有事件发生的 Channel。
selector.select() 阻塞等待需要处理的事件发生。
package cn.java.money.nio.demo2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
 * Minimal single-threaded NIO server: one {@link Selector} multiplexes the
 * listening channel and every accepted client channel, and each chunk of
 * received bytes is printed to standard output.
 *
 * <p>Unlike a naive loop, a client that disconnects (or whose connection is
 * reset) is closed and deregistered, so it can neither spin the selector nor
 * take the whole server down.
 */
public class Server {

    /** TCP port the server listens on. */
    private static final int PORT = 8888;

    /** Size in bytes of the per-read scratch buffer. */
    private static final int BUFFER_SIZE = 1024;

    /**
     * Starts the server and runs the select loop until the thread is interrupted.
     *
     * @param args ignored
     * @throws IOException if the listening channel or the selector cannot be set up,
     *                     or if selecting/accepting fails
     */
    public static void main(String[] args) throws IOException {
        // The listening channel accepts new connections; both resources are
        // released when the loop ends or an exception escapes.
        try (ServerSocketChannel ssChannel = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            ssChannel.configureBlocking(false);
            ssChannel.bind(new InetSocketAddress(PORT));
            ssChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (!Thread.currentThread().isInterrupted()) {
                // select() blocks here until at least one channel is ready. It may
                // also return 0 (e.g. after a wakeup); that must not stop the server.
                if (selector.select() == 0) {
                    continue;
                }
                // Thanks to the selector, only channels with pending events are visited.
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    // A handled key must be removed from the selected set, otherwise
                    // it would be reported again on the next pass.
                    iterator.remove();
                    if (!selectionKey.isValid()) {
                        continue;
                    }
                    if (selectionKey.isAcceptable()) {
                        acceptClient(ssChannel, selector);
                    } else if (selectionKey.isReadable()) {
                        readFromClient(selectionKey);
                    }
                }
            }
        }
    }

    /** Accepts one pending connection, if any, and registers it for read events. */
    private static void acceptClient(ServerSocketChannel ssChannel, Selector selector)
            throws IOException {
        SocketChannel socketChannel = ssChannel.accept();
        if (socketChannel == null) {
            // Non-blocking accept: the connection may already be gone.
            return;
        }
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    /** Drains the readable channel to stdout and closes it on end-of-stream or I/O failure. */
    private static void readFromClient(SelectionKey selectionKey) {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
        try {
            int length;
            // channel.read(byteBuffer) fills the buffer, i.e. the buffer is in write mode.
            // Note: a channel with a lot of pending data keeps this loop busy and
            // delays every other channel served by the same thread.
            while ((length = channel.read(byteBuffer)) > 0) {
                byteBuffer.flip();
                // Consume what was just read from the buffer.
                System.out.println(new String(byteBuffer.array(), 0, length));
                byteBuffer.clear();
            }
            if (length < 0) {
                // -1 means the peer closed the connection. Without closing the channel
                // the key would stay readable forever and the select loop would spin.
                closeQuietly(channel);
            }
        } catch (IOException e) {
            // A broken client connection must only affect that client.
            System.err.println("Dropping client after I/O error: " + e);
            closeQuietly(channel);
        }
    }

    /** Closes the channel (which also cancels its selection keys), ignoring close failures. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if closing an already broken channel fails.
        }
    }
}
package cn.java.money.nio.demo2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
/**
 * Console client for the demo server: every line typed on standard input is
 * sent to {@code 127.0.0.1:8888} over a non-blocking {@link SocketChannel}.
 */
public class Client {

    /**
     * Connects to the server and forwards stdin line by line until stdin is exhausted.
     *
     * @param args ignored
     * @throws IOException if connecting or writing to the server fails
     */
    public static void main(String[] args) throws IOException {
        try (SocketChannel socketChannel = SocketChannel.open(
                new InetSocketAddress("127.0.0.1", 8888))) {
            socketChannel.configureBlocking(false);
            Scanner scanner = new Scanner(System.in);
            // hasNextLine() ends the loop cleanly at end of input instead of
            // letting nextLine() throw NoSuchElementException.
            while (scanner.hasNextLine()) {
                String s = scanner.nextLine();
                // Wrapping the bytes yields a buffer that is already in read mode and is
                // exactly as large as the line, so long lines cannot overflow it.
                ByteBuffer byteBuffer = ByteBuffer.wrap(s.getBytes());
                // Writing to the channel consumes the buffer. In non-blocking mode a
                // single write() may send only part of it, so repeat until it is empty.
                while (byteBuffer.hasRemaining()) {
                    if (socketChannel.write(byteBuffer) == 0) {
                        // Socket send buffer is full; give other threads a chance before retrying.
                        Thread.yield();
                    }
                }
            }
        }
    }
}
2. NIO 源码 Linux版本代码
https://gitee.com/framework-src/openjdk-1.8-b132.git
2.1 Selector.open()
Selector selector = Selector.open();
/**
 * Opens a selector by asking the provider returned from
 * {@code SelectorProvider.provider()} to create one.
 *
 * @return the selector created by that provider
 * @throws IOException if the provider fails to open a selector
 */
public static Selector open() throws IOException {
    SelectorProvider defaultProvider = SelectorProvider.provider();
    return defaultProvider.openSelector();
}
/**
 * Returns the provider produced by {@code sun.nio.ch.DefaultSelectorProvider.create()}.
 *
 * @return the selector provider created by the default-provider factory
 */
public static SelectorProvider provider() {
    SelectorProvider platformProvider = sun.nio.ch.DefaultSelectorProvider.create();
    return platformProvider;
}
不同操作系统的JDK提供了不同的 DefaultSelectorProvider,
不同的DefaultSelectorProvider返回不同的SelectorProvider的实现
--- Windows 版本JDK
WindowsSelectorProvider
WindowsSelectorImpl
--- Linux 版本JDK
EPollSelectorProvider
EPollSelectorImpl
不同操作系统的JDK提供了不同的 DefaultSelectorProvider
solaris的 DefaultSelectorProvider
/**
 * Chooses the selector provider from the {@code os.name} system property:
 * {@code "SunOS"} and {@code "Linux"} each map to a provider created by class
 * name, and every other value falls back to {@code PollSelectorProvider}.
 *
 * @return the selector provider chosen for the reported operating system name
 */
public static SelectorProvider create() {
    final String osName =
            AccessController.doPrivileged(new GetPropertyAction("os.name"));
    switch (osName) {
        case "SunOS":
            return createProvider("sun.nio.ch.DevPollSelectorProvider");
        case "Linux":
            return createProvider("sun.nio.ch.EPollSelectorProvider");
        default:
            return new sun.nio.ch.PollSelectorProvider();
    }
}
sun.nio.ch.EPollSelectorProvider
/**
 * Selector provider whose selectors are {@code EPollSelectorImpl} instances.
 */
public class EPollSelectorProvider extends SelectorProviderImpl {

    /**
     * Returns the channel obtained from {@code InheritedChannel.getChannel()}.
     *
     * @throws IOException if obtaining the inherited channel fails
     */
    public Channel inheritedChannel() throws IOException {
        Channel inherited = InheritedChannel.getChannel();
        return inherited;
    }

    /**
     * Creates a new {@code EPollSelectorImpl} bound to this provider.
     *
     * @throws IOException if the selector cannot be constructed
     */
    public AbstractSelector openSelector() throws IOException {
        AbstractSelector epollSelector = new EPollSelectorImpl(this);
        return epollSelector;
    }
}
// Constructs the selector: creates a pipe, creates the EPollArrayWrapper,
// hands both pipe descriptors to the wrapper, and prepares the fd -> key map.
// Throws IOException if the pipe or the wrapper cannot be created.
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
// makePipe returns both pipe descriptors packed into a single long.
long pipeFds = IOUtil.makePipe(false);
// Upper 32 bits -> fd0 (presumably the read end; doSelect drains fd0 - confirm).
fd0 = (int) (pipeFds >>> 32);
// Lower 32 bits -> fd1 (presumably the write end - confirm).
fd1 = (int) pipeFds;
pollWrapper = new EPollArrayWrapper();
// Registers the pipe with the wrapper so an interrupt can be detected during poll.
pollWrapper.initInterrupt(fd0, fd1);
// Maps a channel's file descriptor value to its selection key (filled by implRegister).
fdToKey = new HashMap<>();
}
// Constructs the wrapper: creates the epoll descriptor and allocates the native
// array that epollWait fills with events.
// Throws IOException as declared; which call raises it is not visible in this excerpt.
EPollArrayWrapper() throws IOException {
// creates the epoll file descriptor
epfd = epollCreate();
// the epoll_event array passed to epoll_wait
// sized for NUM_EPOLLEVENTS entries of SIZE_EPOLLEVENT bytes each
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
// Cache the array's address; poll() passes it to epollWait.
pollArrayAddress = pollArray.address();
// eventHigh needed when using file descriptors > 64k
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}
// Native methods: the core epoll operations, implemented by the Linux operating system.
private native int epollCreate(); // returns the epoll file descriptor
// epfd is the epoll file descriptor; it corresponds to the Selector.
// fd is the file descriptor of the channel being registered (e.g. the ServerSocketChannel).
private native void epollCtl(int epfd, int opcode, int fd, int events);
// Waits on epfd for events; poll() passes the event array address as pollAddress and
// NUM_EPOLLEVENTS as numfds, and treats the return value as the number of updated entries.
private native int epollWait(long pollAddress, int numfds, long timeout, int epfd)
throws IOException;
epoll_create(256) 返回文件描述符
epoll_create 创建一个 epoll 实例并返回指向它的文件描述符。这个 epoll 实例在 C 语言层面就是一个结构体,用于存储数据。
epfd 是返回的文件描述符。
2.2 ssChannel.register(selector, SelectionKey.OP_ACCEPT);
sun.nio.ch.WindowsSelectorImpl#implRegister
sun.nio.ch.EPollSelectorImpl#implRegister
/**
 * Registers a selection key with this selector: records the key under its
 * channel's file descriptor value, adds that descriptor to the poll wrapper,
 * and adds the key to the selector's key set.
 *
 * @param ski the selection key being registered
 * @throws ClosedSelectorException if this selector is closed
 */
protected void implRegister(SelectionKeyImpl ski) {
    if (closed) {
        throw new ClosedSelectorException();
    }
    // File descriptor value reported by the key's channel.
    final int channelFd = ski.channel.getFDVal();
    fdToKey.put(channelFd, ski);
    pollWrapper.add(channelFd);
    keys.add(ski);
}
2.3 selector.select()
sun.nio.ch.WindowsSelectorImpl#doSelect
sun.nio.ch.EPollSelectorImpl#doSelect
// Performs one selection pass: polls through the wrapper with the given timeout,
// updates the selected keys, clears a pending interrupt if one was observed,
// and returns the value produced by updateSelectedKeys().
// Throws ClosedSelectorException if the selector is closed, and IOException as declared.
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
// Runs before and after polling (presumably flushes cancelled keys - confirm).
processDeregisterQueue();
try {
// begin()/end() bracket the poll call; end() always runs via finally.
begin();
// Poll for ready events.
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
// First zero the event ops of the entry at the interrupted index.
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
// Reset the interrupt state under interruptLock: clear the wrapper's flag,
// drain fd0, and reset interruptTriggered.
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
sun.nio.ch.EPollArrayWrapper#poll
// Applies pending registration changes, waits for events via epollWait, and
// records whether one of the returned entries is the interrupt descriptor.
// Returns the value returned by epollWait; throws IOException as declared.
int poll(long timeout) throws IOException {
// Apply registration changes (epollCtl).
updateRegistrations();
// epollWait is a native method.
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
// Scan the returned entries for the interrupt descriptor; stop at the first match.
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
// Forwards a registration change to the native epollCtl call.
// NOTE(review): opcode, fd and events are not declared anywhere in this excerpt, so this
// looks like an abbreviated sketch of the real method - confirm against the OpenJDK source.
private void updateRegistrations() {
epollCtl(epfd, opcode, fd, events);
}
// Native call used by updateRegistrations() to apply a registration change for fd on epfd.
private native void epollCtl(int epfd, int opcode, int fd, int events);
// Native call used by poll(); its return value is treated there as the number of updated entries.
private native int epollWait(long pollAddress, int numfds, long timeout, int epfd)
throws IOException;