本章介绍的工具和技术对于常见的任务非常的实用。libuv吸收了libev用户手册页中所涵盖的一些模式,并在此基础上对API做了少许的改动。本章还包含了一些无需用完整的一章来介绍的libuv API。

Timers

在定时器启动后的特定时间后,定时器会调用回调函数。libuv的定时器还可以设定为,按时间间隔定时启动,而不是只启动一次。
可以简单地使用超时时间timeout作为参数初始化一个定时器,还有一个可选参数repeat。定时器能在任何时间被终止。

  1. uv_timer_t timer_req;
  2. uv_timer_init(loop, &timer_req);
  3. uv_timer_start(&timer_req, callback, 5000, 2000);

上述操作会启动一个循环定时器(repeating timer),它会在调用uv_timer_start后,5秒(timeout)启动回调函数,然后每隔2秒(repeat)循环启动回调函数。你可以使用:

  1. uv_timer_stop(&timer_req);

来停止定时器。这个函数也可以在回调函数中安全地使用。

循环的间隔也可以随时定义,使用:

  1. uv_timer_set_repeat(uv_timer_t *timer, int64_t repeat);

它会在可能的时候发挥作用。如果上述函数是在定时器回调函数中调用的,这意味着:

  • 如果定时器未设置为循环,这意味着定时器已经停止。需要先用uv_timer_start重新启动。
  • 如果定时器被设置为循环,那么下一次超时的时间已经被规划好了,所以在切换到新的间隔之前,旧的间隔还会发挥一次作用。

函数:

  1. int uv_timer_again(uv_timer_t *)

只适用于循环定时器,相当于停止定时器,然后把原先的timeoutrepeat值都设置为之前的repeat值,启动定时器。如果当该函数调用时,定时器未启动,则调用失败(错误码为UV_EINVAL)并且返回-1。

下面的一节会出现使用定时器的例子。

Event loop reference count

event-loop在没有了活跃的handle之后,便会终止。整套系统的工作方式是:在handle增加时,event-loop的引用计数加1,在handle停止时,引用计数减少1。当然,libuv也允许手动地更改引用计数,通过使用:

  1. void uv_ref(uv_handle_t*);
  2. void uv_unref(uv_handle_t*);

这样,就可以达到允许loop即使在有正在活动的定时器时,仍然能够推出。或者是使用自定义的uv_handle_t对象来使得loop保持工作。

第二个函数可以和间隔循环定时器结合使用。你会有一个每隔x秒执行一次的垃圾回收器,或者是你的网络服务器会每隔一段时间向其他人发送一次心跳信号,但是你不想只有在所有垃圾回收完或者出现错误时才能停止他们。如果你想要在你其他的监视器都退出后,终止程序。这时你就可以立即unref定时器,即便定时器这时是loop上唯一还在运行的监视器,你依旧可以停止uv_run()

它们同样会出现在node.js中,如js的API中封装的libuv方法。每一个js的对象产生一个uv_handle_t(所有监视器的超类),同样可以被uv_ref和uv_unref。

ref-timer/main.c

  1. uv_loop_t *loop;
  2. uv_timer_t gc_req;
  3. uv_timer_t fake_job_req;
  4. int main() {
  5. loop = uv_default_loop();
  6. uv_timer_init(loop, &gc_req);
  7. uv_unref((uv_handle_t*) &gc_req);
  8. uv_timer_start(&gc_req, gc, 0, 2000);
  9. // could actually be a TCP download or something
  10. uv_timer_init(loop, &fake_job_req);
  11. uv_timer_start(&fake_job_req, fake_job, 9000, 0);
  12. return uv_run(loop, UV_RUN_DEFAULT);
  13. }

首先初始化垃圾回收器的定时器,然后在立刻unref它。注意观察9秒之后,此时fake_job完成,程序会自动退出,即使垃圾回收器还在运行。

Idler pattern

空转的回调函数会在每一次的event-loop循环激发一次。空转的回调函数可以用来执行一些优先级较低的活动。比如,你可以向开发者发送应用程序的每日性能表现情况,以便于分析,或者是使用用户应用cpu时间来做SETI运算:)。空转程序还可以用于GUI应用。比如你在使用event-loop来下载文件,如果tcp连接未中断而且当前并没有其他的事件,则你的event-loop会阻塞,这也就意味着你的下载进度条会停滞,用户会面对一个无响应的程序。面对这种情况,空转监视器可以保持UI可操作。

