很多时候,除了响应事件之外,应用还希望做一定的数据缓冲。比如说,写入数据的时候 ,通常的运行模式是:
- 决定要向连接写入一些数据,把数据放入到缓冲区中
- 等待连接可以写入
- 写入尽量多的数据
- 记住写入了多少数据,如果还有更多数据要写入,等待连接再次可以写入
这种缓冲 IO 模式很通用,libevent 为此提供了一种通用机制,即bufferevent。
bufferevent 由一个底层的传输端口(如套接字 ),一个读取缓冲区和一个写入缓冲区组成。与通常的事件在底层传输端口已经就绪,可以读取或者写入的时候执行回调不同的是,bufferevent 在读取或者写入了足够量的数据之后调用用户提供的回调。
有多种共享公用接口的 bufferevent 类型,编写本文时已存在以下类型:
基于套接字的 bufferevent
:使用 event_*接口作为后端,通过底层流式套接字发送或者接收数据的 bufferevent异步 IO bufferevent
:使用 Windows IOCP 接口,通过底层流式套接字发送或者接收数据的 bufferevent(仅用于 Windows,试验中)过滤型 bufferevent
:将数据传输到底层 bufferevent 对象之前,处理输入或者输出数据的 bufferevent:比如说,为了压缩或者转换数据。成对的 bufferevent
:相互传输数据的两个 bufferevent。
注意
:截止2.0.2-alpha 版,这里列出的 bufferevent 接口还没有完全正交于所有 的 bufferevent 类型。也就是说,下面将要介绍的接口不是都能用于所有bufferevent 类型。libevent 开发 者在未来版本中将修正这个问题。
也请注意
:当前 bufferevent 只能用于像 TCP 这样的面向流的协议,将来才可能会支持 像 UDP 这样的面向数据报的协议。
bufferevent和evbuffer
每个 bufferevent 都有一个输入缓冲区和一个输出缓冲区 ,它们的类型都是“struct evbuffer”。 有数据要写入到 bufferevent 时,添加数据到输出缓冲区 ;bufferevent 中有数据供读取的时候,从输入缓冲区抽取(drain)数据。
evbuffer 接口支持很多种操作,后面的章节将讨论这些操作。
7.1 回调和水位
每个 bufferevent 有两个数据相关的回调:一个读取回调和一个写入回调。默认情况下,从底层传输端口读取了任意量的数据之后会调用读取回调 ;输出缓冲区中足够量的数据被清空到底层传输端口后写入回调会被调用。通过调整 bufferevent 的读取和写入 “水位 (watermarks )”可以覆盖这些函数的默认行为。
每个 bufferevent 有四个水位:
- 读取低水位 :读取操作使得输入缓冲区的数据量在此级别或者更高时 ,读取回调将被调用。默认值为 0,所以每个读取操作都会导致读取回调被调用。
- 读取高水位 :输入缓冲区中的数据量达到此级别后, bufferevent 将停止读取,直到输入缓冲区中足够量的数据被抽取 ,使得数据量低于此级别 。默认值是无限 ,所以永远不会因为输入缓冲区的大小而停止读取。
- 写入低水位 :写入操作使得输出缓冲区的数据量达到或者低于此级别时 ,写入回调将被调用。默认值是 0,所以只有输出缓冲区空的时候才会调用写入回调。
- 写入高水位 :bufferevent 没有直接使用这个水位。它在 bufferevent 用作另外一 个 bufferevent 的底层传输端口时有特殊意义。请看后面关于过滤型 bufferevent 的介绍。
7.2 延迟回调
默认情况下,bufferevent 的回调在相应的条件发生时立即被执行 。(evbuffer 的回调也是这样的,随后会介绍)在依赖关系复杂的情况下 ,这种立即调用会制造麻烦 。比如说,假如某个回调在 evbuffer A 空的时候向其中移入数据 ,而另一个回调在 evbuffer A 满的时候从中取出数据。这些调用都是在栈上发生的,在依赖关系足够复杂的时候,有栈溢出的风险。
要解决此问题,可以请求 bufferevent(或者 evbuffer)延迟其回调。条件满足时,延迟回调不会立即调用,而是在 event_loop()调用中被排队,然后在通常的事件回调之后执行 。
(延迟回调由 libevent 2.0.1-alpha 版引入)
7.3 bufferevent 选项标志
创建 bufferevent 时可以使用一个或者多个标志修改其行为。可识别的标志有:
- BEV_OPT_CLOSE_ON_FREE :释放 bufferevent 时关闭底层传输端口。这将关闭底层套接字,释放底层 bufferevent 等。
- BEV_OPT_THREADSAFE :自动为 bufferevent 分配锁,这样就可以安全地在多个线程中使用 bufferevent。
- BEV_OPT_DEFER_CALLBACKS :设置这个标志时, bufferevent 延迟所有回调,如上所述。
- BEV_OPT_UNLOCK_CALLBACKS :默认情况下,如果设置 bufferevent 为线程安全 的,则 bufferevent 会在调用用户提供的回调时进行锁定。设置这个选项会让 libevent 在执行回调的时候不进行锁定。
(BEV_OPT_UNLOCK_CALLBACKS 由2.0.5-beta 版引入,其他选项由 2.0.1-alpha 版引 入)
7.4 使用bufferevent
基于套接字的 bufferevent 是最简单的,它使用 libevent 的底层事件机制来检测底层网络套 接字是否已经就绪,可以进行读写操作,并且使用底层网络调用(如 readv 、 writev 、 WSASend、WSARecv)来发送和接收数据。
7.4.1 创建基于套接字的bufferevent
可以使用 bufferevent_socket_new()创建基于套接字的 bufferevent。
struct bufferevent *bufferevent_socket_new(
struct event_base *base,
evutil_socket_t fd,
enum bufferevent_options options);
base 是 event_base,options 是表示 bufferevent 选项(BEV_OPT_CLOSE_ON_FREE 等) 的位掩码, fd是一个可选的表示套接字的文件描述符。如果想以后设置文件描述符,可以设置fd为-1。
成功时函数返回一个 bufferevent,失败则返回 NULL。
7.4.2 在bufferevent上启动链接
int bufferevent_socket_connect(struct bufferevent *bev,
struct sockaddr *address, int addrlen);
address 和 addrlen 参数跟标准调用 connect()的参数相同。如果还没有为 bufferevent 设置套接字,调用函数将为其分配一个新的流套接字,并且设置为非阻塞的。
如果已经为 bufferevent 设置套接字,调用bufferevent_socket_connect() 将告知 libevent 套接字还未连接,直到连接成功之前不应该对其进行读取或者写入操作。
连接完成之前可以向输出缓冲区添加数据。
如果连接成功启动,函数返回 0;如果发生错误则返回 -1。
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <sys/socket.h>
#include <string.h>
void eventcb(struct bufferevent *bev, short events, void *ptr)
{
if (events & BEV_EVENT_CONNECTED) {
/* We're connected to 127.0.0.1:8080. Ordinarily we'd do
something here, like start reading or writing. */
} else if (events & BEV_EVENT_ERROR) {
/* An error occured while connecting. */
}
}
int main_loop(void)
{
struct event_base *base;
struct bufferevent *bev;
struct sockaddr_in sin;
base = event_base_new();
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
sin.sin_port = htons(8080); /* Port 8080 */
bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, NULL, NULL, eventcb, NULL);
if (bufferevent_socket_connect(bev,
(struct sockaddr *)&sin, sizeof(sin)) < 0) {
/* Error starting connection */
bufferevent_free(bev);
return -1;
}
event_base_dispatch(base);
return 0;
}
**注意**
:如果使用 bufferevent_socket_connect() 发起连接,将只会收 到 BEV_EVENT_CONNECTED 事件。如果自己调用 connect(),则连接上将被报告为写入事件。
7.5 使用bufferevent操作
7.5.1 释放bufferevent操作
void bufferevent_free(struct bufferevent *bev);
这个函数释放 bufferevent。bufferevent 内部具有引用计数,所以,如果释放 时还有未决的延迟回调,则在回调完成之前 bufferevent 不会被删除。
如果设置了 BEV_OPT_CLOSE_ON_FREE 标志,并且 bufferevent 有一个套接字或者底层 bufferevent 作为其传输端口,则释放 bufferevent 将关闭这个传输端口。
7.5.2 操作回调、水位和启用/禁用
typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);
typedef void (*bufferevent_event_cb)(struct bufferevent *bev,
short events, void *ctx);
void bufferevent_setcb(struct bufferevent *bufev,
bufferevent_data_cb readcb, bufferevent_data_cb writecb,
bufferevent_event_cb eventcb, void *cbarg);
void bufferevent_getcb(struct bufferevent *bufev,
bufferevent_data_cb *readcb_ptr,
bufferevent_data_cb *writecb_ptr,
bufferevent_event_cb *eventcb_ptr,
void **cbarg_ptr);
bufferevent_setcb()函数修改 bufferevent 的一个或者多个回调 。readcb、writecb和eventcb函数将分别在已经读取足够的数据 、已经写入足够的数据 ,或者发生错误时被调用 。
每个回调函数的第一个参数都是发生了事件的bufferevent ,最后一个参数都是调用bufferevent_setcb()时用户提供的 cbarg 参数:可以通过它向回调传递数据。事件回调 的 events 参数是一个表示事件标志的位掩码:请看前面的 “回调和水位”节。
要禁用回调,传递 NULL 而不是回调函数 。注意:bufferevent 的所有回调函数共享单 个 cbarg, 所以修改它将影响所有回调函数。
接口
void bufferevent_enable(struct bufferevent *bufev, short events);
void bufferevent_disable(struct bufferevent *bufev, short events);
short bufferevent_get_enabled(struct bufferevent *bufev);
可以启用或者禁用 bufferevent 上的 EV_READ、EV_WRITE 或者 EV_READ | EV_WRITE 事件。没有启用读取或者写入事件时, bufferevent 将不会试图进行数据读取或者写入。
没有必要在输出缓冲区空时禁用写入事件: bufferevent 将自动停止写入,然后在有数据等 待写入时重新开始。
类似地,没有必要在输入缓冲区高于高水位时禁用读取事件 :bufferevent 将自动停止读取, 然后在有空间用于读取时重新开始读取。
默认情况下,新创建的 bufferevent 的写入是启用的,但是读取没有启用。
可以调用 bufferevent_get_enabled()确定 bufferevent 上当前启用的事件。
接口
void bufferevent_setwatermark(struct bufferevent *bufev, short events,
size_t lowmark, size_t highmark);
bufferevent_setwatermark()函数调整单个 bufferevent 的读取水位、写入水位,或者同时调 整二者。(如果 events 参数设置了 EV_READ,调整读取水位。如果 events 设置了 EV_WRITE 标志,调整写入水位)
对于高水位,0表示“无限”。
示例
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/util.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
struct info {
const char *name;
size_t total_drained;
};
void read_callback(struct bufferevent *bev, void *ctx)
{
struct info *inf = ctx;
struct evbuffer *input = bufferevent_get_input(bev);
size_t len = evbuffer_get_length(input);
if (len) {
inf->total_drained += len;
evbuffer_drain(input, len);
printf("Drained %lu bytes from %s\n",
(unsigned long) len, inf->name);
}
}
void event_callback(struct bufferevent *bev, short events, void *ctx)
{
struct info *inf = ctx;
struct evbuffer *input = bufferevent_get_input(bev);
int finished = 0;
if (events & BEV_EVENT_EOF) {
size_t len = evbuffer_get_length(input);
printf("Got a close from %s. We drained %lu bytes from it, "
"and have %lu left.\n", inf->name,
(unsigned long)inf->total_drained, (unsigned long)len);
finished = 1;
}
if (events & BEV_EVENT_ERROR) {
printf("Got an error from %s: %s\n",
inf->name, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
finished = 1;
}
if (finished) {
free(ctx);
bufferevent_free(bev);
}
}
struct bufferevent *setup_bufferevent(void)
{
struct bufferevent *b1 = NULL;
struct info *info1;
info1 = malloc(sizeof(struct info));
info1->name = "buffer 1";
info1->total_drained = 0;
/* ... Here we should set up the bufferevent and make sure it gets
connected... */
/* Trigger the read callback only whenever there is at least 128 bytes
of data in the buffer. */
bufferevent_setwatermark(b1, EV_READ, 128, 0);
bufferevent_setcb(b1, read_callback, NULL, event_callback, info1);
bufferevent_enable(b1, EV_READ); /* Start reading. */
return b1;
}
7.5.3 操作bufferevent中的数据
通过bufferevent得到evbuffer
如果只是通过网络读取或者写入数据 ,而不能观察操作过程,是没什么好处的。bufferevent 提供了下列函数用于观察要写入或者读取的数据。
struct evbuffer *bufferevent_get_input(struct bufferevent *bufev);
struct evbuffer *bufferevent_get_output(struct bufferevent *bufev);
这两个函数提供了非常强大的基础 :它们分别返回输入和输出缓冲区 。关于可以对 evbuffer 类型进行的所有操作的完整信息,请看下一章。
如果写入操作因为数据量太少而停止(或者读取操作因为太多数据而停止 ),则向输出缓冲 区添加数据(或者从输入缓冲区移除数据)将自动重启操作。
向bufferevent的输出缓冲区添加数据
int bufferevent_write(struct bufferevent *bufev,
const void *data, size_t size);
int bufferevent_write_buffer(struct bufferevent *bufev,
struct evbuffer *buf);
这些函数向 bufferevent 的输出缓冲区添加数据。 bufferevent_write()将内存中从 data 处开 始的 size 字节数据添加到输出缓冲区的末尾 。bufferevent_write_buffer()移除 buf 的所有内 容,将其放置到输出缓冲区的末尾。成功时这些函数都返回 0,发生错误时则返回-1。
从bufferevent的输入缓冲区移除数据
size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
int bufferevent_read_buffer(struct bufferevent *bufev,
struct evbuffer *buf);
这些函数从 bufferevent 的输入缓冲区移除数据。bufferevent_read()至多从输入缓冲区移除 size 字节的数据,将其存储到内存中 data 处。函数返回实际移除的字节数。 bufferevent_read_buffer()函数抽空输入缓冲区的所有内容,将其放置到 buf 中,成功时返 回0,失败时返回 -1。
注意,对于 bufferevent_read(),data 处的内存块必须有足够的空间容纳 size 字节数据。
示例
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <ctype.h>
void
read_callback_uppercase(struct bufferevent *bev, void *ctx)
{
/* This callback removes the data from bev's input buffer 128
bytes at a time, uppercases it, and starts sending it
back.
(Watch out! In practice, you shouldn't use toupper to implement
a network protocol, unless you know for a fact that the current
locale is the one you want to be using.)
*/
char tmp[128];
size_t n;
int i;
while (1) {
n = bufferevent_read(bev, tmp, sizeof(tmp));
if (n <= 0)
break; /* No more data. */
for (i=0; i<n; ++i)
tmp[i] = toupper(tmp[i]);
bufferevent_write(bev, tmp, n);
}
}
struct proxy_info {
struct bufferevent *other_bev;
};
void
read_callback_proxy(struct bufferevent *bev, void *ctx)
{
/* You might use a function like this if you're implementing
a simple proxy: it will take data from one connection (on
bev), and write it to another, copying as little as
possible. */
struct proxy_info *inf = ctx;
bufferevent_read_buffer(bev,
bufferevent_get_output(inf->other_bev));
}
struct count {
unsigned long last_fib[2];
};
void
write_callback_fibonacci(struct bufferevent *bev, void *ctx)
{
/* Here's a callback that adds some Fibonacci numbers to the
output buffer of bev. It stops once we have added 1k of
data; once this data is drained, we'll add more. */
struct count *c = ctx;
struct evbuffer *tmp = evbuffer_new();
while (evbuffer_get_length(tmp) < 1024) {
unsigned long next = c->last_fib[0] + c->last_fib[1];
c->last_fib[0] = c->last_fib[1];
c->last_fib[1] = next;
evbuffer_add_printf(tmp, "%lu", next);
}
/* Now we add the whole contents of tmp to bev. */
bufferevent_write_buffer(bev, tmp);
/* We don't need tmp any longer. */
evbuffer_free(tmp);
}
7.5.4 bufferevent的清空操作
int bufferevent_flush(struct bufferevent *bufev, short iotype, enum bufferevent_flush_mode state);
清空 bufferevent 要求 bufferevent 强制从底层传输端口读取或者写入尽可能多的数据 ,而忽略其他可能保持数据不被写入的限制条件 。函数的细节功能依赖于 bufferevent 的具体类型。
otype 参数应该是 EV_READ、EV_WRITE 或者 EV_READ | EV_WRITE,用于指示应该处 理读取、写入,还是二者都处理。 state 参数可以是 BEV_NORMAL、BEV_FLUSH 或者 BEV_FINISHED。BEV_FINISHED 指示应该告知另一端,没有更多数据需要发送了; 而 BEV_NORMAL 和 BEV_FLUSH 的区别依赖于具体的 bufferevent 类型。
失败时 bufferevent_flush()返回-1,如果没有数据被清空则返回 0,有数据被清空则返回 1。