1. NIO 模型代码

NIO 的 Selector 解决了连接数很多时需要遍历每一个 Channel 的问题：只需要处理有事件发生的 Channel。
selector.select() 阻塞等待需要处理的事件发生。
package cn.java.money.nio.demo2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * Single-threaded NIO demo server.
 *
 * <p>Listens on port 8888, registers every accepted connection with one
 * {@link Selector}, and prints whatever the clients send to standard output.
 * Only channels that actually have a pending event are visited on each pass.
 */
public class Server {

    /** TCP port the server listens on. */
    private static final int PORT = 8888;

    /** Size in bytes of the per-read scratch buffer. */
    private static final int BUFFER_SIZE = 1024;

    /**
     * Runs the accept/read event loop until the selector is closed.
     *
     * @param args ignored
     * @throws IOException if the listening channel or the selector cannot be set up,
     *                     or if selecting/accepting fails
     */
    public static void main(String[] args) throws IOException {
        // A channel is needed first to accept incoming connections.
        try (ServerSocketChannel ssChannel = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            ssChannel.configureBlocking(false);
            ssChannel.bind(new InetSocketAddress(PORT));
            ssChannel.register(selector, SelectionKey.OP_ACCEPT);

            // select() blocks here. It may legitimately return 0 (for example after a
            // wakeup), so the return value must not be used as the loop condition.
            while (selector.isOpen()) {
                selector.select();

                // Thanks to the Selector, only channels with pending events are visited.
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    // A handled key must be removed from the selected set, otherwise it
                    // is reported again on the next pass.
                    iterator.remove();

                    if (!selectionKey.isValid()) {
                        continue;
                    }
                    if (selectionKey.isAcceptable()) {
                        acceptClient(ssChannel, selector);
                    } else if (selectionKey.isReadable()) {
                        readClient(selectionKey);
                    }
                }
            }
        }
    }

    /** Accepts one pending connection and registers it for read events. */
    private static void acceptClient(ServerSocketChannel ssChannel, Selector selector)
            throws IOException {
        SocketChannel socketChannel = ssChannel.accept();
        if (socketChannel == null) {
            // Non-blocking accept: the pending connection may already be gone.
            return;
        }
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    /**
     * Drains the currently available bytes of one client and prints them.
     * The channel is closed on end-of-stream or on an I/O error so that a single
     * broken client neither spins the loop nor terminates the server.
     */
    private static void readClient(SelectionKey selectionKey) {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
        try {
            int length;
            // channel.read(byteBuffer) fills the buffer (buffer is in write mode).
            // If one channel carries a lot of data this loop runs for a long time and
            // delays every other channel, because everything runs on one thread.
            while ((length = channel.read(byteBuffer)) > 0) {
                byteBuffer.flip();
                // Take the data back out of the buffer; decoded with the platform
                // default charset, matching what the demo client sends.
                System.out.println(new String(byteBuffer.array(), 0, length));
                byteBuffer.clear();
            }
            if (length < 0) {
                // -1 means the peer closed the connection. Without closing here the
                // key would stay readable forever and select() would never block.
                closeClient(channel);
            }
        } catch (IOException e) {
            System.err.println("Closing client after read failure: " + e);
            closeClient(channel);
        }
    }

    /** Closes a client channel; closing the channel also cancels its selection key. */
    private static void closeClient(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if closing an already broken connection fails.
        }
    }
}
package cn.java.money.nio.demo2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

/**
 * Console client for the NIO demo server: every line typed on standard input is
 * sent to 127.0.0.1:8888.
 */
public class Client {

    /**
     * Connects to the server and forwards standard input line by line until
     * end of input is reached.
     *
     * @param args ignored
     * @throws IOException if connecting or writing to the server fails
     */
    public static void main(String[] args) throws IOException {
        try (SocketChannel socketChannel =
                     SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888));
             Scanner scanner = new Scanner(System.in)) {
            socketChannel.configureBlocking(false);

            // hasNextLine() ends the loop cleanly at end of input instead of letting
            // nextLine() throw NoSuchElementException.
            while (scanner.hasNextLine()) {
                String s = scanner.nextLine();

                // Wrapping the bytes yields a buffer that is already in read mode and
                // is exactly as large as the line, so long lines cannot overflow a
                // fixed-size buffer. Encoded with the platform default charset.
                ByteBuffer byteBuffer = ByteBuffer.wrap(s.getBytes());

                // Writing to the channel means reading from the buffer. A non-blocking
                // write may accept only part of the data, so keep writing until the
                // buffer is drained; otherwise the remainder would be silently lost.
                while (byteBuffer.hasRemaining()) {
                    socketChannel.write(byteBuffer);
                }
            }
        }
    }
}
2. NIO 源码 Linux版本代码
https://gitee.com/framework-src/openjdk-1.8-b132.git
2.1 Selector.open()
Selector selector = Selector.open();

public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}

public static SelectorProvider provider() {
    return sun.nio.ch.DefaultSelectorProvider.create();
}

