例子

  1. public class TestIO {
  2. public static void main(String[] args) throws IOException {
  3. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  4. //int kqfd = kqueue()
  5. final Selector selector = Selector.open();
  6. //f_cntl
  7. serverSocketChannel.socket().setReuseAddress(true);
  8. //bind(fd)
  9. serverSocketChannel.socket().bind(new InetSocketAddress("localhost", 9998));
  10. //配置非阻塞,如果设置成true还是阻塞的配置
  11. //fcntl(fd,flags|n_noblack);
  12. serverSocketChannel.configureBlocking(false);
  13. //注意这里的第三个参数,会attach到SelectorKey上
  14. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);
  15. System.out.println("-- server ready");
  16. while (true) {
  17. //jdk 11才有的方法
  18. selector.select(key -> {
  19. if (!key.isValid()) {
  20. return;
  21. }
  22. //获取到attach的key
  23. final ServerSocketChannel ch = (ServerSocketChannel) key.attachment();
  24. try {
  25. if (key.isAcceptable()) {
  26. //获取endpoint,对应了serverSocket.accpet()返回的socket
  27. SocketChannel client_ch = ch.accept();
  28. if (client_ch != null) { // accept() may return null...
  29. System.out.printf("accepted connection from %s\n", client_ch.getRemoteAddress());
  30. //客户端也需要配置成非阻塞
  31. client_ch.configureBlocking(false);
  32. //监听可读时间
  33. client_ch.register(selector, SelectionKey.OP_READ, key.attachment());
  34. }
  35. } else if (key.isReadable()) {
  36. //整体的流程就是从socket中读取数据
  37. //然后写入到队列中,然后注册监听可写事件
  38. //当触发可写事件时,从队列中获取数据并写回
  39. SocketChannel socketChannel = (SocketChannel) key.channel();
  40. socketChannel.register(selector, SelectionKey.OP_WRITE);
  41. ByteBuffer buffer = ByteBuffer.allocate(1024);
  42. int read = socketChannel.read(buffer);
  43. buffer.flip();
  44. if (read == -1) {
  45. throw new IOException("Socket closed");
  46. }
  47. String result = new String(buffer.array()).trim();
  48. System.out.println("receive:" + result);
  49. ByteBuffer writeBuffer = ByteBuffer.wrap(ACK);
  50. Queue<Object> pendingWrites = channelToPendingWrites.get(key.channel());
  51. if (pendingWrites == null) {
  52. synchronized (channelToPendingWrites) {
  53. pendingWrites = channelToPendingWrites.get(key.channel());
  54. if (pendingWrites == null) {
  55. pendingWrites = new ConcurrentLinkedQueue<>();
  56. channelToPendingWrites.put(key.channel(), pendingWrites);
  57. }
  58. }
  59. }
  60. pendingWrites.add(writeBuffer);
  61. socketChannel.register(selector, SelectionKey.OP_WRITE);
  62. } else if (key.isWritable()) {
  63. //参考可读事件的注释
  64. SocketChannel socketChannel = (SocketChannel) key.channel();
  65. final Queue<Object> objects = channelToPendingWrites.get(socketChannel);
  66. if (objects == null || objects.size() == 0) {
  67. final ByteBuffer allocate = ByteBuffer.allocate(12);
  68. allocate.put("hello,world!".getBytes());
  69. socketChannel.write(allocate);
  70. } else {
  71. final Object poll = objects.poll();
  72. socketChannel.write((ByteBuffer) poll);
  73. }
  74. socketChannel.register(selector, SelectionKey.OP_READ);
  75. }
  76. } catch (Exception e) {
  77. e.printStackTrace();
  78. }
  79. }, 10000L);
  80. }
  81. }
  82. private static final Map<SelectableChannel, Queue<Object>> channelToPendingWrites = new ConcurrentHashMap<>();
  83. private static final byte[] ACK = "Data logged successfully\n".getBytes();
  84. }