idle-compute/main.c

  1. uv_loop_t *loop;
  2. uv_fs_t stdin_watcher;
  3. uv_idle_t idler;
  4. char buffer[1024];
  5. int main() {
  6. loop = uv_default_loop();
  7. uv_idle_init(loop, &idler);
  8. uv_buf_t buf = uv_buf_init(buffer, 1024);
  9. uv_fs_read(loop, &stdin_watcher, 0, &buf, 1, -1, on_type);
  10. uv_idle_start(&idler, crunch_away);
  11. return uv_run(loop, UV_RUN_DEFAULT);
  12. }

上述程序中,我们将空转监视器和我们真正关心的事件排在一起。crunch_away会被循环地调用,直到输入字符并回车。然后程序会被中断很短的时间,用来处理数据读取,然后在接着调用空转的回调函数。

idle-compute/main.c

  1. void crunch_away(uv_idle_t* handle) {
  2. // Compute extra-terrestrial life
  3. // fold proteins
  4. // computer another digit of PI
  5. // or similar
  6. fprintf(stderr, "Computing PI...\n");
  7. // just to avoid overwhelming your terminal emulator
  8. uv_idle_stop(handle);
  9. }

Passing data to worker thread

在使用uv_queue_work的时候,你通常需要给工作线程传递复杂的数据。解决方案是自定义struct,然后使用uv_work_t.data指向它。一个稍微的不同是必须让uv_work_t作为这个自定义struct的成员之一(把这叫做接力棒)。这么做就可以使得,同时回收数据和uv_wortk_t

  1. struct ftp_baton {
  2. uv_work_t req;
  3. char *host;
  4. int port;
  5. char *username;
  6. char *password;
  7. }
  1. ftp_baton *baton = (ftp_baton*) malloc(sizeof(ftp_baton));
  2. baton->req.data = (void*) baton;
  3. baton->host = strdup("my.webhost.com");
  4. baton->port = 21;
  5. // ...
  6. uv_queue_work(loop, &baton->req, ftp_session, ftp_cleanup);

现在我们创建完了接力棒,并把它排入了队列中。

现在就可以随性所欲地获取自己想要的数据啦。

  1. void ftp_session(uv_work_t *req) {
  2. ftp_baton *baton = (ftp_baton*) req->data;
  3. fprintf(stderr, "Connecting to %s\n", baton->host);
  4. }
  5. void ftp_cleanup(uv_work_t *req) {
  6. ftp_baton *baton = (ftp_baton*) req->data;
  7. free(baton->host);
  8. // ...
  9. free(baton);
  10. }

我们既回收了接力棒,同时也回收了监视器。

External I/O with polling

通常在使用第三方库的时候,需要应对他们自己的IO,还有保持监视他们的socket和内部文件。在此情形下,不可能使用标准的IO流操作,但第三方库仍然能整合进event-loop中。所有这些需要的就是,第三方库就必须允许你访问它的底层文件描述符,并且提供可以处理有用户定义的细微任务的函数。但是一些第三库并不允许你这么做,他们只提供了一个标准的阻塞IO函数,此函数会完成所有的工作并返回。在event-loop的线程直接使用它们是不明智的,而是应该使用libuv的工作线程。当然,这也意味着失去了对第三方库的颗粒化控制。

libuv的uv_poll简单地监视了使用了操作系统的监控机制的文件描述符。从某方面说,libuv实现的所有的IO操作,的背后均有uv_poll的支持。无论操作系统何时监视到文件描述符的改变,libuv都会调用响应的回调函数。

现在我们简单地实现一个下载管理程序,它会通过libcurl来下载文件。我们不会直接控制libcurl,而是使用libuv的event-loop,通过非阻塞的异步的多重接口来处理下载,与此同时,libuv会监控IO的就绪状态。

uvwget/main.c - The setup

  1. #include <assert.h>
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <uv.h>
  5. #include <curl/curl.h>
  6. uv_loop_t *loop;
  7. CURLM *curl_handle;
  8. uv_timer_t timeout;
  9. }
  10. int main(int argc, char **argv) {
  11. loop = uv_default_loop();
  12. if (argc <= 1)
  13. return 0;
  14. if (curl_global_init(CURL_GLOBAL_ALL)) {
  15. fprintf(stderr, "Could not init cURL\n");
  16. return 1;
  17. }
  18. uv_timer_init(loop, &timeout);
  19. curl_handle = curl_multi_init();
  20. curl_multi_setopt(curl_handle, CURLMOPT_SOCKETFUNCTION, handle_socket);
  21. curl_multi_setopt(curl_handle, CURLMOPT_TIMERFUNCTION, start_timeout);
  22. while (argc-- > 1) {
  23. add_download(argv[argc], argc);
  24. }
  25. uv_run(loop, UV_RUN_DEFAULT);
  26. curl_multi_cleanup(curl_handle);
  27. return 0;
  28. }

