Fuchsia 中的异步事件库 — async

事件是重要的概念,是指系统硬件或软件的状态出现任何重大改变,程序时刻都在触发和接收着各种事件:鼠标点击事件,键盘事件,socket 数据收发等等

我们常见的事件触发机制模型有两种:

一种是轮询机制模型,一种是事件驱动模型

在 Linux 下,事件驱动模型主要是由 poll、epoll 实现,Fuchsia 也提供了类似的事件驱动机制,其提供 async 库使用

Epoll 的使用

先来回忆一下 Linux 下 Epoll + socket 的典型 C/S 架构的具体使用

  1. // 创建Epoll实例
  2. int epfd = epoll_crete(100);
  3. // 添加需要监听事件和监听对象
  4. epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &listen_event);
  5. while(1)
  6. {
  7. // 阻塞中
  8. int active_cnt = epoll_wait(epfd, events, 1000 ,-1);
  9. // 对监听到的事件处理
  10. for(i=0; i< active_cnt; i++)
  11. {
  12. // 新到连接
  13. if(events[i].data.fd == listen_fd)
  14. {
  15. connfd = accept(listen_fd,(sockaddr *)&clientaddr, &clilen); ev.data.fd=connfd;
  16. ev.events=EPOLLIN|EPOLLET;
  17. epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);//将新的fd添加到epoll的监听队列中
  18. }
  19. else if( events[i].events & EPOLLIN ) // 接收到数据,读socket
  20. {
  21. n = read(sockfd, line, MAXLINE)) < 0;
  22. ev.data.ptr = md; // 添加数据
  23. ev.events=EPOLLOUT|EPOLLET;
  24. epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓
  25. }
  26. else if(events[i].events & EPOLLOUT) // 有数据待发送,写socket
  27. {
  28. struct myepoll_data* md = (myepoll_data*)events[i].data.ptr; //取数据
  29. sockfd = md->fd;
  30. send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); //发送数据
  31. ev.data.fd=sockfd;
  32. ev.events=EPOLLIN|EPOLLET;
  33. epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
  34. }
  35. else
  36. {
  37. //其他的处理
  38. }
  39. }
  40. }

实例 — echo

fuchsia/garnet/examples/fidl/echo_server_cpp 中我们看到 echo_server.cc 源码和编译配置文件

  1. #include "echo_server_app.h"
  2. #include <lib/async-loop/cpp/loop.h>
  3. #include <lib/async-loop/default.h>
  4. #include <string>
  5. int main(int argc, const char** argv) {
  6. async::Loop loop(&kAsyncLoopConfigAttachToCurrentThread);
  7. bool quiet = (argc >= 2) && std::string("-q") == argv[1];
  8. echo::EchoServerApp app(quiet);
  9. loop.Run();
  10. return 0;
  11. }
  12. executable("bin") {
  13. output_name = "echo_server_cpp"
  14. sources = [ "echo_server.cc" ]
  15. deps = [
  16. ":lib",
  17. "//zircon/system/ulib/async-default",
  18. "//zircon/system/ulib/async-loop:async-loop-cpp",
  19. "//zircon/system/ulib/async-loop:async-loop-default",
  20. ]
  21. }

编译配置文件中引入 async 异步事件库作为 component 依赖,在主程序头文件引入相关声明

echo_server_cpp.cc 中先创建 async 对象,配置好相关信息后调用 loop.Run(); 进入死循环一直等待事件的发生(除非使用 loop.quit 主动退出监听循环)

相对于 epoll,async 库的 loop 使用屏蔽了许多细节,接下来我们深入源码层级,来看看 async 库中 loop 做了什么。

Dive into ansyc source code

