重温事件驱动

基于事件的程序设计: GUI、Web

事件驱动的好处是占用资源少,效率高,可扩展性强,是支持高性能高并发的不二之选。

  1. //按钮点击的事件处理
  2. void onButtonClick(){
  3. }

这个设计的思想是,一个无限循环的事件分发线程在后台运行,一旦用户在界面上产生了某种操作,例如点击了某个 Button,或者点击了某个文本框,一个事件会被产生并放置到事件队列中,这个事件会有一个类似前面的 onButtonClick 回调函数。事件分发线程的任务,就是为每个发生的事件找到对应的事件回调函数并执行它。这样,一个基于事件驱动的 GUI 程序就可以完美地工作了。

在第 24 讲中,我们已经提到,通过使用 poll、epoll 等 I/O 分发技术,可以设计出基于套接字的事件驱动程序,从而满足高性能、高并发的需求。

事件驱动模型,也被叫做反应堆模型(reactor),或者是 Event loop 模型。这个模型的核心有两点:

  • 它存在一个无限循环的事件分发线程,或者叫做 reactor 线程、Event loop 线程。这个事件分发线程的背后,就是 poll、epoll 等 I/O 分发技术的使用。
  • 所有的 I/O 操作都可以抽象成事件,每个事件必须有回调函数来处理
    • acceptor 上有连接建立成功、已连接套接字上发送缓冲区空出可以写、通信管道 pipe 上有数据可以读,这些都是一个个事件,通过事件分发,这些事件都可以一一被检测,并调用对应的回调函数加以处理。

几种 I/O 模型和线程模型设计

  1. read:从套接字收取数据;
  2. decode:对收到的数据进行解析;
  3. compute:根据解析之后的内容,进行计算和处理;
  4. encode:将处理之后的结果,按照约定的格式进行编码;
  5. send:最后,通过套接字把结果发送出去。

fork

随着客户数的变多,fork 的子进程也越来越多,即使客户和服务器之间的交互比较少,这样的子进程也不能被销毁,一直需要存在。使用 fork 的方式处理非常简单,它的缺点是处理效率不高,fork 子进程的开销太大。

image.png

pthread

每次创建一个线程的开销仍然是不小的,因此,引入了线程池的概念,预先创建出一个线程池,在每次新连接达到时,从线程池挑选出一个线程为之服务,很好地解决了线程创建的开销。但是,这个模式还是没有解决空闲连接占用资源的问题,如果一个连接在一定时间内没有数据交互,这个连接还是要占用一定的线程资源,直到这个连接消亡为止。
**
image.png

single reactor thread

一个 reactor 线程上同时负责分发 acceptor 的事件、已连接套接字的 I/O 事件。

image.png

single reactor thread + worker threads

将这些 decode、compute、enode 型工作放置到另外的线程池中,和反应堆线程解耦,是一个比较明智的选择。反应堆线程只负责处理 I/O 相关的工作,业务逻辑相关的工作都被裁剪成一个一个的小任务,放到线程池里由空闲的线程来执行。当结果完成后,再交给反应堆线程,由反应堆线程通过套接字将结果发送出去。

image.png

样例程序

#include <lib/acceptor.h>
#include "lib/common.h"
#include "lib/event_loop.h"
#include "lib/tcp_server.h"

char rot13_char(char c) {
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

//连接建立之后的callback
int onConnectionCompleted(struct tcp_connection *tcpConnection) {
    printf("connection completed\n");
    return 0;
}

//数据读到buffer之后的callback
int onMessage(struct buffer *input, struct tcp_connection *tcpConnection) {
    printf("get message from tcp connection %s\n", tcpConnection->name);
    printf("%s", input->data);

    struct buffer *output = buffer_new();
    int size = buffer_readable_size(input);
    for (int i = 0; i < size; i++) {
        buffer_append_char(output, rot13_char(buffer_read_char(input)));
    }
    tcp_connection_send_buffer(tcpConnection, output);
    return 0;
}

//数据通过buffer写完之后的callback
int onWriteCompleted(struct tcp_connection *tcpConnection) {
    printf("write completed\n");
    return 0;
}

//连接关闭之后的callback
int onConnectionClosed(struct tcp_connection *tcpConnection) {
    printf("connection closed\n");
    return 0;
}

int main(int c, char **v) {
    //主线程event_loop
    struct event_loop *eventLoop = event_loop_init();

    //初始化acceptor
    struct acceptor *acceptor = acceptor_init(SERV_PORT);

    //初始tcp_server,可以指定线程数目,如果线程是0,就只有一个线程,既负责acceptor,也负责I/O
    struct TCPserver *tcpServer = tcp_server_init(eventLoop, acceptor, onConnectionCompleted, onMessage,
                                                  onWriteCompleted, onConnectionClosed, 0);
    tcp_server_start(tcpServer);

    // main thread for acceptor
    event_loop_run(eventLoop);
}

样例程序结果

 $./poll-server-onethread
[msg] set poll as dispatcher
[msg] add channel fd == 4, main thread
[msg] poll added channel fd==4
[msg] add channel fd == 5, main thread
[msg] poll added channel fd==5
[msg] event loop run, main thread
[msg] get message channel i==1, fd==5
[msg] activate channel fd == 5, revents=2, main thread
[msg] new connection established, socket == 6
connection completed
[msg] add channel fd == 6, main thread
[msg] poll added channel fd==6
[msg] get message channel i==2, fd==6
[msg] activate channel fd == 6, revents=2, main thread
get message from tcp connection connection-6
afadsfaf
[msg] get message channel i==2, fd==6
[msg] activate channel fd == 6, revents=2, main thread
get message from tcp connection connection-6
afadsfaf
fdafasf
[msg] get message channel i==1, fd==5
[msg] activate channel fd == 5, revents=2, main thread
[msg] new connection established, socket == 7
connection completed
[msg] add channel fd == 7, main thread
[msg] poll added channel fd==7
[msg] get message channel i==3, fd==7
[msg] activate channel fd == 7, revents=2, main thread
get message from tcp connection connection-7
sfasggwqe
[msg] get message channel i==3, fd==7
[msg] activate channel fd == 7, revents=2, main thread
[msg] poll delete channel fd==7
connection closed
[msg] get message channel i==2, fd==6
[msg] activate channel fd == 6, revents=2, main thread
[msg] poll delete channel fd==6
connection closed