每种库整合进libuv的方式都是不同的。以libcurl的例子来说,我们注册了两个回调函数。socket回调函数handle_socket会在socket状态改变的时候被触发,因此我们不得不开始轮询它。start_timeout是libcurl用来告知我们下一次的超时间隔的,之后我们就应该不管当前IO状态,驱动libcurl向前。这些也就是libcurl能处理错误或驱动下载进度向前的原因。

可以这么调用下载器:

  1. $ ./uvwget [url1] [url2] ...

我们可以把url当成参数传入程序。

uvwget/main.c - Adding urls

  1. void add_download(const char *url, int num) {
  2. char filename[50];
  3. sprintf(filename, "%d.download", num);
  4. FILE *file;
  5. file = fopen(filename, "w");
  6. if (file == NULL) {
  7. fprintf(stderr, "Error opening %s\n", filename);
  8. return;
  9. }
  10. CURL *handle = curl_easy_init();
  11. curl_easy_setopt(handle, CURLOPT_WRITEDATA, file);
  12. curl_easy_setopt(handle, CURLOPT_URL, url);
  13. curl_multi_add_handle(curl_handle, handle);
  14. fprintf(stderr, "Added download %s -> %s\n", url, filename);
  15. }

我们允许libcurl直接向文件写入数据。

start_timeout会被libcurl立即调用。它会启动一个libuv的定时器,使用CURL_SOCKET_TIMEOUT驱动curl_multi_socket_action,当其超时时,调用它。curl_multi_socket_action会驱动libcurl,也会在socket状态改变的时候被调用。但在我们深入讲解它之前,我们需要轮询监听socket,等待handle_socket被调用。

uvwget/main.c - Setting up polling

  1. void start_timeout(CURLM *multi, long timeout_ms, void *userp) {
  2. if (timeout_ms <= 0)
  3. timeout_ms = 1; /* 0 means directly call socket_action, but we'll do it in a bit */
  4. uv_timer_start(&timeout, on_timeout, timeout_ms, 0);
  5. }
  6. int handle_socket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp) {
  7. curl_context_t *curl_context;
  8. if (action == CURL_POLL_IN || action == CURL_POLL_OUT) {
  9. if (socketp) {
  10. curl_context = (curl_context_t*) socketp;
  11. }
  12. else {
  13. curl_context = create_curl_context(s);
  14. curl_multi_assign(curl_handle, s, (void *) curl_context);
  15. }
  16. }
  17. switch (action) {
  18. case CURL_POLL_IN:
  19. uv_poll_start(&curl_context->poll_handle, UV_READABLE, curl_perform);
  20. break;
  21. case CURL_POLL_OUT:
  22. uv_poll_start(&curl_context->poll_handle, UV_WRITABLE, curl_perform);
  23. break;
  24. case CURL_POLL_REMOVE:
  25. if (socketp) {
  26. uv_poll_stop(&((curl_context_t*)socketp)->poll_handle);
  27. destroy_curl_context((curl_context_t*) socketp);
  28. curl_multi_assign(curl_handle, s, NULL);
  29. }
  30. break;
  31. default:
  32. abort();
  33. }
  34. return 0;
  35. }

我们关心的是socket的文件描述符s,还有action。对应每一个socket,我们都创造了uv_poll_t,并用curl_multi_assign把它们关联起来。每当回调函数被调用时,socketp都会指向它。

在下载完成或失败后,libcurl需要移除poll。所以我们停止并回收了poll的handle。

我们使用UV_READABLEUV_WRITABLE开始轮询,基于libcurl想要监视的事件。当socket已经准备好读或写后,libuv会调用轮询的回调函数。在相同的handle上调用多次uv_poll_start是被允许的,这么做可以更新事件的参数。curl_perform是整个程序的关键。

uvwget/main.c - Driving libcurl.

  1. void curl_perform(uv_poll_t *req, int status, int events) {
  2. uv_timer_stop(&timeout);
  3. int running_handles;
  4. int flags = 0;
  5. if (status < 0) flags = CURL_CSELECT_ERR;
  6. if (!status && events & UV_READABLE) flags |= CURL_CSELECT_IN;
  7. if (!status && events & UV_WRITABLE) flags |= CURL_CSELECT_OUT;
  8. curl_context_t *context;
  9. context = (curl_context_t*)req;
  10. curl_multi_socket_action(curl_handle, context->sockfd, flags, &running_handles);
  11. check_multi_info();
  12. }

