Fuchsia 中的异步事件库 — async
事件是重要的概念,是指系统硬件或软件的状态出现任何重大改变,程序时刻都在触发和接收着各种事件:鼠标点击事件,键盘事件,socket 数据收发等等
我们常见的事件触发机制模型有两种:
一种是轮询机制模型,一种是事件驱动模型
在 Linux 下,事件驱动模型主要是由 poll、epoll 实现,Fuchsia 也提供了类似的事件驱动机制,其提供 async 库使用
Epoll 的使用
先来回忆一下 Linux 下 Epoll + socket 的典型 C/S 架构的具体使用
// 创建Epoll实例int epfd = epoll_crete(100);// 添加需要监听事件和监听对象epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &listen_event);while(1){// 阻塞中int active_cnt = epoll_wait(epfd, events, 1000 ,-1);// 对监听到的事件处理for(i=0; i< active_cnt; i++){// 新到连接if(events[i].data.fd == listen_fd){connfd = accept(listen_fd,(sockaddr *)&clientaddr, &clilen); ev.data.fd=connfd;ev.events=EPOLLIN|EPOLLET;epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);//将新的fd添加到epoll的监听队列中}else if( events[i].events & EPOLLIN ) // 接收到数据,读socket{n = read(sockfd, line, MAXLINE)) < 0;ev.data.ptr = md; // 添加数据ev.events=EPOLLOUT|EPOLLET;epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓}else if(events[i].events & EPOLLOUT) // 有数据待发送,写socket{struct myepoll_data* md = (myepoll_data*)events[i].data.ptr; //取数据sockfd = md->fd;send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); //发送数据ev.data.fd=sockfd;ev.events=EPOLLIN|EPOLLET;epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);}else{//其他的处理}}}
实例 — echo
在 fuchsia/garnet/examples/fidl/echo_server_cpp 中我们看到 echo_server.cc 源码和编译配置文件
#include "echo_server_app.h"#include <lib/async-loop/cpp/loop.h>#include <lib/async-loop/default.h>#include <string>int main(int argc, const char** argv) {async::Loop loop(&kAsyncLoopConfigAttachToCurrentThread);bool quiet = (argc >= 2) && std::string("-q") == argv[1];echo::EchoServerApp app(quiet);loop.Run();return 0;}executable("bin") {output_name = "echo_server_cpp"sources = [ "echo_server.cc" ]deps = [":lib","//zircon/system/ulib/async-default","//zircon/system/ulib/async-loop:async-loop-cpp","//zircon/system/ulib/async-loop:async-loop-default",]}
编译配置文件中引入 async 异步事件库作为 component 依赖,在主程序头文件引入相关声明
在 echo_server_cpp.cc 中先创建 async 对象,配置好相关信息后调用 loop.Run(); 进入死循环一直等待事件的发生(除非使用 loop.quit 主动退出监听循环)
相对于 epoll,async 库的 loop 使用屏蔽了许多细节,接下来我们深入源码层级,来看看 async 库中 loop 做了什么。
Dive into ansyc source code
async-loop-cpp 和 async-loop-defaut 库均来自 fuchsia/zircon/system/ulib/async-loop 的编译
// 核心数据结构 -- async_loop_t对象定义typedef struct async_loop {async_dispatcher_t dispatcher; // must be first (the loop inherits from async_dispatcher_t)async_loop_config_t config; // immutablezx_handle_t port; // immutablezx_handle_t timer; // immutable_Atomic async_loop_state_t state;atomic_uint active_threads; // number of active dispatch threadsmtx_t lock; // guards the lists and the dispatching tasks flagbool dispatching_tasks; // true while the loop is busy dispatching taskslist_node_t wait_list; // most recently added firstlist_node_t task_list; // pending tasks, earliest deadline firstlist_node_t due_list; // due tasks, earliest deadline firstlist_node_t thread_list; // earliest created thread firstlist_node_t irq_list; // list of IRQslist_node_t paged_vmo_list; // most recently added firstbool timer_armed; // true if timer has been set and has not fired yet} async_loop_t;// loop对象class Loop {public:explicit Loop(const async_loop_config_t* config);Loop(const Loop&) = delete;Loop(Loop&&) = delete;Loop& operator=(const Loop&) = delete;Loop& operator=(Loop&&) = delete;~Loop();async_loop_t* loop() const { return loop_; }async_dispatcher_t* dispatcher() const { return async_loop_get_dispatcher(loop_); }void Shutdown();zx_status_t Run(zx::time deadline = zx::time::infinite(), bool once = false);zx_status_t RunUntilIdle();void Quit();zx_status_t ResetQuit();async_loop_state_t GetState() const;zx_status_t StartThread(const char* name = nullptr, thrd_t* out_thread = nullptr);void JoinThreads();private:async_loop_t* loop_;};// 在loop的构造函数里 使用async_loop_create创建zx_status_t实例Loop::Loop(const async_loop_config_t* config) {zx_status_t status = async_loop_create(config, &loop_);ZX_ASSERT_MSG(status == ZX_OK, "status=%d", status);}// Run方法调用async_loop_runzx_status_t Loop::Run(zx::time deadline, bool once) {return async_loop_run(loop_, deadline.get(), once);}zx_status_t async_loop_run(async_loop_t* loop, zx_time_t deadline, bool once) {ZX_DEBUG_ASSERT(loop);zx_status_t status;atomic_fetch_add_explicit(&loop->active_threads, 1u, memory_order_acq_rel);do {status = async_loop_run_once(loop, deadline);} while (status == ZX_OK && !once);atomic_fetch_sub_explicit(&loop->active_threads, 1u, memory_order_acq_rel);return status;}
创建 async_loop_t 对象
接下来,我们深入理解创建 async_loop_t 对象
zx_status_t async_loop_create(const async_loop_config_t* config, async_loop_t** out_loop) {ZX_DEBUG_ASSERT(out_loop);ZX_DEBUG_ASSERT(config != NULL);// If a setter was given, a getter should have been, too.ZX_ASSERT((config->default_accessors.setter != NULL) ==(config->default_accessors.getter != NULL));async_loop_t* loop = calloc(1u, sizeof(async_loop_t));if (!loop)return ZX_ERR_NO_MEMORY;atomic_init(&loop->state, ASYNC_LOOP_RUNNABLE);atomic_init(&loop->active_threads, 0u);loop->dispatcher.ops = (const async_ops_t*)&async_loop_ops;loop->config = *config;mtx_init(&loop->lock, mtx_plain);list_initialize(&loop->wait_list);list_initialize(&loop->irq_list);list_initialize(&loop->task_list);list_initialize(&loop->due_list);list_initialize(&loop->thread_list);list_initialize(&loop->paged_vmo_list);zx_status_t status =zx_port_create(config->irq_support ? ZX_PORT_BIND_TO_INTERRUPT : 0, &loop->port);if (status == ZX_OK)status = zx_timer_create(ZX_TIMER_SLACK_LATE, ZX_CLOCK_MONOTONIC, &loop->timer);if (status == ZX_OK) {*out_loop = loop;if (loop->config.make_default_for_current_thread) {ZX_DEBUG_ASSERT(loop->config.default_accessors.getter() == NULL);loop->config.default_accessors.setter(&loop->dispatcher);}} else {// Adjust this flag so we don't trip an assert trying to clear a default dispatcher we never// installed.loop->config.make_default_for_current_thread = false;async_loop_destroy(loop);}return status;}
可以看到,我们先是为 async_loop_t 分配空间,接着初始化字段与监听对象、执行的需要异步任务、需要执行的超时的异步任务、中断请求等相关链表。接着创建监听端口,定时器等配置。最后一步返回创建结果。
事件监听循环与分发
Loop::Run 方法调用 async_loop_run 函数,这个函数关键部分在这里
do {status = async_loop_run_once(loop, deadline);} while (status == ZX_OK && !once);
这个循环读取底层上报事件循环体,async_loop_run_once 每次阻塞地读取一个事件,接着解析事件包,分发给对应函数进行处理
static zx_status_t async_loop_run_once(async_loop_t* loop, zx_time_t deadline) {async_loop_state_t state = atomic_load_explicit(&loop->state, memory_order_acquire);if (state == ASYNC_LOOP_SHUTDOWN)return ZX_ERR_BAD_STATE;if (state != ASYNC_LOOP_RUNNABLE)return ZX_ERR_CANCELED;zx_port_packet_t packet;zx_status_t status = zx_port_wait(loop->port, deadline, &packet);if (status != ZX_OK)return status;if (packet.key == KEY_CONTROL) {// Handle wake-up packets.if (packet.type == ZX_PKT_TYPE_USER)return ZX_OK;// Handle task timer expirations.if (packet.type == ZX_PKT_TYPE_SIGNAL_ONE && packet.signal.observed & ZX_TIMER_SIGNALED) {return async_loop_dispatch_tasks(loop);}} else {// Handle wait completion packets.if (packet.type == ZX_PKT_TYPE_SIGNAL_ONE) {async_wait_t* wait = (void*)(uintptr_t)packet.key;mtx_lock(&loop->lock);list_delete(wait_to_node(wait));mtx_unlock(&loop->lock);return async_loop_dispatch_wait(loop, wait, packet.status, &packet.signal);}// Handle queued user packets.if (packet.type == ZX_PKT_TYPE_USER) {async_receiver_t* receiver = (void*)(uintptr_t)packet.key;return async_loop_dispatch_packet(loop, receiver, packet.status, &packet.user);}// Handle guest bell trap packets.if (packet.type == ZX_PKT_TYPE_GUEST_BELL) {async_guest_bell_trap_t* trap = (void*)(uintptr_t)packet.key;return async_loop_dispatch_guest_bell_trap(loop, trap, packet.status, &packet.guest_bell);}// Handle interrupt packets.if (packet.type == ZX_PKT_TYPE_INTERRUPT) {async_irq_t* irq = (void*)(uintptr_t)packet.key;return async_loop_dispatch_irq(loop, irq, packet.status, &packet.interrupt);}// Handle pager packets.if (packet.type == ZX_PKT_TYPE_PAGE_REQUEST) {async_paged_vmo_t* paged_vmo = (void*)(uintptr_t)packet.key;return async_loop_dispatch_paged_vmo(loop, paged_vmo, packet.status, &packet.page_request);}}ZX_DEBUG_ASSERT(false);return ZX_ERR_INTERNAL;}
从 packet.type 我们可以判断出 async 库支持的事件类型有定时器任务、等待 wait、data packet 事件、bell trap 事件、中断请求事件、内存共享事件
对事件的添加与取消这里以定时器为例,详细函数实现在
static zx_status_t async_loop_post_task(async_dispatcher_t* async, async_task_t* task) — 添加定时器任务
static zx_status_t async_loop_cancel_task(async_dispatcher_t* async, async_task_t* task) — 取消定时器任务
这里不过多赘述
handler
以定时器事件为例,handler 为 async_loop_dispatch_tasks,这个 handler 是由调用者在添加事件监听时,构造 async_task_t 传入
static void async_loop_dispatch_task(async_loop_t* loop, async_task_t* task, zx_status_t status) {// Invoke the handler. Note that it might destroy itself.async_loop_invoke_prologue(loop);task->handler((async_dispatcher_t*)loop, task, status);async_loop_invoke_epilogue(loop);}
