这一讲我们就将 acceptor 上的连接建立事件和已建立连接的 I/O 事件分离,形成所谓的主 - 从 reactor 模式。
主 - 从 reactor 模式

主 - 从这个模式的核心思想是,主反应堆线程只负责分发 Acceptor 连接建立,已连接套接字上的 I/O 事件交给 sub-reactor 负责分发。其中 sub-reactor 的数量,可以根据 CPU 的核数来灵活设置。
我们的主反应堆线程一直在感知连接建立的事件,如果有连接成功建立,主反应堆线程通过 accept 方法获取已连接套接字,接下来会按照一定的算法选取一个从反应堆线程,并把已连接套接字加入到选择好的从反应堆线程中。
问题:
- 主反应堆线程和从反应堆线程,是两个不同的线程,如何把已连接套接字加入到另外一个线程中呢?
主 - 从 reactor+worker threads 模式
如果说主 - 从 reactor 模式解决了 I/O 分发的高效率问题,那么 work threads 就解决了业务逻辑和 I/O 分发之间的耦合问题。
**
把这两个策略组装在一起,就是实战中普遍采用的模式。大名鼎鼎的 Netty,就是把这种模式发挥到极致的一种实现。不过要注意 Netty 里面提到的 worker 线程,其实就是我们这里说的从 reactor 线程,并不是处理具体业务逻辑的 worker 线程。

样例程序
#include <lib/acceptor.h>#include "lib/common.h"#include "lib/event_loop.h"#include "lib/tcp_server.h"char rot13_char(char c) {if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))return c + 13;else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))return c - 13;elsereturn c;}//连接建立之后的callbackint onConnectionCompleted(struct tcp_connection *tcpConnection) {printf("connection completed\n");return 0;}//数据读到buffer之后的callbackint onMessage(struct buffer *input, struct tcp_connection *tcpConnection) {printf("get message from tcp connection %s\n", tcpConnection->name);printf("%s", input->data);struct buffer *output = buffer_new();int size = buffer_readable_size(input);for (int i = 0; i < size; i++) {buffer_append_char(output, rot13_char(buffer_read_char(input)));}tcp_connection_send_buffer(tcpConnection, output);return 0;}//数据通过buffer写完之后的callbackint onWriteCompleted(struct tcp_connection *tcpConnection) {printf("write completed\n");return 0;}//连接关闭之后的callbackint onConnectionClosed(struct tcp_connection *tcpConnection) {printf("connection closed\n");return 0;}int main(int c, char **v) {//主线程event_loopstruct event_loop *eventLoop = event_loop_init();//初始化acceptorstruct acceptor *acceptor = acceptor_init(SERV_PORT);//初始tcp_server,可以指定线程数目,这里线程是4,说明是一个acceptor线程,4个I/O线程,没一个I/O线程//tcp_server自己带一个event_loopstruct TCPserver *tcpServer = tcp_server_init(eventLoop, acceptor, onConnectionCompleted, onMessage,onWriteCompleted, onConnectionClosed, 4);tcp_server_start(tcpServer);// main thread for acceptorevent_loop_run(eventLoop);}
样例程序结果
$./poll-server-multithreads[msg] set poll as dispatcher[msg] add channel fd == 4, main thread[msg] poll added channel fd==4[msg] set poll as dispatcher[msg] add channel fd == 7, main thread[msg] poll added channel fd==7[msg] event loop thread init and signal, Thread-1[msg] event loop run, Thread-1[msg] event loop thread started, Thread-1[msg] set poll as dispatcher[msg] add channel fd == 9, main thread[msg] poll added channel fd==9[msg] event loop thread init and signal, Thread-2[msg] event loop run, Thread-2[msg] event loop thread started, Thread-2[msg] set poll as dispatcher[msg] add channel fd == 11, main thread[msg] poll added channel fd==11[msg] event loop thread init and signal, Thread-3[msg] event loop thread started, Thread-3[msg] set poll as dispatcher[msg] event loop run, Thread-3[msg] add channel fd == 13, main thread[msg] poll added channel fd==13[msg] event loop thread init and signal, Thread-4[msg] event loop run, Thread-4[msg] event loop thread started, Thread-4[msg] add channel fd == 5, main thread[msg] poll added channel fd==5[msg] event loop run, main thread[msg] get message channel i==1, fd==5[msg] activate channel fd == 5, revents=2, main thread[msg] new connection established, socket == 14connection completed[msg] get message channel i==0, fd==7[msg] activate channel fd == 7, revents=2, Thread-1[msg] wakeup, Thread-1[msg] add channel fd == 14, Thread-1[msg] poll added channel fd==14[msg] get message channel i==1, fd==14[msg] activate channel fd == 14, revents=2, Thread-1get message from tcp connection connection-14fasfas[msg] get message channel i==1, fd==14[msg] activate channel fd == 14, revents=2, Thread-1get message from tcp connection connection-14fasfasasfa[msg] get message channel i==1, fd==5[msg] activate channel fd == 5, revents=2, main thread[msg] new connection established, socket == 15connection completed[msg] get message channel i==0, fd==9[msg] activate channel fd == 9, revents=2, Thread-2[msg] wakeup, Thread-2[msg] add channel fd == 15, Thread-2[msg] poll added channel fd==15[msg] get message channel i==1, fd==15[msg] activate channel fd == 15, revents=2, Thread-2get message from tcp connection connection-15afasdfasf[msg] get message channel i==1, fd==15[msg] activate channel fd == 15, revents=2, Thread-2get message from tcp connection connection-15afasdfasfsafsafa[msg] get message channel i==1, fd==15[msg] activate channel fd == 15, revents=2, Thread-2[msg] poll delete channel fd==15connection closed[msg] get message channel i==1, fd==5[msg] activate channel fd == 5, revents=2, main thread[msg] new connection established, socket == 16connection completed[msg] get message channel i==0, fd==11[msg] activate channel fd == 11, revents=2, Thread-3[msg] wakeup, Thread-3[msg] add channel fd == 16, Thread-3[msg] poll added channel fd==16[msg] get message channel i==1, fd==16[msg] activate channel fd == 16, revents=2, Thread-3get message from tcp connection connection-16fdasfasdf[msg] get message channel i==1, fd==14[msg] activate channel fd == 14, revents=2, Thread-1[msg] poll delete channel fd==14connection closed[msg] get message channel i==1, fd==5[msg] activate channel fd == 5, revents=2, main thread[msg] new connection established, socket == 17connection completed[msg] get message channel i==0, fd==13[msg] activate channel fd == 13, revents=2, Thread-4[msg] wakeup, Thread-4[msg] add channel fd == 17, Thread-4[msg] poll added channel fd==17[msg] get message channel i==1, fd==17[msg] activate channel fd == 17, revents=2, Thread-4get message from tcp connection connection-17qreqwrq[msg] get message channel i==1, fd==16[msg] activate channel fd == 16, revents=2, Thread-3[msg] poll delete channel fd==16connection closed[msg] get message channel i==1, fd==5[msg] activate channel fd == 5, revents=2, main thread[msg] new connection established, socket == 18connection completed[msg] get message channel i==0, fd==7[msg] activate channel fd == 7, revents=2, Thread-1[msg] wakeup, Thread-1[msg] add channel fd == 18, Thread-1[msg] poll added channel fd==18[msg] get message channel i==1, fd==18[msg] activate channel fd == 18, revents=2, Thread-1get message from tcp connection connection-18fasgasdg^C