首先我们要做的是停止定时器,因为内部还有其他要做的事。接下来我们我们依据触发回调函数的事件,来设置flag。然后,我们使用上述socket和flag作为参数,来调用curl_multi_socket_action。在此刻libcurl会在内部完成所有的工作,然后尽快地返回事件驱动程序在主线程中急需的数据。libcurl会在自己的队列中将传输进度的消息排队。对于我们来说,我们只关心是否传输完成,这类消息。所以我们将这类消息提取出来,并将传输完成的handle回收。

uvwget/main.c - Reading transfer status.

  1. void check_multi_info(void) {
  2. char *done_url;
  3. CURLMsg *message;
  4. int pending;
  5. while ((message = curl_multi_info_read(curl_handle, &pending))) {
  6. switch (message->msg) {
  7. case CURLMSG_DONE:
  8. curl_easy_getinfo(message->easy_handle, CURLINFO_EFFECTIVE_URL,
  9. &done_url);
  10. printf("%s DONE\n", done_url);
  11. curl_multi_remove_handle(curl_handle, message->easy_handle);
  12. curl_easy_cleanup(message->easy_handle);
  13. break;
  14. default:
  15. fprintf(stderr, "CURLMSG default\n");
  16. abort();
  17. }
  18. }
  19. }

Loading libraries

libuv提供了一个跨平台的API来加载共享库shared libraries。这就可以用来实现你自己的插件/扩展/模块系统,它们可以被nodejs通过require()调用。只要你的库输出的是正确的符号,用起来还是很简单的。在载入第三方库的时候,要注意错误和安全检查,否则你的程序就会表现出不可预测的行为。下面这个例子实现了一个简单的插件,它只是打印出了自己的名字。

首先看下提供给插件作者的接口。

plugin/plugin.h

  1. #ifndef UVBOOK_PLUGIN_SYSTEM
  2. #define UVBOOK_PLUGIN_SYSTEM
  3. // Plugin authors should use this to register their plugins with mfp.
  4. void mfp_register(const char *name);
  5. #endif

你可以在你的程序中给插件添加更多有用的功能(mfp is My Fancy Plugin)。使用了这个api的插件的例子:

plugin/hello.c

  1. #include "plugin.h"
  2. void initialize() {
  3. mfp_register("Hello World!");
  4. }

我们的接口定义了,所有的插件都应该有一个能被程序调用的initialize函数。这个插件被编译成了共享库,因此可以被我们的程序在运行的时候载入。

  1. $ ./plugin libhello.dylib
  2. Loading libhello.dylib
  3. Registered plugin "Hello World!"

Note

共享库的后缀名在不同平台上是不一样的。在Linux上是libhello.so。

使用uv_dlopen首先载入了共享库libhello.dylib。再使用uv_dlsym获取了该插件的initialize函数,最后在调用它。

plugin/main.c

  1. #include "plugin.h"
  2. typedef void (*init_plugin_function)();
  3. void mfp_register(const char *name) {
  4. fprintf(stderr, "Registered plugin \"%s\"\n", name);
  5. }
  6. int main(int argc, char **argv) {
  7. if (argc == 1) {
  8. fprintf(stderr, "Usage: %s [plugin1] [plugin2] ...\n", argv[0]);
  9. return 0;
  10. }
  11. uv_lib_t *lib = (uv_lib_t*) malloc(sizeof(uv_lib_t));
  12. while (--argc) {
  13. fprintf(stderr, "Loading %s\n", argv[argc]);
  14. if (uv_dlopen(argv[argc], lib)) {
  15. fprintf(stderr, "Error: %s\n", uv_dlerror(lib));
  16. continue;
  17. }
  18. init_plugin_function init_plugin;
  19. if (uv_dlsym(lib, "initialize", (void **) &init_plugin)) {
  20. fprintf(stderr, "dlsym error: %s\n", uv_dlerror(lib));
  21. continue;
  22. }
  23. init_plugin();
  24. }
  25. return 0;
  26. }

函数uv_dlopen需要传入一个共享库的路径作为参数。当它成功时返回0,出错时返回-1。使用uv_dlerror可以获取出错的消息。

uv_dlsym的第三个参数保存了一个指向第二个参数所保存的函数的指针。init_plugin_function是一个函数的指针,它指向了我们所需要的程序插件的函数。

TTY

