前言:虽然Async hooks至此还是实验性API,但是他的确可以解决应用中的一些问题,比如日志和调用栈跟踪。本文从应用和原理方便介绍一下Node.js的Async hooks。

1 env中的AsyncHooks

在Node.js的env对象中有一个AsyncHooks对象,负责Node.js进程中async_hooks的管理。我们看一下定义。

1.1 类定义

  1. class AsyncHooks : public MemoryRetainer {
  2. public:
  3. enum Fields {
  4. // 五种钩子
  5. kInit,
  6. kBefore,
  7. kAfter,
  8. kDestroy,
  9. kPromiseResolve,
  10. // 钩子总数
  11. kTotals,
  12. // async_hooks开启的个数
  13. kCheck,
  14. // 记录栈的top指针
  15. kStackLength,
  16. // 数组大小
  17. kFieldsCount,
  18. };
  19. enum UidFields {
  20. kExecutionAsyncId,
  21. kTriggerAsyncId,
  22. // 当前async id的值
  23. kAsyncIdCounter,
  24. kDefaultTriggerAsyncId,
  25. kUidFieldsCount,
  26. };
  27. private:
  28. inline AsyncHooks();
  29. // 异步资源的类型
  30. std::array<v8::Eternal<v8::String>, AsyncWrap::PROVIDERS_LENGTH> providers_;
  31. // 栈
  32. AliasedFloat64Array async_ids_stack_;
  33. // 整形数组,每个元素值的意义和Fields对应
  34. AliasedUint32Array fields_;
  35. // 整形数组,每个元素值的意义和UidFields对应
  36. AliasedFloat64Array async_id_fields_;
  37. };

结构图如下
23-Async hooks - 图1
接下来看一下env的AsyncHooks对象提供了哪些API,这些API是上层的基础。

1.2 读API

我们看一下env对象中获取AsyncHooks对象对应字段的API。

  1. // 获取对应的字段
  2. inline AliasedUint32Array& AsyncHooks::fields() {
  3. return fields_;
  4. }
  5. inline AliasedFloat64Array& AsyncHooks::async_id_fields() {
  6. return async_id_fields_;
  7. }
  8. inline AliasedFloat64Array& AsyncHooks::async_ids_stack() {
  9. return async_ids_stack_;
  10. }
  11. // 获取资源类型
  12. inline v8::Local<v8::String> AsyncHooks::provider_string(int idx) {
  13. return providers_[idx].Get(env()->isolate());
  14. }
  15. // 新建资源的时候,获取新的async id
  16. inline double Environment::new_async_id() {
  17. async_hooks()->async_id_fields()[AsyncHooks::kAsyncIdCounter] += 1;
  18. return async_hooks()->async_id_fields()[AsyncHooks::kAsyncIdCounter];
  19. }
  20. // 获取当前async id
  21. inline double Environment::execution_async_id() {
  22. return async_hooks()->async_id_fields()[AsyncHooks::kExecutionAsyncId];
  23. }
  24. // 获取当前trigger async id
  25. inline double Environment::trigger_async_id() {
  26. return async_hooks()->async_id_fields()[AsyncHooks::kTriggerAsyncId];
  27. }
  28. // 获取默认的trigger async id,如果没有设置,则获取当前的async id
  29. inline double Environment::get_default_trigger_async_id() {
  30. double default_trigger_async_id = async_hooks()->async_id_fields()[AsyncHooks::kDefaultTriggerAsyncId];
  31. // If defaultTriggerAsyncId isn't set, use the executionAsyncId
  32. if (default_trigger_async_id < 0)
  33. default_trigger_async_id = execution_async_id();
  34. return default_trigger_async_id;
  35. }

1.3 写API

  1. inline void AsyncHooks::push_async_ids(double async_id,
  2. double trigger_async_id) {
  3. // 获取当前栈顶指针
  4. uint32_t offset = fields_[kStackLength];
  5. // 不够则扩容
  6. if (offset * 2 >= async_ids_stack_.Length())
  7. grow_async_ids_stack();
  8. // 把旧的上下文压栈
  9. async_ids_stack_[2 * offset] = async_id_fields_[kExecutionAsyncId];
  10. async_ids_stack_[2 * offset + 1] = async_id_fields_[kTriggerAsyncId];
  11. // 栈指针加一
  12. fields_[kStackLength] += 1;
  13. // 记录当前上下文
  14. async_id_fields_[kExecutionAsyncId] = async_id;
  15. async_id_fields_[kTriggerAsyncId] = trigger_async_id;
  16. }
  17. // 和上面的逻辑相反
  18. inline bool AsyncHooks::pop_async_id(double async_id) {
  19. if (fields_[kStackLength] == 0) return false;
  20. uint32_t offset = fields_[kStackLength] - 1;
  21. async_id_fields_[kExecutionAsyncId] = async_ids_stack_[2 * offset];
  22. async_id_fields_[kTriggerAsyncId] = async_ids_stack_[2 * offset + 1];
  23. fields_[kStackLength] = offset;
  24. return fields_[kStackLength] > 0;
  25. }

2 底层资源封装类 - AsyncWrap

