1. NIO 模型代码

image.png
NIO 的 selector解决了很多连接不需要遍历每一个Channel的问题。
selector.select() 阻塞等待需要处理的事件发生。

  1. package cn.java.money.nio.demo2;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.ServerSocketChannel;
  8. import java.nio.channels.SocketChannel;
  9. import java.util.Iterator;
  10. public class Server {
  11. public static void main(String[] args) throws IOException {
  12. //首先的有一个通道,接受连接
  13. ServerSocketChannel ssChannel = ServerSocketChannel.open();
  14. ssChannel.configureBlocking(false);
  15. ssChannel.bind(new InetSocketAddress(8888));
  16. Selector selector = Selector.open();
  17. ssChannel.register(selector, SelectionKey.OP_ACCEPT);
  18. //select会在这里阻塞
  19. while (selector.select() > 0) {
  20. //通过Selector,只遍历有事件的channel去处理
  21. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  22. while (iterator.hasNext()) {
  23. SelectionKey selectionKey = iterator.next();
  24. if (selectionKey.isAcceptable()) {
  25. SocketChannel socketChannel = ssChannel.accept();
  26. socketChannel.configureBlocking(false);
  27. socketChannel.register(selector, SelectionKey.OP_READ);
  28. } else if (selectionKey.isReadable()) {
  29. SocketChannel channel = (SocketChannel) selectionKey.channel();
  30. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  31. int length = 0;
  32. //channel.read(byteBuffer)是把数据读入byteBuffer,此时byteBuffer的模式是写入模式
  33. // 如果某个通道的数据量很多,这里就会长时间处理,就会影响其他channel
  34. while ((length = channel.read(byteBuffer)) > 0) {
  35. byteBuffer.flip();
  36. //相当于读取数据,从buffer中读取数据
  37. System.out.println(new String(byteBuffer.array(), 0, length));
  38. byteBuffer.clear();
  39. }
  40. }
  41. //事件处理完要移除
  42. iterator.remove();
  43. }
  44. }
  45. }
  46. }
  1. package cn.java.money.nio.demo2;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SocketChannel;
  6. import java.util.Scanner;
  7. public class Client {
  8. public static void main(String[] args) throws IOException {
  9. SocketChannel socketChannel = SocketChannel.open(
  10. new InetSocketAddress("127.0.0.1", 8888));
  11. socketChannel.configureBlocking(false);
  12. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  13. Scanner scanner = new Scanner(System.in);
  14. while (true){
  15. String s = scanner.nextLine();
  16. //buffer写模式
  17. byteBuffer.put(s.getBytes());
  18. //切换为读模式
  19. byteBuffer.flip();
  20. //写到channel,就是从buffer中读取
  21. socketChannel.write(byteBuffer);
  22. byteBuffer.clear();
  23. }
  24. }
  25. }

2. NIO 源码 Linux版本代码

https://gitee.com/framework-src/openjdk-1.8-b132.git

2.1 Selector.open()
  1. Selector selector = Selector.open();
  2. public static Selector open() throws IOException {
  3. return SelectorProvider.provider().openSelector();
  4. }
  5. public static SelectorProvider provider() {
  6. return sun.nio.ch.DefaultSelectorProvider.create();
  7. }
  8. 不同操作系统的JDK提供了不同的 DefaultSelectorProvider
  9. 不同的DefaultSelectorProvider返回不同的SelectorProvider的实现
  10. --- Windown 版本JDK
  11. WindowsSelectorProvider
  12. WindowsSelectorImpl
  13. --- Linux 版本JDK
  14. EPollSelectorProvider
  15. EPollSelectorImpl

不同操作系统的JDK提供了不同的 DefaultSelectorProvider
image.png
solaris的 DefaultSelectorProvider

  1. public static SelectorProvider create() {
  2. String osname = AccessController
  3. .doPrivileged(new GetPropertyAction("os.name"));
  4. if (osname.equals("SunOS"))
  5. return createProvider("sun.nio.ch.DevPollSelectorProvider");
  6. if (osname.equals("Linux"))
  7. return createProvider("sun.nio.ch.EPollSelectorProvider");
  8. return new sun.nio.ch.PollSelectorProvider();
  9. }

