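In this lecture we separate the connection-establishment events on the acceptor from the I/O events on established connections. The result is the so-called master-slave reactor pattern.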

Master-slave reactor pattern

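The core idea of the master-slave pattern is that the main reactor thread only dispatches connection-establishment events from the acceptor, while I/O events on connected sockets are dispatched by sub-reactors. The number of sub-reactors can be set flexibly according to the number of CPU cores.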
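As a minimal sketch of sizing the sub-reactor pool from the core count: the helper below is hypothetical (it is not part of the lecture's lib), and it assumes a system where `sysconf(_SC_NPROCESSORS_ONLN)` is available, such as Linux or macOS.

```c
#include <assert.h>
#include <unistd.h>

/* Hypothetical helper: one sub-reactor per online core, capped at max_threads.
 * Falls back to 1 when the core count cannot be determined. */
int sub_reactor_count(int max_threads) {
    assert(max_threads >= 1);
    long cores = sysconf(_SC_NPROCESSORS_ONLN);   /* -1 if unavailable */
    if (cores < 1) {
        return 1;
    }
    return cores > max_threads ? max_threads : (int) cores;
}
```

The cap is there because the right number also depends on the workload; the core count is only a starting point.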

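The main reactor thread keeps watching for connection-establishment events. When a connection is established, it obtains the connected socket by calling accept, selects a sub-reactor thread according to some algorithm, and adds the connected socket to the selected sub-reactor thread.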
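The text does not say which selection algorithm is used. Round-robin is one common choice, and the sample run later in this lecture hands connections to Thread-1 through Thread-4 in turn and then back to Thread-1, which is consistent with it. The sketch below assumes round-robin; the struct and function names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical pool of sub-reactors; only the fields needed for selection. */
struct sub_reactor_pool {
    size_t count;   /* number of sub-reactor threads */
    size_t next;    /* index of the sub-reactor to hand out next */
};

/* Return the index of the sub-reactor that should own the next connection.
 * Intended to be called only from the main reactor thread, so no lock is taken. */
size_t pool_pick_round_robin(struct sub_reactor_pool *pool) {
    assert(pool->count > 0);
    size_t picked = pool->next;
    pool->next = (pool->next + 1) % pool->count;
    return picked;
}
```

Round-robin spreads connections evenly but ignores how busy each connection is; a least-loaded policy is a possible alternative when connection load is very uneven.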

Question:

  • The main reactor thread and a sub-reactor thread are two different threads. How can a connected socket be added to another thread?
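One possible answer, sketched under assumptions: give each sub-reactor a mutex-protected mailbox of pending sockets plus a wake-up descriptor that it polls alongside its connections. The main reactor queues the socket and writes one byte to the wake-up descriptor; the sub-reactor wakes up, drains the mailbox, and registers the sockets in its own thread. The sample log later in this lecture prints "wakeup, Thread-1" just before "add channel fd == 14, Thread-1", which fits this kind of mechanism, but the use of `socketpair` and every name below are assumptions, not the lecture library's actual code.

```c
#include <assert.h>
#include <pthread.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define MAX_PENDING 64

/* Hypothetical per-sub-reactor mailbox for newly accepted sockets. */
struct handoff {
    pthread_mutex_t lock;
    int pending[MAX_PENDING];   /* connected fds waiting to be registered */
    int pending_count;
    int wake_fds[2];            /* [0]: polled by the sub-reactor, [1]: written by the main reactor */
};

int handoff_init(struct handoff *h) {
    h->pending_count = 0;
    if (pthread_mutex_init(&h->lock, NULL) != 0) {
        return -1;
    }
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, h->wake_fds) != 0) {
        pthread_mutex_destroy(&h->lock);
        return -1;
    }
    return 0;
}

/* Main reactor thread: queue the fd, then wake the sub-reactor. */
int handoff_push(struct handoff *h, int connected_fd) {
    pthread_mutex_lock(&h->lock);
    if (h->pending_count == MAX_PENDING) {
        pthread_mutex_unlock(&h->lock);
        return -1;
    }
    h->pending[h->pending_count++] = connected_fd;
    pthread_mutex_unlock(&h->lock);

    char one = 'a';
    return write(h->wake_fds[1], &one, 1) == 1 ? 0 : -1;
}

/* Sub-reactor thread: call only when wake_fds[0] is readable.
 * Copies queued fds into out and returns how many were copied, or -1 on error. */
int handoff_drain(struct handoff *h, int *out, int out_capacity) {
    assert(out_capacity > 0);
    char one;
    ssize_t r = read(h->wake_fds[0], &one, 1);   /* consume one wake-up byte */
    if (r < 0) {
        return -1;
    }

    pthread_mutex_lock(&h->lock);
    int n = h->pending_count < out_capacity ? h->pending_count : out_capacity;
    for (int i = 0; i < n; i++) {
        out[i] = h->pending[i];
    }
    for (int i = n; i < h->pending_count; i++) {  /* keep fds that did not fit */
        h->pending[i - n] = h->pending[i];
    }
    h->pending_count -= n;
    pthread_mutex_unlock(&h->lock);
    return n;
}
```

The point of the design is that only the owning thread ever modifies its poll set; the main reactor touches nothing but the mailbox, so the lock is held for a few assignments at most.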

Master-slave reactor + worker threads pattern

If the master-slave reactor pattern solves the problem of dispatching I/O efficiently, then worker threads solve the coupling between business logic and I/O dispatching.
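Combining these two strategies gives the pattern most widely used in practice. The well-known Netty is an implementation that takes this pattern to its limit. Note, however, that the "worker" threads in Netty's terminology are what we call sub-reactor threads here; they are not worker threads that run business logic.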
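To make the decoupling concrete, here is a minimal worker-pool sketch: a sub-reactor thread only enqueues a task, and a worker thread runs the business logic. All names are hypothetical and this is not how the lecture's library or Netty implements it. `worker_pool_submit` returns -1 instead of blocking when the queue is full, on the assumption that a reactor thread should never wait.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define QUEUE_CAPACITY 128
#define MAX_WORKERS 16

typedef void (*task_fn)(void *arg);
struct task { task_fn fn; void *arg; };

/* Hypothetical worker pool: a bounded FIFO of tasks shared by all workers. */
struct worker_pool {
    pthread_mutex_t lock;
    pthread_cond_t not_empty;
    struct task queue[QUEUE_CAPACITY];
    int head, count, stopping, worker_count;
    pthread_t workers[MAX_WORKERS];
};

static void *worker_main(void *arg) {
    struct worker_pool *pool = arg;
    for (;;) {
        pthread_mutex_lock(&pool->lock);
        while (pool->count == 0 && !pool->stopping) {
            pthread_cond_wait(&pool->not_empty, &pool->lock);
        }
        if (pool->count == 0) {            /* stopping and fully drained */
            pthread_mutex_unlock(&pool->lock);
            return NULL;
        }
        struct task t = pool->queue[pool->head];
        pool->head = (pool->head + 1) % QUEUE_CAPACITY;
        pool->count--;
        pthread_mutex_unlock(&pool->lock);
        t.fn(t.arg);                       /* business logic runs off the reactor thread */
    }
}

/* On failure, call worker_pool_stop to join the workers that did start. */
int worker_pool_start(struct worker_pool *pool, int worker_count) {
    assert(worker_count >= 1 && worker_count <= MAX_WORKERS);
    pool->head = pool->count = pool->stopping = pool->worker_count = 0;
    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->not_empty, NULL);
    for (int i = 0; i < worker_count; i++) {
        if (pthread_create(&pool->workers[i], NULL, worker_main, pool) != 0) {
            return -1;
        }
        pool->worker_count++;
    }
    return 0;
}

/* Called by a sub-reactor thread; returns -1 if the queue is full or stopping. */
int worker_pool_submit(struct worker_pool *pool, task_fn fn, void *arg) {
    pthread_mutex_lock(&pool->lock);
    if (pool->stopping || pool->count == QUEUE_CAPACITY) {
        pthread_mutex_unlock(&pool->lock);
        return -1;
    }
    int tail = (pool->head + pool->count) % QUEUE_CAPACITY;
    pool->queue[tail].fn = fn;
    pool->queue[tail].arg = arg;
    pool->count++;
    pthread_cond_signal(&pool->not_empty);
    pthread_mutex_unlock(&pool->lock);
    return 0;
}

/* Let the workers finish the queued tasks, then join them. */
void worker_pool_stop(struct worker_pool *pool) {
    pthread_mutex_lock(&pool->lock);
    pool->stopping = 1;
    pthread_cond_broadcast(&pool->not_empty);
    pthread_mutex_unlock(&pool->lock);
    for (int i = 0; i < pool->worker_count; i++) {
        pthread_join(pool->workers[i], NULL);
    }
}

/* Example business-logic task: ROT13-encode a NUL-terminated string in place. */
void rot13_task(void *arg) {
    for (char *p = arg; *p != '\0'; p++) {
        if ((*p >= 'a' && *p <= 'm') || (*p >= 'A' && *p <= 'M')) *p += 13;
        else if ((*p >= 'n' && *p <= 'z') || (*p >= 'N' && *p <= 'Z')) *p -= 13;
    }
}
```

In a real server the task would also need a way to hand its result back to the sub-reactor that owns the connection, so that the write still happens on the I/O thread; that part is left out of this sketch.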

Sample program

    #include "lib/acceptor.h"
    #include "lib/common.h"
    #include "lib/event_loop.h"
    #include "lib/tcp_server.h"

    // ROT13: rotate each ASCII letter by 13 places, leave other characters unchanged
    char rot13_char(char c) {
        if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
            return c + 13;
        else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
            return c - 13;
        else
            return c;
    }

    // Callback invoked after a connection is established
    int onConnectionCompleted(struct tcp_connection *tcpConnection) {
        printf("connection completed\n");
        return 0;
    }

    // Callback invoked after data has been read into the buffer
    int onMessage(struct buffer *input, struct tcp_connection *tcpConnection) {
        printf("get message from tcp connection %s\n", tcpConnection->name);
        // Debug dump of the raw buffer (assumes the data is NUL-terminated)
        printf("%s", input->data);

        // Encode every readable byte with ROT13 and send the result back
        struct buffer *output = buffer_new();
        int size = buffer_readable_size(input);
        for (int i = 0; i < size; i++) {
            buffer_append_char(output, rot13_char(buffer_read_char(input)));
        }
        tcp_connection_send_buffer(tcpConnection, output);
        return 0;
    }

    // Callback invoked after the buffered data has been written out
    int onWriteCompleted(struct tcp_connection *tcpConnection) {
        printf("write completed\n");
        return 0;
    }

    // Callback invoked after the connection is closed
    int onConnectionClosed(struct tcp_connection *tcpConnection) {
        printf("connection closed\n");
        return 0;
    }

    int main(int c, char **v) {
        // event_loop of the main thread
        struct event_loop *eventLoop = event_loop_init();

        // Initialize the acceptor
        struct acceptor *acceptor = acceptor_init(SERV_PORT);

        // Initialize tcp_server. The thread count is configurable; 4 here means
        // one acceptor thread plus 4 I/O threads, each I/O thread with its own event_loop
        struct TCPserver *tcpServer = tcp_server_init(eventLoop, acceptor, onConnectionCompleted, onMessage,
                                                      onWriteCompleted, onConnectionClosed, 4);
        tcp_server_start(tcpServer);

        // main thread for acceptor
        event_loop_run(eventLoop);
    }

Sample program output

    $ ./poll-server-multithreads
    [msg] set poll as dispatcher
    [msg] add channel fd == 4, main thread
    [msg] poll added channel fd==4
    [msg] set poll as dispatcher
    [msg] add channel fd == 7, main thread
    [msg] poll added channel fd==7
    [msg] event loop thread init and signal, Thread-1
    [msg] event loop run, Thread-1
    [msg] event loop thread started, Thread-1
    [msg] set poll as dispatcher
    [msg] add channel fd == 9, main thread
    [msg] poll added channel fd==9
    [msg] event loop thread init and signal, Thread-2
    [msg] event loop run, Thread-2
    [msg] event loop thread started, Thread-2
    [msg] set poll as dispatcher
    [msg] add channel fd == 11, main thread
    [msg] poll added channel fd==11
    [msg] event loop thread init and signal, Thread-3
    [msg] event loop thread started, Thread-3
    [msg] set poll as dispatcher
    [msg] event loop run, Thread-3
    [msg] add channel fd == 13, main thread
    [msg] poll added channel fd==13
    [msg] event loop thread init and signal, Thread-4
    [msg] event loop run, Thread-4
    [msg] event loop thread started, Thread-4
    [msg] add channel fd == 5, main thread
    [msg] poll added channel fd==5
    [msg] event loop run, main thread
    [msg] get message channel i==1, fd==5
    [msg] activate channel fd == 5, revents=2, main thread
    [msg] new connection established, socket == 14
    connection completed
    [msg] get message channel i==0, fd==7
    [msg] activate channel fd == 7, revents=2, Thread-1
    [msg] wakeup, Thread-1
    [msg] add channel fd == 14, Thread-1
    [msg] poll added channel fd==14
    [msg] get message channel i==1, fd==14
    [msg] activate channel fd == 14, revents=2, Thread-1
    get message from tcp connection connection-14
    fasfas
    [msg] get message channel i==1, fd==14
    [msg] activate channel fd == 14, revents=2, Thread-1
    get message from tcp connection connection-14
    fasfas
    asfa
    [msg] get message channel i==1, fd==5
    [msg] activate channel fd == 5, revents=2, main thread
    [msg] new connection established, socket == 15
    connection completed
    [msg] get message channel i==0, fd==9
    [msg] activate channel fd == 9, revents=2, Thread-2
    [msg] wakeup, Thread-2
    [msg] add channel fd == 15, Thread-2
    [msg] poll added channel fd==15
    [msg] get message channel i==1, fd==15
    [msg] activate channel fd == 15, revents=2, Thread-2
    get message from tcp connection connection-15
    afasdfasf
    [msg] get message channel i==1, fd==15
    [msg] activate channel fd == 15, revents=2, Thread-2
    get message from tcp connection connection-15
    afasdfasf
    safsafa
    [msg] get message channel i==1, fd==15
    [msg] activate channel fd == 15, revents=2, Thread-2
    [msg] poll delete channel fd==15
    connection closed
    [msg] get message channel i==1, fd==5
    [msg] activate channel fd == 5, revents=2, main thread
    [msg] new connection established, socket == 16
    connection completed
    [msg] get message channel i==0, fd==11
    [msg] activate channel fd == 11, revents=2, Thread-3
    [msg] wakeup, Thread-3
    [msg] add channel fd == 16, Thread-3
    [msg] poll added channel fd==16
    [msg] get message channel i==1, fd==16
    [msg] activate channel fd == 16, revents=2, Thread-3
    get message from tcp connection connection-16
    fdasfasdf
    [msg] get message channel i==1, fd==14
    [msg] activate channel fd == 14, revents=2, Thread-1
    [msg] poll delete channel fd==14
    connection closed
    [msg] get message channel i==1, fd==5
    [msg] activate channel fd == 5, revents=2, main thread
    [msg] new connection established, socket == 17
    connection completed
    [msg] get message channel i==0, fd==13
    [msg] activate channel fd == 13, revents=2, Thread-4
    [msg] wakeup, Thread-4
    [msg] add channel fd == 17, Thread-4
    [msg] poll added channel fd==17
    [msg] get message channel i==1, fd==17
    [msg] activate channel fd == 17, revents=2, Thread-4
    get message from tcp connection connection-17
    qreqwrq
    [msg] get message channel i==1, fd==16
    [msg] activate channel fd == 16, revents=2, Thread-3
    [msg] poll delete channel fd==16
    connection closed
    [msg] get message channel i==1, fd==5
    [msg] activate channel fd == 5, revents=2, main thread
    [msg] new connection established, socket == 18
    connection completed
    [msg] get message channel i==0, fd==7
    [msg] activate channel fd == 7, revents=2, Thread-1
    [msg] wakeup, Thread-1
    [msg] add channel fd == 18, Thread-1
    [msg] poll added channel fd==18
    [msg] get message channel i==1, fd==18
    [msg] activate channel fd == 18, revents=2, Thread-1
    get message from tcp connection connection-18
    fasgasdg
    ^C