文字终端长期支持非常标准化的控制序列。它经常被用来增强终端输出的可读性。例如grep --colour。libuv提供了跨平台的,uv_tty_t抽象(stream)和相关的处理ANSI escape codes 的函数。这也就是说,libuv同样在Windows上实现了对等的ANSI codes,并且提供了获取终端信息的函数。

首先要做的是,使用读/写文件描述符来初始化uv_tty_t。如下:

  1. int uv_tty_init(uv_loop_t*, uv_tty_t*, uv_file fd, int readable)

设置readable为true,意味着你打算使用uv_read_start从stream从中读取数据。

最好还要使用uv_tty_set_mode来设置其为正常模式。也就是运行大多数的TTY格式,流控制和其他的设置。其他的模式还有这些

记得当你的程序退出后,要使用uv_tty_reset_mode恢复终端的状态。这才是礼貌的做法。另外要注意礼貌的地方是关心重定向。如果使用者将你的命令的输出重定向到文件,控制序列不应该被重写,因为这会阻碍可读性和grep。为了保证文件描述符确实是TTY,可以使用uv_guess_handle函数,比较返回值是否为UV_TTY

下面是一个把白字打印到红色背景上的例子。

tty/main.c

  1. #include <stdio.h>
  2. #include <string.h>
  3. #include <unistd.h>
  4. #include <uv.h>
  5. uv_loop_t *loop;
  6. uv_tty_t tty;
  7. int main() {
  8. loop = uv_default_loop();
  9. uv_tty_init(loop, &tty, 1, 0);
  10. uv_tty_set_mode(&tty, UV_TTY_MODE_NORMAL);
  11. if (uv_guess_handle(1) == UV_TTY) {
  12. uv_write_t req;
  13. uv_buf_t buf;
  14. buf.base = "\033[41;37m";
  15. buf.len = strlen(buf.base);
  16. uv_write(&req, (uv_stream_t*) &tty, &buf, 1, NULL);
  17. }
  18. uv_write_t req;
  19. uv_buf_t buf;
  20. buf.base = "Hello TTY\n";
  21. buf.len = strlen(buf.base);
  22. uv_write(&req, (uv_stream_t*) &tty, &buf, 1, NULL);
  23. uv_tty_reset_mode();
  24. return uv_run(loop, UV_RUN_DEFAULT);
  25. }

最后要说的是uv_tty_get_winsize(),它能获取到终端的宽和长,当成功获取后返回0。下面这个小程序实现了一个动画的效果。

tty-gravity/main.c

  1. #include <stdio.h>
  2. #include <string.h>
  3. #include <unistd.h>
  4. #include <uv.h>
  5. uv_loop_t *loop;
  6. uv_tty_t tty;
  7. uv_timer_t tick;
  8. uv_write_t write_req;
  9. int width, height;
  10. int pos = 0;
  11. char *message = " Hello TTY ";
  12. void update(uv_timer_t *req) {
  13. char data[500];
  14. uv_buf_t buf;
  15. buf.base = data;
  16. buf.len = sprintf(data, "\033[2J\033[H\033[%dB\033[%luC\033[42;37m%s",
  17. pos,
  18. (unsigned long) (width-strlen(message))/2,
  19. message);
  20. uv_write(&write_req, (uv_stream_t*) &tty, &buf, 1, NULL);
  21. pos++;
  22. if (pos > height) {
  23. uv_tty_reset_mode();
  24. uv_timer_stop(&tick);
  25. }
  26. }
  27. int main() {
  28. loop = uv_default_loop();
  29. uv_tty_init(loop, &tty, 1, 0);
  30. uv_tty_set_mode(&tty, 0);
  31. if (uv_tty_get_winsize(&tty, &width, &height)) {
  32. fprintf(stderr, "Could not get TTY information\n");
  33. uv_tty_reset_mode();
  34. return 1;
  35. }
  36. fprintf(stderr, "Width %d, height %d\n", width, height);
  37. uv_timer_init(loop, &tick);
  38. uv_timer_start(&tick, update, 200, 200);
  39. return uv_run(loop, UV_RUN_DEFAULT);
  40. }

escape codes的对应表如下:

代码 意义
2 J Clear part of the screen, 2 is entire screen
H Moves cursor to certain position, default top-left
n B Moves cursor down by n lines
n C Moves cursor right by n columns
m Obeys string of display settings, in this case green background (40+2), white text (30+7)

正如你所见,它能输出酷炫的效果,你甚至可以发挥想象,用它来制作电子游戏。更有趣的输出,可以使用http://www.gnu.org/software/ncurses/ncurses.html