讨论什么?

  • 理解IO多路复用,Java中的概念分别代表了什么.
  • 零拷贝? 不再需要拷贝了么?
  • Java NIO中的register,Acceptable,Readable, writeable代表什么意思。

    • serverSocketChannel.register,注册到底是注册什么?
    • socketChannel.register(selector, SelectionKey.OP_WRITE), 不注册OP_WRITE也能响应数据到客户端么
      • 如果不需要register,那么添加这个register是为了干啥,直接在writeAndFlush响应回去不就可以了么?
      • 如果需要register,那么数据什么时候写回去? 怎么写回去,

        IO模型

  • IO模型I — 阻塞

    • 系统调用

image.png

  1. #include <stdio.h>
  2. int main() {
  3. printf("Hello, World!");
  4. return 0;
  5. }

输出结果

  1. [root@iZbp11om21c05wzu8e4tx0Z test]# vim hello.c
  2. [root@iZbp11om21c05wzu8e4tx0Z test]# gcc hello.c -o hello
  3. [root@iZbp11om21c05wzu8e4tx0Z test]# strace ./hello
  4. execve("./hello", ["./hello"], [/* 22 vars */]) = 0
  5. brk(NULL) = 0x6fe000
  6. mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f1fa6d87000
  7. access("/etc/ld.so.preload", R_OK) = -1 ENOENT (No such file or directory)
  8. open("/etc/ld.so.cache", O_RDONLY|O_CLOEXEC) = 3
  9. fstat(3, {st_mode=S_IFREG|0644, st_size=30947, ...}) = 0
  10. mmap(NULL, 30947, PROT_READ, MAP_PRIVATE, 3, 0) = 0x7f1fa6d7f000
  11. close(3) = 0
  12. open("/lib64/libc.so.6", O_RDONLY|O_CLOEXEC) = 3
  13. read(3, "\177ELF\2\1\1\3\0\0\0\0\0\0\0\0\3\0>\0\1\0\0\0\20&\2\0\0\0\0\0"..., 832) = 832
  14. fstat(3, {st_mode=S_IFREG|0755, st_size=2156160, ...}) = 0
  15. mmap(NULL, 3985888, PROT_READ|PROT_EXEC, MAP_PRIVATE|MAP_DENYWRITE, 3, 0) = 0x7f1fa6799000
  16. mprotect(0x7f1fa695c000, 2097152, PROT_NONE) = 0
  17. mmap(0x7f1fa6b5c000, 24576, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_DENYWRITE, 3, 0x1c3000) = 0x7f1fa6b5c000
  18. mmap(0x7f1fa6b62000, 16864, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_ANONYMOUS, -1, 0) = 0x7f1fa6b62000
  19. close(3) = 0
  20. mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f1fa6d7e000
  21. mmap(NULL, 8192, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f1fa6d7c000
  22. arch_prctl(ARCH_SET_FS, 0x7f1fa6d7c740) = 0
  23. mprotect(0x7f1fa6b5c000, 16384, PROT_READ) = 0
  24. mprotect(0x600000, 4096, PROT_READ) = 0
  25. mprotect(0x7f1fa6d88000, 4096, PROT_READ) = 0
  26. munmap(0x7f1fa6d7f000, 30947) = 0
  27. fstat(1, {st_mode=S_IFCHR|0620, st_rdev=makedev(136, 0), ...}) = 0
  28. mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f1fa6d86000
  29. write(1, "Hello, World!", 13Hello, World!) = 13
  30. exit_group(0) = ?
  31. +++ exited with 0 +++
  • 零拷贝
  • Reactor 模型 (原文章已经不能下载了) 《Scalable IO in Java》
  • Redis中的使用(强烈建读)
  • Netty中的使用

    • 实操

      单线程

      1. int main() {
      2. int serv_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); //AF_INT:ipv4, SOCK_STREAM:tcp协议
      3. //将套接字和IP、端口绑定
      4. struct sockaddr_in *serv_addr = malloc(sizeof(struct sockaddr_in));
      5. serv_addr->sin_family = AF_INET; //使用IPv4地址
      6. serv_addr->sin_addr.s_addr = inet_addr("127.0.0.1"); //具体的IP地址
      7. serv_addr->sin_port = htons(11234); //端口
      8. int on = 1;
      9. if (setsockopt(serv_sock, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) == -1) {
      10. printf("处理失败SO_REUSEPORT失败");
      11. exit(1);
      12. }
      13. bind(serv_sock, (struct sockaddr *) serv_addr, sizeof(struct sockaddr));
      14. listen(serv_sock, 20);
      15. for (;;) {
      16. int fd = accept(serv_sock, NULL, NULL);
      17. char *hello = "hello chenshun!";
      18. char cc[20];
      19. ssize_t rr = read(fd, cc, 10);
      20. if (rr == -1) {
      21. close(fd);
      22. }
      23. printf("i receive your message:%s\n", cc);
      24. write(fd, hello, strlen(hello));
      25. write(fd, "good bye!", strlen("good bye!"));
      26. close(fd);
      27. }
      28. return 0;
      29. }

      多线程

      ```c void run(void ff) { int fd = (int ) (ff); char *hello = “hello chenshun!”; char cc[11]; ssize_t rr = read(fd, cc, 10); if (rr == -1) { close(fd); } printf(“i receive your message:%s\n”, cc); write(fd, hello, strlen(hello)); write(fd, “good bye!”, strlen(“good bye!”)); close(fd); return 0; }

int main() { int serv_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); //AF_INT:ipv4, SOCK_STREAM:tcp协议 //将套接字和IP、端口绑定 struct sockaddr_in serv_addr = malloc(sizeof(struct sockaddr_in)); serv_addr->sin_family = AF_INET; //使用IPv4地址 serv_addr->sin_addr.s_addr = inet_addr(“127.0.0.1”); //具体的IP地址 serv_addr->sin_port = htons(11234); //端口 int on = 1; if (setsockopt(serv_sock, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) == -1) { printf(“处理失败SO_REUSEPORT失败”); exit(1); } bind(serv_sock, (struct sockaddr ) serv_addr, sizeof(struct sockaddr)); listen(serv_sock, 20);

  1. for (;;) {
  2. int fd = accept(serv_sock, NULL, NULL);
  3. pthread_attr_t attr;
  4. pthread_t thread;
  5. size_t stacksize;
  6. pthread_attr_init(&attr);
  7. pthread_attr_getstacksize(&attr, &stacksize);
  8. if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
  9. while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
  10. pthread_attr_setstacksize(&attr, stacksize);
  11. int res = pthread_create(&thread, &attr, run, &fd);
  12. printf("创建线程执行:%d\n", res);
  13. if (res != 0) {
  14. exit(-1);
  15. }
  16. }
  17. return 0;

}

  1. <a name="ga2zT"></a>
  2. #### IO多路复用
  3. ```c
  4. //
  5. // Created by chenshun on 2022/3/21.
  6. //
  7. #include <sys/socket.h>
  8. #include <sys/event.h>
  9. #include <netinet/in.h>
  10. #include <arpa/inet.h>
  11. #include <fcntl.h>
  12. #include <unistd.h>
  13. #include <stdio.h>
  14. #include <string.h>
  15. #include <stdlib.h>
  16. const size_t PAGE = 1024 * 16;
  17. const int kReadEvent = 1;
  18. const int kWriteEvent = 2;
  19. void updateEvent(int kqFd, int fd, int events) {
  20. struct kevent ke;
  21. if (events & kReadEvent) {
  22. EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, 0);
  23. kevent(kqFd, &ke, 1, NULL, 0, NULL);
  24. }
  25. if (events & kWriteEvent) {
  26. EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, 0);
  27. kevent(kqFd, &ke, 1, NULL, 0, NULL);
  28. }
  29. }
  30. void delEvent(int kqFd, int fd, int events) {
  31. struct kevent ke;
  32. if (events & kReadEvent) {
  33. EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
  34. kevent(kqFd, &ke, 1, NULL, 0, NULL);
  35. }
  36. if (events & kWriteEvent) {
  37. EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
  38. kevent(kqFd, &ke, 1, NULL, 0, NULL);
  39. }
  40. }
  41. void handleAccept(int kq, int socket_fd) {
  42. struct sockaddr_storage sa;
  43. socklen_t salen = sizeof(sa);
  44. int client = accept(socket_fd, (struct sockaddr *) &sa, &salen);
  45. int flags = fcntl(client, F_GETFL, 0);
  46. fcntl(client, F_SETFL, flags | O_NONBLOCK);
  47. updateEvent(kq, client, kReadEvent);
  48. }
  49. void handleRead(int kq, int fd) {
  50. char buf[PAGE];
  51. size_t rr = read(fd, &buf, PAGE);
  52. if (rr == 0) {
  53. delEvent(kq, fd, kReadEvent | kWriteEvent);
  54. close(fd);
  55. return;
  56. }
  57. delEvent(kq, fd, kReadEvent);
  58. if (rr == 6 && !strcmp("exit\r\n", buf)) {
  59. write(fd, "good bye!\n", strlen("good bye!\n"));
  60. close(fd);
  61. return;
  62. }
  63. printf("receive bytes: %zu, i have receive your message: %s\n", rr, buf);
  64. updateEvent(kq, fd, kWriteEvent);
  65. }
  66. void handleWrite(int kq, int fd) {
  67. char *send = "i have receive your message, expect your next message\n";
  68. write(fd, send, strlen(send));
  69. delEvent(kq, fd, kWriteEvent);
  70. updateEvent(kq, fd, kReadEvent);
  71. }
  72. void loop_once(int kq, int socket_fd, int waitms) {
  73. struct timespec timeout;
  74. timeout.tv_sec = waitms / 1000;
  75. timeout.tv_nsec = (waitms % 1000) * 1000 * 1000;
  76. struct kevent *event = malloc(sizeof(struct kevent *));
  77. int retval = kevent(kq, NULL, 0, event, 20, &timeout);
  78. if (retval > 0) {
  79. printf("收到事件: %d 件\n", retval);
  80. }
  81. for (int i = 0; i < retval; i++) {
  82. struct kevent ev = event[i];
  83. uintptr_t fd = ev.ident;
  84. if (ev.filter == EVFILT_READ) {
  85. //处理可读时间
  86. if (fd == socket_fd) {
  87. handleAccept(kq, socket_fd);
  88. } else {
  89. handleRead(kq, (int) fd);
  90. }
  91. }
  92. if (ev.filter == EVFILT_WRITE) {
  93. //处理可写事件
  94. handleWrite(kq, (int) fd);
  95. }
  96. }
  97. free(event);
  98. }
  99. int main() {
  100. int port = 19999;
  101. int socket_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  102. struct sockaddr_in *serv_addr = malloc(sizeof(struct sockaddr_in));
  103. if (serv_addr == NULL) {
  104. printf("分配内存失败");
  105. exit(1);
  106. }
  107. serv_addr->sin_family = AF_INET;
  108. serv_addr->sin_addr.s_addr = inet_addr("127.0.0.1");
  109. serv_addr->sin_port = htons(port);
  110. //
  111. int on = 1;
  112. setsockopt(socket_fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));
  113. int flags = fcntl(socket_fd, F_GETFL, 0);
  114. fcntl(socket_fd, F_SETFL, flags | O_NONBLOCK);
  115. if (bind(socket_fd, (struct sockaddr *) serv_addr, sizeof(struct sockaddr)) < 0) {
  116. printf("bind 失败!");
  117. exit(1);
  118. }
  119. listen(socket_fd, 20);
  120. printf("使用telnet 127.0.0.1 19999 来进行测试\n");
  121. printf("输入exit来断开TCP链接\n");
  122. int queue = kqueue();
  123. //注册
  124. updateEvent(queue, socket_fd, kReadEvent);
  125. for (;;) {
  126. loop_once(queue, socket_fd, 10000);
  127. }
  128. return 0;
  129. }

多线程IO多路复用