接着看一下异步资源的基类AsyncWrap。所有依赖于C、C++层实现的资源(比如TCP、UDP)都会继承AsyncWrap。看看该类的定义。

  1. class AsyncWrap : public BaseObject {
  2. private:
  3. ProviderType provider_type_ = PROVIDER_NONE;
  4. double async_id_ = kInvalidAsyncId;
  5. double trigger_async_id_;
  6. };

我们看到每个AsyncWrap对象都有asyncid、triggerasync_id和providertype属性,这正是在init回调里拿到的数据。我们看看AsyncWrap的构造函数。接下来看一下新建一个资源(AsyncWrap)时的逻辑。

2.1 资源初始化

  1. AsyncWrap::AsyncWrap(Environment* env,
  2. Local<Object> object,
  3. ProviderType provider,
  4. double execution_async_id,
  5. bool silent)
  6. : AsyncWrap(env, object) {
  7. // 资源类型
  8. provider_type_ = provider;
  9. AsyncReset(execution_async_id, silent);
  10. }
  11. void AsyncWrap::AsyncReset(Local<Object> resource, double execution_async_id,
  12. bool silent) {
  13. // 获取一个新的async id,execution_async_id默认是kInvalidAsyncId
  14. async_id_ = execution_async_id == kInvalidAsyncId ? env()->new_async_id()
  15. : execution_async_id;
  16. // 获取trigger async id
  17. trigger_async_id_ = env()->get_default_trigger_async_id();
  18. // 执行init钩子
  19. EmitAsyncInit(env(), resource,
  20. env()->async_hooks()->provider_string(provider_type()),
  21. async_id_, trigger_async_id_);
  22. }

接着看EmitAsyncInit

  1. void AsyncWrap::EmitAsyncInit(Environment* env,
  2. Local<Object> object,
  3. Local<String> type,
  4. double async_id,
  5. double trigger_async_id) {
  6. AsyncHooks* async_hooks = env->async_hooks();
  7. HandleScope scope(env->isolate());
  8. Local<Function> init_fn = env->async_hooks_init_function();
  9. Local<Value> argv[] = {
  10. Number::New(env->isolate(), async_id),
  11. type,
  12. Number::New(env->isolate(), trigger_async_id),
  13. object,
  14. };
  15. TryCatchScope try_catch(env, TryCatchScope::CatchMode::kFatal);
  16. // 执行init回调
  17. USE(init_fn->Call(env->context(), object, arraysize(argv), argv));
  18. }

那么env->async_hooks_init_function()的值是什么呢?这是在Node.js初始化时设置的。

  1. const { nativeHooks } = require('internal/async_hooks');
  2. internalBinding('async_wrap').setupHooks(nativeHooks);