不同操作系统的 JDK 提供了不同的 DefaultSelectorProvider,不同的 DefaultSelectorProvider 返回不同的 SelectorProvider 实现:
--- Windows 版本 JDK: WindowsSelectorProvider -> WindowsSelectorImpl
--- Linux 版本 JDK: EPollSelectorProvider -> EPollSelectorImpl
不同操作系统的JDK提供了不同的 DefaultSelectorProvider

OpenJDK 源码 solaris 目录(Linux 与 Solaris 共用)下的 DefaultSelectorProvider
/**
 * Returns the default SelectorProvider, chosen from the value of the
 * "os.name" system property.
 */
public static SelectorProvider create() {
    // The property is read inside a privileged action.
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    // "SunOS" selects the DevPoll provider, requested by class name.
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    // "Linux" selects the EPoll provider, requested by class name.
    if (osname.equals("Linux"))
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    // Every other OS name falls back to the Poll provider.
    return new sun.nio.ch.PollSelectorProvider();
}
sun.nio.ch.EPollSelectorProvider
/**
 * SelectorProvider whose selectors are EPollSelectorImpl instances.
 */
public class EPollSelectorProvider
    extends SelectorProviderImpl
{
    /** Creates a new EPollSelectorImpl bound to this provider. */
    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }

    /** Delegates to InheritedChannel.getChannel(). */
    public Channel inheritedChannel() throws IOException {
        return InheritedChannel.getChannel();
    }
}
/**
 * Builds the selector: creates a pipe, creates the EPollArrayWrapper, hands
 * both pipe descriptors to it via initInterrupt, and sets up the empty
 * fd-to-key map.
 */
EPollSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    // makePipe returns both descriptors packed into one long:
    // the high 32 bits become fd0, the low 32 bits become fd1.
    // NOTE(review): presumably fd0 is the read end and fd1 the write end of the
    // wakeup pipe - confirm against IOUtil.makePipe.
    long pipeFds = IOUtil.makePipe(false);
    fd0 = (int) (pipeFds >>> 32);
    fd1 = (int) pipeFds;
    pollWrapper = new EPollArrayWrapper();
    pollWrapper.initInterrupt(fd0, fd1);
    fdToKey = new HashMap<>();
}
EPollArrayWrapper() throws IOException {// creates the epoll file descriptorepfd = epollCreate();// the epoll_event array passed to epoll_waitint allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;pollArray = new AllocatedNativeObject(allocationSize, true);pollArrayAddress = pollArray.address();// eventHigh needed when using file descriptors > 64kif (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)eventsHigh = new HashMap<>();}//native方法 是 epoll最核心的几个方法由linux操作系统实现private native int epollCreate(); //返回epoll文件描述符//epfd是epoll的文件描述符,对应Selector//fd是ServerSocketChannel的文件描述符private native void epollCtl(int epfd, int opcode, int fd, int events);private native int epollWait(long pollAddress, int numfds, long timeout, int epfd)throws IOException;
epoll_create(256) 返回文件描述符
epoll_create 创建一个 epoll 实例,并返回指向该实例的文件描述符。这个实例在内核中是一个 C 语言结构体,用于保存被监听的文件描述符以及已就绪的事件。
epfd 是返回的文件描述符。
2.2 ssChannel.register(selector, SelectionKey.OP_ACCEPT);
sun.nio.ch.WindowsSelectorImpl#implRegister
sun.nio.ch.EPollSelectorImpl#implRegister
protected void implRegister(SelectionKeyImpl ski) {if (closed)throw new ClosedSelectorException();SelChImpl ch = ski.channel;int fd = Integer.valueOf(ch.getFDVal());fdToKey.put(fd, ski);// fd 就是ServerSocketChannel的文件描述符pollWrapper.add(fd);keys.add(ski);}
2.3 selector.select()
sun.nio.ch.WindowsSelectorImpl#doSelect
sun.nio.ch.EPollSelectorImpl#doSelect
protected int doSelect(long timeout) throws IOException {if (closed)throw new ClosedSelectorException();processDeregisterQueue();try {begin();// 轮询pollWrapper.poll(timeout);} finally {end();}processDeregisterQueue();int numKeysUpdated = updateSelectedKeys();if (pollWrapper.interrupted()) {// Clear the wakeup pipepollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);synchronized (interruptLock) {pollWrapper.clearInterrupted();IOUtil.drain(fd0);interruptTriggered = false;}}return numKeysUpdated;}
sun.nio.ch.EPollArrayWrapper#poll
int poll(long timeout) throws IOException {//epollCtlupdateRegistrations();//epollWai 是一个nativea方法updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);for (int i=0; i<updated; i++) {if (getDescriptor(i) == incomingInterruptFD) {interruptedIndex = i;interrupted = true;break;}}return updated;}private void updateRegistrations() {epollCtl(epfd, opcode, fd, events);}private native void epollCtl(int epfd, int opcode, int fd, int events);private native int epollWait(long pollAddress, int numfds, long timeout, int epfd)throws IOException;
3. select poll epoll的区别