sun.nio.ch.EPollSelectorProvider

  1. public class EPollSelectorProvider extends SelectorProviderImpl
  2. {
  3. public AbstractSelector openSelector() throws IOException {
  4. return new EPollSelectorImpl(this);
  5. }
  6. public Channel inheritedChannel() throws IOException {
  7. return InheritedChannel.getChannel();
  8. }
  9. }
  1. EPollSelectorImpl(SelectorProvider sp) throws IOException {
  2. super(sp);
  3. long pipeFds = IOUtil.makePipe(false);
  4. fd0 = (int) (pipeFds >>> 32);
  5. fd1 = (int) pipeFds;
  6. pollWrapper = new EPollArrayWrapper();
  7. pollWrapper.initInterrupt(fd0, fd1);
  8. fdToKey = new HashMap<>();
  9. }
  1. EPollArrayWrapper() throws IOException {
  2. // creates the epoll file descriptor
  3. epfd = epollCreate();
  4. // the epoll_event array passed to epoll_wait
  5. int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
  6. pollArray = new AllocatedNativeObject(allocationSize, true);
  7. pollArrayAddress = pollArray.address();
  8. // eventHigh needed when using file descriptors > 64k
  9. if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
  10. eventsHigh = new HashMap<>();
  11. }
  12. //native方法 是 epoll最核心的几个方法由linux操作系统实现
  13. private native int epollCreate(); //返回epoll文件描述符
  14. //epfd是epoll的文件描述符,对应Selector
  15. //fd是ServerSocketChannel的文件描述符
  16. private native void epollCtl(int epfd, int opcode, int fd, int events);
  17. private native int epollWait(long pollAddress, int numfds, long timeout, int epfd)
  18. throws IOException;

epoll_create(256) 返回文件描述符
image.png
epoll_create 打开一个epoll文件的描述符。c 语言创建的epoll的实例,就是结构体,用于存储数据。
epfd 是返回的文件描述符。
image.png

2.2 ssChannel.register(selector, SelectionKey.OP_ACCEPT);

sun.nio.ch.WindowsSelectorImpl#implRegister
sun.nio.ch.EPollSelectorImpl#implRegister

  1. protected void implRegister(SelectionKeyImpl ski) {
  2. if (closed)
  3. throw new ClosedSelectorException();
  4. SelChImpl ch = ski.channel;
  5. int fd = Integer.valueOf(ch.getFDVal());
  6. fdToKey.put(fd, ski);
  7. // fd 就是ServerSocketChannel的文件描述符
  8. pollWrapper.add(fd);
  9. keys.add(ski);
  10. }

2.3 selector.select()

sun.nio.ch.WindowsSelectorImpl#doSelect
sun.nio.ch.EPollSelectorImpl#doSelect

  1. protected int doSelect(long timeout) throws IOException {
  2. if (closed)
  3. throw new ClosedSelectorException();
  4. processDeregisterQueue();
  5. try {
  6. begin();
  7. // 轮询
  8. pollWrapper.poll(timeout);
  9. } finally {
  10. end();
  11. }
  12. processDeregisterQueue();
  13. int numKeysUpdated = updateSelectedKeys();
  14. if (pollWrapper.interrupted()) {
  15. // Clear the wakeup pipe
  16. pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
  17. synchronized (interruptLock) {
  18. pollWrapper.clearInterrupted();
  19. IOUtil.drain(fd0);
  20. interruptTriggered = false;
  21. }
  22. }
  23. return numKeysUpdated;
  24. }

sun.nio.ch.EPollArrayWrapper#poll

  1. int poll(long timeout) throws IOException {
  2. //epollCtl
  3. updateRegistrations();
  4. //epollWai 是一个nativea方法
  5. updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
  6. for (int i=0; i<updated; i++) {
  7. if (getDescriptor(i) == incomingInterruptFD) {
  8. interruptedIndex = i;
  9. interrupted = true;
  10. break;
  11. }
  12. }
  13. return updated;
  14. }
  15. private void updateRegistrations() {
  16. epollCtl(epfd, opcode, fd, events);
  17. }
  18. private native void epollCtl(int epfd, int opcode, int fd, int events);
  19. private native int epollWait(long pollAddress, int numfds, long timeout, int epfd)
  20. throws IOException;

3. select poll epoll的区别

image.png