SetupHooks的实现如下

  1. static void SetupHooks(const FunctionCallbackInfo<Value>& args) {
  2. Environment* env = Environment::GetCurrent(args);
  3. Local<Object> fn_obj = args[0].As<Object>();
  4. #define SET_HOOK_FN(name) \
  5. do { \
  6. Local<Value> v = \
  7. fn_obj->Get(env->context(), \
  8. FIXED_ONE_BYTE_STRING(env->isolate(), #name)) \
  9. .ToLocalChecked(); \
  10. CHECK(v->IsFunction()); \
  11. env->set_async_hooks_##name##_function(v.As<Function>()); \
  12. } while (0)
  13. // 保存到env中
  14. SET_HOOK_FN(init);
  15. SET_HOOK_FN(before);
  16. SET_HOOK_FN(after);
  17. SET_HOOK_FN(destroy);
  18. SET_HOOK_FN(promise_resolve);
  19. #undef SET_HOOK_FN
  20. }

nativeHooks的实现如下

  1. nativeHooks: {
  2. init: emitInitNative,
  3. before: emitBeforeNative,
  4. after: emitAfterNative,
  5. destroy: emitDestroyNative,
  6. promise_resolve: emitPromiseResolveNative
  7. }

这些Hooks会执行对应的回调,比如emitInitNative

  1. function emitInitNative(asyncId, type, triggerAsyncId, resource) {
  2. for (var i = 0; i < active_hooks.array.length; i++) {
  3. if (typeof active_hooks.array[i][init_symbol] === 'function') {
  4. active_hooks.array[i][init_symbol](
  5. asyncId, type, triggerAsyncId,
  6. resource
  7. );
  8. }
  9. }
  10. }

active_hooks.array的值就是我们在业务代码里设置的钩子,每次调研createHooks的时候就对应数组的一个元素。

2.2 执行资源回调

当业务代码异步请求底层API,并且底层满足条件时,就会执行上层的回调,比如监听一个socket时,有连接到来。Node.js就会调用MakeCallback函数执行回调。

  1. MaybeLocal<Value> AsyncWrap::MakeCallback(const Local<Function> cb,
  2. int argc,
  3. Local<Value>* argv) {
  4. // 当前AsyncWrap对象对应的执行上下文
  5. ProviderType provider = provider_type();
  6. async_context context { get_async_id(), get_trigger_async_id() };
  7. MaybeLocal<Value> ret = InternalMakeCallback(env(), object(), cb, argc, argv, context);
  8. return ret;
  9. }

MakeCallback中会调用InternalMakeCallback。

  1. MaybeLocal<Value> InternalMakeCallback(Environment* env,
  2. Local<Object> recv,
  3. const Local<Function> callback,
  4. int argc,
  5. Local<Value> argv[],
  6. async_context asyncContext) {
  7. // 新建一个scope
  8. InternalCallbackScope scope(env, recv, asyncContext);
  9. // 执行回调
  10. callback->Call(env->context(), recv, argc, argv);
  11. // 关闭scope
  12. scope.Close();
  13. }

我们看看新建和关闭scope都做了什么事情。

  1. InternalCallbackScope::InternalCallbackScope(Environment* env,
  2. Local<Object> object,
  3. const async_context& asyncContext,
  4. int flags)
  5. : env_(env),
  6. async_context_(asyncContext),
  7. object_(object),
  8. skip_hooks_(flags & kSkipAsyncHooks),
  9. skip_task_queues_(flags & kSkipTaskQueues) {
  10. // v14版本中,是先触发before再push上下文,顺序是不对的,v16已经改过来。
  11. // 当前执行上下文入栈
  12. env->async_hooks()->push_async_ids(async_context_.async_id,
  13. async_context_.trigger_async_id);
  14. // 触发before钩子
  15. if (asyncContext.async_id != 0 && !skip_hooks_) {
  16. AsyncWrap::EmitBefore(env, asyncContext.async_id);
  17. }
  18. pushed_ids_ = true;
  19. }

在scope里会把当前AsyncWrap对象的执行上下文作为当前执行上下文,并且触发before钩子,然后执行业务回调,所以我们在回调里获取当前执行上下文时就拿到了AsyncWrap对应的值( 调用executionAsyncId),接着看Close

  1. void InternalCallbackScope::Close() {
  2. // 执行
  3. if (pushed_ids_)
  4. env_->async_hooks()->pop_async_id(async_context_.async_id);
  5. if (async_context_.async_id != 0 && !skip_hooks_) {
  6. AsyncWrap::EmitAfter(env_, async_context_.async_id);
  7. }
  8. }

Close在执行回调后被调用,主要是恢复当前执行上下文并且触发after钩子。

3 上层资源的封装 - Timeout、TickObjecd等

并不是所有的异步资源都是底层实现的,比如定时器,tick也被定义为异步资源,因为他们都是和回调相关。这种异步资源是在JS层实现的,这里只分析Timeout。

3.1 创建资源

我们看一下执行setTimeout时的核心逻辑。

  1. function setTimeout(callback, after, arg1, arg2, arg3) {
  2. const timeout = new Timeout(callback, after, args, false, true);
  3. return timeout;
  4. }
  5. function Timeout(callback, after, args, isRepeat, isRefed) {
  6. initAsyncResource(this, 'Timeout');
  7. }
  8. function initAsyncResource(resource, type) {
  9. // 获取新的async id
  10. const asyncId = resource[async_id_symbol] = newAsyncId();
  11. const triggerAsyncId = resource[trigger_async_id_symbol] = getDefaultTriggerAsyncId();
  12. // 是否设置了init钩子,是则触发回调
  13. if (initHooksExist())
  14. emitInit(asyncId, type, triggerAsyncId, resource);
  15. }

执行setTimeout时,Node.js会创建一个Timeout对象,设置async_hooks相关的上下文并记录到Timeout对象中。然后触发init钩子。

  1. function emitInitScript(asyncId, type, triggerAsyncId, resource) {
  2. emitInitNative(asyncId, type, triggerAsyncId, resource);
  3. }

以上代码会执行每个async_hooks对象的init回调(通常我们只有一个async_hooks对象)。

3.1 执行回调

当定时器到期时,会执行回调,我们看看相关的逻辑。

  1. // 触发before钩子
  2. emitBefore(asyncId, timer[trigger_async_id_symbol]);
  3. // 执行回调
  4. timer._onTimeout();
  5. // 触发after回调
  6. emitAfter(asyncId);

我们看到执行超时回调的前后会触发对应的钩子。

  1. function emitBeforeScript(asyncId, triggerAsyncId) {
  2. // 和底层的push_async_ids逻辑一样
  3. pushAsyncIds(asyncId, triggerAsyncId);
  4. // 如果有回调则执行
  5. if (async_hook_fields[kBefore] > 0)
  6. emitBeforeNative(asyncId);
  7. }
  8. function emitAfterScript(asyncId) {
  9. // 设置了after回调则emit
  10. if (async_hook_fields[kAfter] > 0)
  11. emitAfterNative(asyncId);
  12. // 和底层的pop_async_ids逻辑一样
  13. popAsyncIds(asyncId);
  14. }

JS层的实现和底层是保持一致的。如果我们在setTimeout回调里新建一个资源,比如再次执行setTimeout,这时候trigger async id就是第一个setTimeout对应的async id,所以就连起来了,后面我们会看到具体的例子。

4 DefaultTriggerAsyncIdScope

Node.js为了避免过多通过参数传递的方式传递async id,就设计了DefaultTriggerAsyncIdScope。DefaultTriggerAsyncIdScope的作用类似在多个函数外维护一个变量,多个函数都可以通过DefaultTriggerAsyncIdScope获得trigger async id,而不需要通过层层传递的方式,他的实现非常简单。

  1. class DefaultTriggerAsyncIdScope {
  2. private:
  3. AsyncHooks* async_hooks_;
  4. double old_default_trigger_async_id_;
  5. };
  6. inline AsyncHooks::DefaultTriggerAsyncIdScope ::DefaultTriggerAsyncIdScope(
  7. Environment* env, double default_trigger_async_id)
  8. : async_hooks_(env->async_hooks()) {
  9. // 记录旧的id,设置新的id
  10. old_default_trigger_async_id_ =
  11. async_hooks_->async_id_fields()[AsyncHooks::kDefaultTriggerAsyncId];
  12. async_hooks_->async_id_fields()[AsyncHooks::kDefaultTriggerAsyncId] =
  13. default_trigger_async_id;
  14. }
  15. // 恢复
  16. inline AsyncHooks::DefaultTriggerAsyncIdScope ::~DefaultTriggerAsyncIdScope() {
  17. async_hooks_->async_id_fields()[AsyncHooks::kDefaultTriggerAsyncId] =
  18. old_default_trigger_async_id_;
  19. }

DefaultTriggerAsyncIdScope主要是记录旧的id,然后把新的id设置到env中,当其他函数调用get_default_trigger_async_id时就可以获取设置的async id。同样JS层也实现了类似的API。

  1. function defaultTriggerAsyncIdScope(triggerAsyncId, block, ...args) {
  2. const oldDefaultTriggerAsyncId = async_id_fields[kDefaultTriggerAsyncId];
  3. async_id_fields[kDefaultTriggerAsyncId] = triggerAsyncId;
  4. try {
  5. return block(...args);
  6. } finally {
  7. async_id_fields[kDefaultTriggerAsyncId] = oldDefaultTriggerAsyncId;
  8. }
  9. }

在执行block函数时,可以获取到设置的值,而不需要传递,执行完block后恢复。我们看看如何使用。下面摘自net模块的代码。

  1. // 获取handle里的async id
  2. this[async_id_symbol] = getNewAsyncId(this._handle);
  3. defaultTriggerAsyncIdScope(this[async_id_symbol],
  4. process.nextTick,
  5. emitListeningNT,
  6. this);

我们看一下这里具体的情况。在defaultTriggerAsyncIdScope中会以emitListeningNT为入参执行process.nextTick。我们看看nextTick的实现。

  1. function nextTick(callback) {
  2. // 获取新的async id
  3. const asyncId = newAsyncId();
  4. // 获取默认的trigger async id,即刚才设置的
  5. const triggerAsyncId = getDefaultTriggerAsyncId();
  6. const tickObject = {
  7. [async_id_symbol]: asyncId,
  8. [trigger_async_id_symbol]: triggerAsyncId,
  9. callback,
  10. args
  11. };
  12. if (initHooksExist())
  13. // 创建了新的资源,触发init钩子
  14. emitInit(asyncId, 'TickObject', triggerAsyncId, tickObject);
  15. queue.push(tickObject);
  16. }

我们看到在nextTick中通过getDefaultTriggerAsyncId拿到了trigger async id。

  1. function getDefaultTriggerAsyncId() {
  2. const defaultTriggerAsyncId = async_id_fields[kDefaultTriggerAsyncId];
  3. if (defaultTriggerAsyncId < 0)
  4. return async_id_fields[kExecutionAsyncId];
  5. return defaultTriggerAsyncId;
  6. }

getDefaultTriggerAsyncId返回的就是刚才通过defaultTriggerAsyncIdScope设置的async id。所以在触发TickObject的init钩子时用户就可以拿到对应的id。不过更重要的时,在异步执行nextTick的任务时,还可以拿到原始的trigger async id。因为该id记录在tickObject中。我们看看执行tick任务时的逻辑。

  1. function processTicksAndRejections() {
  2. let tock;
  3. do {
  4. while (tock = queue.shift()) {
  5. // 拿到对应的async 上下文
  6. const asyncId = tock[async_id_symbol];
  7. emitBefore(asyncId, tock[trigger_async_id_symbol]);
  8. try {
  9. const callback = tock.callback;
  10. callback();
  11. } finally {
  12. if (destroyHooksExist())
  13. emitDestroy(asyncId);
  14. }
  15. emitAfter(asyncId);
  16. }
  17. } while (!queue.isEmpty() || processPromiseRejections());
  18. }

5 资源销毁

资源销毁的时候也会触发对应的钩子,不过不同的是这个钩子是异步触发的。无论是JS还是好C++层触发销毁钩子的时候,逻辑都是一致的。

  1. void AsyncWrap::EmitDestroy(Environment* env, double async_id) {
  2. // 之前为空则设置回调
  3. if (env->destroy_async_id_list()->empty()) {
  4. env->SetUnrefImmediate(&DestroyAsyncIdsCallback);
  5. }
  6. // async id入队
  7. env->destroy_async_id_list()->push_back(async_id);
  8. }
  9. template <typename Fn>
  10. void Environment::SetUnrefImmediate(Fn&& cb) {
  11. CreateImmediate(std::move(cb), false);
  12. }
  13. template <typename Fn>
  14. void Environment::CreateImmediate(Fn&& cb, bool ref) {
  15. auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
  16. std::move(cb), ref);
  17. // 加入任务队列
  18. native_immediates_.Push(std::move(callback));
  19. }

在事件循环的check阶段就会执行里面的任务,从而执行回调DestroyAsyncIdsCallback。

  1. void AsyncWrap::DestroyAsyncIdsCallback(Environment* env) {
  2. Local<Function> fn = env->async_hooks_destroy_function();
  3. do {
  4. std::vector<double> destroy_async_id_list;
  5. destroy_async_id_list.swap(*env->destroy_async_id_list());
  6. // 遍历销毁的async id
  7. for (auto async_id : destroy_async_id_list) {
  8. HandleScope scope(env->isolate());
  9. Local<Value> async_id_value = Number::New(env->isolate(), async_id);
  10. // 执行JS层回调
  11. MaybeLocal<Value> ret = fn->Call(env->context(), Undefined(env->isolate()), 1, &async_id_value);
  12. }
  13. } while (!env->destroy_async_id_list()->empty());
  14. }

6 Async hooks的使用

我们通常以以下方式使用Async hooks

  1. const async_hooks = require('async_hooks');
  2. async_hooks.createHook({
  3. init(asyncId, type, triggerAsyncId) {},
  4. before(asyncId) {},
  5. after(asyncId) {},
  6. destroy(asyncId) {},
  7. promiseResolve(asyncId),
  8. })
  9. .enable();

async_hooks是对资源生命周期的抽象,资源就是操作对象和回调的抽象。async_hooks定义了五个生命周期钩子,当资源的状态到达某个周期节点时,async_hooks就会触发对应的钩子。下面我们看一下具体的实现。我们首先看一下createHook。

  1. function createHook(fns) {
  2. return new AsyncHook(fns);
  3. }

createHook是对AsyncHook的封装

  1. class AsyncHook {
  2. constructor({ init, before, after, destroy, promiseResolve }) {
  3. // 记录回调
  4. this[init_symbol] = init;
  5. this[before_symbol] = before;
  6. this[after_symbol] = after;
  7. this[destroy_symbol] = destroy;
  8. this[promise_resolve_symbol] = promiseResolve;
  9. }
  10. }

AsyncHook的初始化很简单,创建一个AsyncHook对象记录回调函数。创建了AsyncHook之后,我们需要调用AsyncHook的enable函数手动开启。

  1. class AsyncHook {
  2. enable() {
  3. // 获取一个AsyncHook对象数组和一个整形数组
  4. const [hooks_array, hook_fields] = getHookArrays();
  5. // 执行过enable了则不需要再执行
  6. if (hooks_array.includes(this))
  7. return this;
  8. // 做些统计
  9. const prev_kTotals = hook_fields[kTotals];
  10. hook_fields[kTotals] = hook_fields[kInit] += +!!this[init_symbol];
  11. hook_fields[kTotals] += hook_fields[kBefore] += +!!this[before_symbol];
  12. hook_fields[kTotals] += hook_fields[kAfter] += +!!this[after_symbol];
  13. hook_fields[kTotals] += hook_fields[kDestroy] += +!!this[destroy_symbol];
  14. hook_fields[kTotals] +=
  15. hook_fields[kPromiseResolve] += +!!this[promise_resolve_symbol];
  16. // 当前对象插入数组中
  17. hooks_array.push(this);
  18. // 如果之前的数量是0,本次操作后大于0则开启底层的逻辑
  19. if (prev_kTotals === 0 && hook_fields[kTotals] > 0) {
  20. enableHooks();
  21. }
  22. return this;
  23. }
  24. }

1 hooks_array:是一个AsyncHook对象数组,主要用于记录用户创建了哪些AsyncHook对象,然后哪些AsyncHook对象里都设置了哪些钩子,在回调的时候就会遍历这个对象数组,执行里面的回调。
2 hook_fields:对应底层的async_hook_fields。
3 enableHooks:

  1. function enableHooks() {
  2. // 记录async_hooks的开启个数
  3. async_hook_fields[kCheck] += 1;
  4. }

至此,async_hooks的初始化就完成了,我们发现逻辑非常简单。下面我们看一下他是如何串起来的。下面我们以TCP模块为例。

  1. const { createHook, executionAsyncId } = require('async_hooks');
  2. const fs = require('fs');
  3. const net = require('net');
  4. createHook({
  5. init(asyncId, type, triggerAsyncId) {
  6. fs.writeSync(
  7. 1,
  8. `${type}(${asyncId}): trigger: ${triggerAsyncId} execution: ${executionAsyncId()}\n`);
  9. }
  10. }).enable();
  11. net.createServer((conn) => {}).listen(8080);

以上代码输出

  1. init: type: TCPSERVERWRAP asyncId: 2 trigger id: 1 executionAsyncId(): 1 triggerAsyncId(): 0
  2. init: type: TickObject asyncId: 3 trigger id: 2 executionAsyncId(): 1 triggerAsyncId(): 0
  3. before: asyncId: 3 executionAsyncId(): 3 triggerAsyncId(): 2
  4. after: asyncId: 3 executionAsyncId(): 3 triggerAsyncId(): 2

下面我们来分析具体过程。我们知道创建资源的时候会执行init回调,具体逻辑在listen函数中,在listen函数中,通过层层调用会执行new TCP新建一个对象,表示服务器。TCP是C++层导出的类,刚才我们说过,TCP会继承AsyncWrap,新建AsyncWrap对象的时候会触发init钩子,结构图如下。
23-Async hooks - 图2
对应输出

  1. init: type: TCPSERVERWRAP asyncId: 2 trigger id: 1 executionAsyncId(): 1 triggerAsyncId(): 0

那TickObject是怎么来的呢?我们接着看listen里的另一段逻辑。

  1. this[async_id_symbol] = getNewAsyncId(this._handle);
  2. defaultTriggerAsyncIdScope(this[async_id_symbol],
  3. process.nextTick,
  4. emitListeningNT,
  5. this);

上面的代码我们刚才已经分析过,在执行process.nextTick的时候会创建一个TickObject对象封装执行上下文和回调。

  1. const asyncId = newAsyncId();
  2. const triggerAsyncId = getDefaultTriggerAsyncId();
  3. const tickObject = {
  4. [async_id_symbol]: asyncId,
  5. [trigger_async_id_symbol]: triggerAsyncId,
  6. callback,
  7. args
  8. };
  9. emitInit(asyncId, 'TickObject', triggerAsyncId, tickObject);

这次再次触发了init钩子,结构如下(nextTick通过getDefaultTriggerAsyncId获取的id是defaultTriggerAsyncIdScope设置的id)。
23-Async hooks - 图3
对应输出

  1. init: type: TickObject asyncId: 3 trigger id: 2 executionAsyncId(): 1 triggerAsyncId(): 0

接着执行tick任务。

  1. const asyncId = tock[async_id_symbol];
  2. emitBefore(asyncId, tock[trigger_async_id_symbol]);
  3. try {
  4. tock.callback();
  5. } finally {
  6. if (destroyHooksExist())
  7. emitDestroy(asyncId);
  8. }
  9. emitAfter(asyncId);

emitBefore时,结构图如下。
23-Async hooks - 图4
对应输出

  1. before: asyncId: 3 executionAsyncId(): 3 triggerAsyncId(): 2
  2. after: asyncId: 3 executionAsyncId(): 3 triggerAsyncId(): 2

执行完我们的JS代码后,所有入栈的上下文都会被清空,结构图如下。
23-Async hooks - 图5
如果这时候有一个连接建立会输出什么呢?当有连接建立时,会执行C++层的OnConnection。
OnConnection会创建一个新的TCP对象表示和客户端通信的对象。

  1. MaybeLocal<Object> TCPWrap::Instantiate(Environment* env,
  2. AsyncWrap* parent,
  3. TCPWrap::SocketType type) {
  4. EscapableHandleScope handle_scope(env->isolate());
  5. AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(parent);
  6. return handle_scope.EscapeMaybe(
  7. constructor->NewInstance(env->context(), 1, &type_value));
  8. }

首先定义了一个AsyncHooks::DefaultTriggerAsyncIdScope。DefaultTriggerAsyncIdScope用于设置默认default_trigger_async_id为parent的async id(值是2),执行Instantiate时会执行析构函数恢复原来状态。接着NewInstance的时候就会新建一个TCPWrap对象,从而创建一个AsyncWrap对象。然后触发init钩子,结构图如下。
23-Async hooks - 图6
对应输出

  1. init: type: TCPWRAP asyncId: 4 trigger id: 2 executionAsyncId(): 0 triggerAsyncId(): 0

创建完对象后,通过AsyncWrap::MakeCallback回调JS层,刚才我们已经分析过AsyncWrap::MakeCallback会触发before和after钩子,触发before钩子时,结构图如下。
23-Async hooks - 图7
对应输出

  1. before: asyncId: 2 executionAsyncId(): 2 triggerAsyncId(): 1

同样,在回调函数里执行executionAsyncId和triggerAsyncId拿到的内容是一样的。触发after后再恢复上下文,所以输出也是一样的。

  1. after: asyncId: 2 executionAsyncId(): 2 triggerAsyncId(): 1

7 AsyncResource

异步资源并不是Node.js内置的,Node.js只是提供了一套机制,业务层也可以使用。Node.js也提供了一个类给业务使用,核心代码如下。

  1. class AsyncResource {
  2. constructor(type, opts = {}) {
  3. let triggerAsyncId = opts;
  4. let requireManualDestroy = false;
  5. if (typeof opts !== 'number') {
  6. triggerAsyncId = opts.triggerAsyncId === undefined ?
  7. getDefaultTriggerAsyncId() : opts.triggerAsyncId;
  8. requireManualDestroy = !!opts.requireManualDestroy;
  9. }
  10. const asyncId = newAsyncId();
  11. this[async_id_symbol] = asyncId;
  12. this[trigger_async_id_symbol] = triggerAsyncId;
  13. if (initHooksExist()) {
  14. emitInit(asyncId, type, triggerAsyncId, this);
  15. }
  16. }
  17. runInAsyncScope(fn, thisArg, ...args) {
  18. const asyncId = this[async_id_symbol];
  19. emitBefore(asyncId, this[trigger_async_id_symbol]);
  20. const ret = thisArg === undefined ?
  21. fn(...args) :
  22. ReflectApply(fn, thisArg, args);
  23. emitAfter(asyncId);
  24. return ret;
  25. }
  26. emitDestroy() {
  27. if (this[destroyedSymbol] !== undefined) {
  28. this[destroyedSymbol].destroyed = true;
  29. }
  30. emitDestroy(this[async_id_symbol]);
  31. return this;
  32. }
  33. asyncId() {
  34. return this[async_id_symbol];
  35. }
  36. triggerAsyncId() {
  37. return this[trigger_async_id_symbol];
  38. }
  39. }

使用方式如下。

  1. const { AsyncResource, executionAsyncId,triggerAsyncId } = require('async_hooks');
  2. const asyncResource = new AsyncResource('Demo');
  3. asyncResource.runInAsyncScope(() => {
  4. console.log(executionAsyncId(), triggerAsyncId())
  5. });

runInAsyncScope中会把asyncResource的执行上下文设置为当前执行上下文,async id是2,trigger async id是1,所以在回调里执行executionAsyncId输出的是2,triggerAsyncId输出的是1。

8 AsyncLocalStorage

AsyncLocalStorage是基于AsyncResource实现的一个维护异步逻辑中公共上下文的类。我们可以把他理解为Redis。我们看一下怎么使用。

8.1 使用

  1. const { AsyncLocalStorage } = require('async_hooks');
  2. const asyncLocalStorage = new AsyncLocalStorage();
  3. function logWithId(msg) {
  4. const id = asyncLocalStorage.getStore();
  5. console.log(`${id !== undefined ? id : '-'}:`, msg);
  6. }
  7. asyncLocalStorage.run(1, () => {
  8. logWithId('start');
  9. setImmediate(() => {
  10. logWithId('finish');
  11. });
  12. });

执行上面代码会输出

  1. 1: start
  2. 1: finish

run的时候初始化公共的上下文,然后在run里执行的异步代码也可以拿得到这个公共上下文,这个在记录日志traceId时就会很有用,否则我们就需要把traceId传遍代码每个需要的地方。下面我们看一下实现。

8.2 实现

我们先看一下创建AsyncLocalStorage的逻辑

  1. class AsyncLocalStorage {
  2. constructor() {
  3. this.kResourceStore = Symbol('kResourceStore');
  4. this.enabled = false;
  5. }
  6. }

创建AsyncLocalStorage的时候很简单,主要是置状态为false,并且设置kResourceStore的值为Symbol(‘kResourceStore’)。设置为Symbol(‘kResourceStore’)而不是‘kResourceStore‘很重要,我们后面会看到。继续看一下执行AsyncLocalStorage.run的逻辑。

  1. run(store, callback, ...args) {
  2. // 新建一个AsyncResource
  3. const resource = new AsyncResource('AsyncLocalStorage', defaultAlsResourceOpts);
  4. // 通过runInAsyncScope把resource的执行上下文设置完当前的执行上下文
  5. return resource.emitDestroy().runInAsyncScope(() => {
  6. this.enterWith(store);
  7. return ReflectApply(callback, null, args);
  8. });
  9. }

设置完上下文之后执行runInAsyncScope的回调,回调里首先执行里enterWith。

  1. enterWith(store) {
  2. // 修改AsyncLocalStorage状态
  3. this._enable();
  4. // 获得当前执行上下文对于多资源,也就是run里创建的resource
  5. const resource = executionAsyncResource();
  6. // 把公共上下文挂载到对象上
  7. resource[this.kResourceStore] = store;
  8. }
  9. _enable() {
  10. if (!this.enabled) {
  11. this.enabled = true;
  12. ArrayPrototypePush(storageList, this);
  13. storageHook.enable();
  14. }
  15. }

挂载完公共上下文后,就执行业务回调。回调里可以通过asyncLocalStorage.getStore()获得设置的公共上下文。

  1. getStore() {
  2. if(this.enabled) {
  3. const resource = executionAsyncResource();
  4. return resource[this.kResourceStore];
  5. }
  6. }

getStore的原理很简单,就是首先拿到当前执行上下文对应的资源,然后根据AsyncLocalStorage的kResourceStore的值从resource中拿到公共上下文。如果是同步执行getStore,那么executionAsyncResource返回的就是我们在run的时候创建的AsyncResource,但是如果是异步getStore那么怎么办呢?因为这时候executionAsyncResource返回的不再是我们创建的AsyncResource,也就拿不到他挂载的公共上下文。为了解决这个问题,Node.js对公共上下文进行了传递。

  1. const storageList = []; // AsyncLocalStorage对象数组
  2. const storageHook = createHook({
  3. init(asyncId, type, triggerAsyncId, resource) {
  4. const currentResource = executionAsyncResource();
  5. for (let i = 0; i < storageList.length; ++i) {
  6. storageList[i]._propagate(resource, currentResource);
  7. }
  8. }
  9. });
  10. _propagate(resource, triggerResource) {
  11. const store = triggerResource[this.kResourceStore];
  12. if (this.enabled) {
  13. resource[this.kResourceStore] = store;
  14. }
  15. }

我们看到Node.js内部创建了一个Hooks,在每次资源创建的时候,Node.js会把当前执行上下文对应的资源中的一个或多个key(根据storageList里对象的this.kResourceStore字段)对应的值挂载到新创建的资源中。所以在asyncLocalStorage.getStore()时即使不是我们在执行run时创建的资源对象,也可以获得具体asyncLocalStorage对象所设置的资源,我们再来看一个例子。

  1. const { AsyncLocalStorage } = require('async_hooks');
  2. const asyncLocalStorage = new AsyncLocalStorage();
  3. const asyncLocalStorage2 = new AsyncLocalStorage();
  4. function logWithId(msg) {
  5. console.log(asyncLocalStorage2.getStore());
  6. const id = asyncLocalStorage.getStore();
  7. console.log(`${id !== undefined ? id : '-'}:`, msg);
  8. }
  9. asyncLocalStorage.run(0, () => {
  10. asyncLocalStorage2.enterWith({hello: "world"});
  11. logWithId('start');
  12. setImmediate(() => {
  13. logWithId('finish');
  14. });
  15. });

除了通过asyncLocalStorage.run设置上下文,我们通过asyncLocalStorage2.enterWith也给对象上下文的资源对象挂载一个新属性,key是Symbol(‘kResourceStore’),值是{hello: “world”},然后在logWithId中输出asyncLocalStorage2.getStore()。从输出中可以看到成功从资源中获得挂载的所有上下文。

  1. { hello: 'world' }
  2. 0: start
  3. { hello: 'world' }
  4. 0: finish

我们也可以修改源码验证

  1. Immediate {
  2. _idleNext: null,
  3. _idlePrev: null,
  4. _onImmediate: [Function (anonymous)],
  5. _argv: undefined,
  6. _destroyed: true,
  7. [Symbol(refed)]: null,
  8. [Symbol(asyncId)]: 6,
  9. [Symbol(triggerId)]: 2,
  10. [Symbol(kResourceStore)]: 0,
  11. [Symbol(kResourceStore)]: { hello: 'world' }
  12. }

可以看到资源对象挂载里两个key为Symbol(kResourceStore)的属性。

9 初始化时的Async hooks

  1. const async_hooks = require('async_hooks');
  2. const eid = async_hooks.executionAsyncId();
  3. const tid = async_hooks.triggerAsyncId();
  4. console.log(eid, tid);

以上代码中,输出1和0。对应的API实现如下。

  1. // 获取当前的async id
  2. function executionAsyncId() {
  3. return async_id_fields[kExecutionAsyncId];
  4. }
  5. // 获取当前的trigger async id,即触发当前代码的async id
  6. function triggerAsyncId() {
  7. return async_id_fields[kTriggerAsyncId];
  8. }

那么asyncid_fields的初始化是什么呢?从env.h定义中可以看到async_id_fields(asyncid_fields是上层使用的名称,对应底层的async_id_fields)是AliasedFloat64Array类型。

  1. AliasedFloat64Array async_id_fields_;

AliasedFloat64Array是个类型别名。

  1. typedef AliasedBufferBase<double, v8::Float64Array> AliasedFloat64Array;

AliasedBufferBase的构造函数如下

  1. AliasedBufferBase(v8::Isolate* isolate, const size_t count)
  2. : isolate_(isolate), count_(count), byte_offset_(0) {
  3. const v8::HandleScope handle_scope(isolate_);
  4. const size_t size_in_bytes = MultiplyWithOverflowCheck(sizeof(NativeT), count);
  5. v8::Local<v8::ArrayBuffer> ab = v8::ArrayBuffer::New(isolate_, size_in_bytes);
  6. // ...
  7. }

底层是一个ArrayBuffer。

  1. Local<ArrayBuffer> v8::ArrayBuffer::New(Isolate* isolate, size_t byte_length) {
  2. i::Isolate* i_isolate = reinterpret_cast<i::Isolate*>(isolate);
  3. LOG_API(i_isolate, ArrayBuffer, New);
  4. ENTER_V8_NO_SCRIPT_NO_EXCEPTION(i_isolate);
  5. i::MaybeHandle<i::JSArrayBuffer> result =
  6. i_isolate->factory()->NewJSArrayBufferAndBackingStore(
  7. byte_length, i::InitializedFlag::kZeroInitialized);
  8. // ...
  9. }

ArrayBuffer::New在申请内存时传入了i::InitializedFlag::kZeroInitialized。从V8定义中可以看到会初始化内存的内容为0。

  1. // Whether the backing store memory is initialied to zero or not.
  2. enum class InitializedFlag : uint8_t { kUninitialized, kZeroInitialized };

回到例子中,为什么输出会是1和0而不是0和0呢?答案在Node.js启动时的这段代码。

  1. {
  2. InternalCallbackScope callback_scope(
  3. env.get(),
  4. Local<Object>(),
  5. // async id和trigger async id
  6. { 1, 0 },
  7. InternalCallbackScope::kAllowEmptyResource |
  8. InternalCallbackScope::kSkipAsyncHooks);
  9. // 执行我们的js
  10. LoadEnvironment(env.get());
  11. }

InternalCallbackScope刚才已经分析过,他会把1和0设置为当前的执行上下文。然后在LoadEnvironment里执行我的JS代码时获取到的值就是1和0。那么如果我们改成以下代码会输出什么呢?

  1. const async_hooks = require('async_hooks');
  2. Promise.resolve().then(() => {
  3. const eid = async_hooks.executionAsyncId();
  4. const tid = async_hooks.triggerAsyncId();
  5. console.log(eid, tid);
  6. })

以上代码会输出0和。因为执行完我们的JS代码后,InternalCallbackScope就被析构了,从而恢复为0和0。