async-loop-cppasync-loop-defaut 库均来自 fuchsia/zircon/system/ulib/async-loop 的编译

  1. // 核心数据结构 -- async_loop_t对象定义
  2. typedef struct async_loop {
  3. async_dispatcher_t dispatcher; // must be first (the loop inherits from async_dispatcher_t)
  4. async_loop_config_t config; // immutable
  5. zx_handle_t port; // immutable
  6. zx_handle_t timer; // immutable
  7. _Atomic async_loop_state_t state;
  8. atomic_uint active_threads; // number of active dispatch threads
  9. mtx_t lock; // guards the lists and the dispatching tasks flag
  10. bool dispatching_tasks; // true while the loop is busy dispatching tasks
  11. list_node_t wait_list; // most recently added first
  12. list_node_t task_list; // pending tasks, earliest deadline first
  13. list_node_t due_list; // due tasks, earliest deadline first
  14. list_node_t thread_list; // earliest created thread first
  15. list_node_t irq_list; // list of IRQs
  16. list_node_t paged_vmo_list; // most recently added first
  17. bool timer_armed; // true if timer has been set and has not fired yet
  18. } async_loop_t;
  19. // loop对象
  20. class Loop {
  21. public:
  22. explicit Loop(const async_loop_config_t* config);
  23. Loop(const Loop&) = delete;
  24. Loop(Loop&&) = delete;
  25. Loop& operator=(const Loop&) = delete;
  26. Loop& operator=(Loop&&) = delete;
  27. ~Loop();
  28. async_loop_t* loop() const { return loop_; }
  29. async_dispatcher_t* dispatcher() const { return async_loop_get_dispatcher(loop_); }
  30. void Shutdown();
  31. zx_status_t Run(zx::time deadline = zx::time::infinite(), bool once = false);
  32. zx_status_t RunUntilIdle();
  33. void Quit();
  34. zx_status_t ResetQuit();
  35. async_loop_state_t GetState() const;
  36. zx_status_t StartThread(const char* name = nullptr, thrd_t* out_thread = nullptr);
  37. void JoinThreads();
  38. private:
  39. async_loop_t* loop_;
  40. };
  41. // 在loop的构造函数里 使用async_loop_create创建zx_status_t实例
  42. Loop::Loop(const async_loop_config_t* config) {
  43. zx_status_t status = async_loop_create(config, &loop_);
  44. ZX_ASSERT_MSG(status == ZX_OK, "status=%d", status);
  45. }
  46. // Run方法调用async_loop_run
  47. zx_status_t Loop::Run(zx::time deadline, bool once) {
  48. return async_loop_run(loop_, deadline.get(), once);
  49. }
  50. zx_status_t async_loop_run(async_loop_t* loop, zx_time_t deadline, bool once) {
  51. ZX_DEBUG_ASSERT(loop);
  52. zx_status_t status;
  53. atomic_fetch_add_explicit(&loop->active_threads, 1u, memory_order_acq_rel);
  54. do {
  55. status = async_loop_run_once(loop, deadline);
  56. } while (status == ZX_OK && !once);
  57. atomic_fetch_sub_explicit(&loop->active_threads, 1u, memory_order_acq_rel);
  58. return status;
  59. }

创建 async_loop_t 对象

接下来,我们深入理解创建 async_loop_t 对象

  1. zx_status_t async_loop_create(const async_loop_config_t* config, async_loop_t** out_loop) {
  2. ZX_DEBUG_ASSERT(out_loop);
  3. ZX_DEBUG_ASSERT(config != NULL);
  4. // If a setter was given, a getter should have been, too.
  5. ZX_ASSERT((config->default_accessors.setter != NULL) ==
  6. (config->default_accessors.getter != NULL));
  7. async_loop_t* loop = calloc(1u, sizeof(async_loop_t));
  8. if (!loop)
  9. return ZX_ERR_NO_MEMORY;
  10. atomic_init(&loop->state, ASYNC_LOOP_RUNNABLE);
  11. atomic_init(&loop->active_threads, 0u);
  12. loop->dispatcher.ops = (const async_ops_t*)&async_loop_ops;
  13. loop->config = *config;
  14. mtx_init(&loop->lock, mtx_plain);
  15. list_initialize(&loop->wait_list);
  16. list_initialize(&loop->irq_list);
  17. list_initialize(&loop->task_list);
  18. list_initialize(&loop->due_list);
  19. list_initialize(&loop->thread_list);
  20. list_initialize(&loop->paged_vmo_list);
  21. zx_status_t status =
  22. zx_port_create(config->irq_support ? ZX_PORT_BIND_TO_INTERRUPT : 0, &loop->port);
  23. if (status == ZX_OK)
  24. status = zx_timer_create(ZX_TIMER_SLACK_LATE, ZX_CLOCK_MONOTONIC, &loop->timer);
  25. if (status == ZX_OK) {
  26. *out_loop = loop;
  27. if (loop->config.make_default_for_current_thread) {
  28. ZX_DEBUG_ASSERT(loop->config.default_accessors.getter() == NULL);
  29. loop->config.default_accessors.setter(&loop->dispatcher);
  30. }
  31. } else {
  32. // Adjust this flag so we don't trip an assert trying to clear a default dispatcher we never
  33. // installed.
  34. loop->config.make_default_for_current_thread = false;
  35. async_loop_destroy(loop);
  36. }
  37. return status;
  38. }

可以看到,我们先是为 async_loop_t 分配空间,接着初始化字段与监听对象、执行的需要异步任务、需要执行的超时的异步任务、中断请求等相关链表。接着创建监听端口,定时器等配置。最后一步返回创建结果。

事件监听循环与分发

Loop::Run 方法调用 async_loop_run 函数,这个函数关键部分在这里

  1. do {
  2. status = async_loop_run_once(loop, deadline);
  3. } while (status == ZX_OK && !once);

这个循环读取底层上报事件循环体,async_loop_run_once 每次阻塞地读取一个事件,接着解析事件包,分发给对应函数进行处理

  1. static zx_status_t async_loop_run_once(async_loop_t* loop, zx_time_t deadline) {
  2. async_loop_state_t state = atomic_load_explicit(&loop->state, memory_order_acquire);
  3. if (state == ASYNC_LOOP_SHUTDOWN)
  4. return ZX_ERR_BAD_STATE;
  5. if (state != ASYNC_LOOP_RUNNABLE)
  6. return ZX_ERR_CANCELED;
  7. zx_port_packet_t packet;
  8. zx_status_t status = zx_port_wait(loop->port, deadline, &packet);
  9. if (status != ZX_OK)
  10. return status;
  11. if (packet.key == KEY_CONTROL) {
  12. // Handle wake-up packets.
  13. if (packet.type == ZX_PKT_TYPE_USER)
  14. return ZX_OK;
  15. // Handle task timer expirations.
  16. if (packet.type == ZX_PKT_TYPE_SIGNAL_ONE && packet.signal.observed & ZX_TIMER_SIGNALED) {
  17. return async_loop_dispatch_tasks(loop);
  18. }
  19. } else {
  20. // Handle wait completion packets.
  21. if (packet.type == ZX_PKT_TYPE_SIGNAL_ONE) {
  22. async_wait_t* wait = (void*)(uintptr_t)packet.key;
  23. mtx_lock(&loop->lock);
  24. list_delete(wait_to_node(wait));
  25. mtx_unlock(&loop->lock);
  26. return async_loop_dispatch_wait(loop, wait, packet.status, &packet.signal);
  27. }
  28. // Handle queued user packets.
  29. if (packet.type == ZX_PKT_TYPE_USER) {
  30. async_receiver_t* receiver = (void*)(uintptr_t)packet.key;
  31. return async_loop_dispatch_packet(loop, receiver, packet.status, &packet.user);
  32. }
  33. // Handle guest bell trap packets.
  34. if (packet.type == ZX_PKT_TYPE_GUEST_BELL) {
  35. async_guest_bell_trap_t* trap = (void*)(uintptr_t)packet.key;
  36. return async_loop_dispatch_guest_bell_trap(loop, trap, packet.status, &packet.guest_bell);
  37. }
  38. // Handle interrupt packets.
  39. if (packet.type == ZX_PKT_TYPE_INTERRUPT) {
  40. async_irq_t* irq = (void*)(uintptr_t)packet.key;
  41. return async_loop_dispatch_irq(loop, irq, packet.status, &packet.interrupt);
  42. }
  43. // Handle pager packets.
  44. if (packet.type == ZX_PKT_TYPE_PAGE_REQUEST) {
  45. async_paged_vmo_t* paged_vmo = (void*)(uintptr_t)packet.key;
  46. return async_loop_dispatch_paged_vmo(loop, paged_vmo, packet.status, &packet.page_request);
  47. }
  48. }
  49. ZX_DEBUG_ASSERT(false);
  50. return ZX_ERR_INTERNAL;
  51. }

从 packet.type 我们可以判断出 async 库支持的事件类型有定时器任务、等待 wait、data packet 事件、bell trap 事件、中断请求事件、内存共享事件

对事件的添加与取消这里以定时器为例,详细函数实现在

static zx_status_t async_loop_post_task(async_dispatcher_t* async, async_task_t* task) — 添加定时器任务

static zx_status_t async_loop_cancel_task(async_dispatcher_t* async, async_task_t* task) — 取消定时器任务

这里不过多赘述

handler

以定时器事件为例,handler 为 async_loop_dispatch_tasks,这个 handler 是由调用者在添加事件监听时,构造 async_task_t 传入

  1. static void async_loop_dispatch_task(async_loop_t* loop, async_task_t* task, zx_status_t status) {
  2. // Invoke the handler. Note that it might destroy itself.
  3. async_loop_invoke_prologue(loop);
  4. task->handler((async_dispatcher_t*)loop, task, status);
  5. async_loop_invoke_epilogue(loop);
  6. }