events模块是Node.js中比较简单但是却非常核心的模块,Node.js中,很多模块都继承于events模块,events模块是发布、订阅模式的实现。我们首先看一个如果使用events模块。

  1. const { EventEmitter } = require('events');
  2. class Events extends EventEmitter {}
  3. const events = new Events();
  4. events.on('demo', () => {
  5. console.log('emit demo event');
  6. });
  7. events.emit('demo');

接下来我们看一下events模块的具体实现。

22.1 初始化

当new一个EventEmitter或者他的子类时,就会进入EventEmitter的逻辑。

  1. function EventEmitter(opts) {
  2. EventEmitter.init.call(this, opts);
  3. }
  4. EventEmitter.init = function(opts) {
  5. // 如果是未初始化或者没有自定义_events,则初始化
  6. if (this._events === undefined ||
  7. this._events === ObjectGetPrototypeOf(this)._events) {
  8. this._events = ObjectCreate(null);
  9. this._eventsCount = 0;
  10. }
  11. // 初始化处理函数个数的阈值
  12. this._maxListeners = this._maxListeners || undefined;
  13. // 是否开启捕获promise reject,默认false
  14. if (opts && opts.captureRejections) {
  15. this[kCapture] = Boolean(opts.captureRejections);
  16. } else {
  17. this[kCapture] = EventEmitter.prototype[kCapture];
  18. }
  19. };

EventEmitter的初始化主要是初始化了一些数据结构和属性。唯一支持的一个参数就是captureRejections,captureRejections表示当触发事件,执行处理函数时,EventEmitter是否捕获处理函数中的异常。后面我们会详细讲解。

22.2 订阅事件

初始化完EventEmitter之后,我们就可以开始使用订阅、发布的功能。我们可以通过addListener、prependListener、on、once订阅事件。addListener和on是等价的,prependListener的区别在于处理函数会被插入到队首,而默认是追加到队尾。once注册的处理函数,最多被执行一次。四个API都是通过_addListener函数实现的。下面我们看一下具体实现。

  1. function _addListener(target, type, listener, prepend) {
  2. let m;
  3. let events;
  4. let existing;
  5. events = target._events;
  6. // 还没有初始化_events则初始化
  7. if (events === undefined) {
  8. events = target._events = ObjectCreate(null);
  9. target._eventsCount = 0;
  10. } else {
  11. /*
  12. 是否定义了newListener事件,是的话先触发,如果监听了newListener事件,
  13. 每次注册其他事件时都会触发newListener,相当于钩子
  14. */
  15. if (events.newListener !== undefined) {
  16. target.emit('newListener', type,
  17. listener.listener ? listener.listener : listener);
  18. // 可能会修改_events,这里重新赋值
  19. events = target._events;
  20. }
  21. // 判断是否已经存在处理函数
  22. existing = events[type];
  23. }
  24. // 不存在则以函数的形式存储,否则是数组
  25. if (existing === undefined) {
  26. events[type] = listener;
  27. ++target._eventsCount;
  28. } else {
  29. if (typeof existing === 'function') {
  30. existing = events[type] =
  31. prepend ? [listener, existing] : [existing, listener];
  32. } else if (prepend) {
  33. existing.unshift(listener);
  34. } else {
  35. existing.push(listener);
  36. }
  37. // 处理告警,处理函数过多可能是因为之前的没有删除,造成内存泄漏
  38. m = _getMaxListeners(target);
  39. if (m > 0 && existing.length > m && !existing.warned) {
  40. existing.warned = true;
  41. const w = new Error('Possible EventEmitter memory leak detected. ' +
  42. `${existing.length} ${String(type)} listeners ` +
  43. `added to ${inspect(target, { depth: -1 })}. Use ` +
  44. 'emitter.setMaxListeners() to increase limit');
  45. w.name = 'MaxListenersExceededWarning';
  46. w.emitter = target;
  47. w.type = type;
  48. w.count = existing.length;
  49. process.emitWarning(w);
  50. }
  51. }
  52. return target;
  53. }

接下来我们看一下once的实现,对比其他几种api,once的实现相对比较难,因为我们要控制处理函数最多执行一次,所以我们需要坚持用户定义的函数,保证在事件触发的时候,执行用户定义函数的同时,还需要删除注册的事件。

  1. EventEmitter.prototype.once = function once(type, listener) {
  2. this.on(type, _onceWrap(this, type, listener));
  3. return this;
  4. };
  5. function onceWrapper() {
  6. // 还没有触发过
  7. if (!this.fired) {
  8. // 删除他
  9. this.target.removeListener(this.type, this.wrapFn);
  10. // 触发了
  11. this.fired = true;
  12. // 执行
  13. if (arguments.length === 0)
  14. return this.listener.call(this.target);
  15. return this.listener.apply(this.target, arguments);
  16. }
  17. }
  18. // 支持once api
  19. function _onceWrap(target, type, listener) {
  20. // fired是否已执行处理函数,wrapFn包裹listener的函数
  21. const state = { fired: false, wrapFn: undefined, target, type, listener };
  22. // 生成一个包裹listener的函数
  23. const wrapped = onceWrapper.bind(state);
  24. // 把原函数listener也挂到包裹函数中,用于事件没有触发前,用户主动删除,见removeListener
  25. wrapped.listener = listener;
  26. // 保存包裹函数,用于执行完后删除,见onceWrapper
  27. state.wrapFn = wrapped;
  28. return wrapped;
  29. }

22.3 触发事件

分析完事件的订阅,接着我们看一下事件的触发。

  1. EventEmitter.prototype.emit = function emit(type, ...args) {
  2. // 触发的事件是否是error,error事件需要特殊处理
  3. let doError = (type === 'error');
  4. const events = this._events;
  5. // 定义了处理函数(不一定是type事件的处理函数)
  6. if (events !== undefined) {
  7. // 如果触发的事件是error,并且监听了kErrorMonitor事件则触发kErrorMonitor事件
  8. if (doError && events[kErrorMonitor] !== undefined)
  9. this.emit(kErrorMonitor, ...args);
  10. // 触发的是error事件但是没有定义处理函数
  11. doError = (doError && events.error === undefined);
  12. } else if (!doError) // 没有定义处理函数并且触发的不是error事件则不需要处理,
  13. return false;
  14. // If there is no 'error' event listener then throw.
  15. // 触发的是error事件,但是没有定义处理error事件的函数,则报错
  16. if (doError) {
  17. let er;
  18. if (args.length > 0)
  19. er = args[0];
  20. // 第一个入参是Error的实例
  21. if (er instanceof Error) {
  22. try {
  23. const capture = {};
  24. /*
  25. 给capture对象注入stack属性,stack的值是执行Error.captureStackTrace
  26. 语句的当前栈信息,但是不包括emit的部分
  27. */
  28. Error.captureStackTrace(capture, EventEmitter.prototype.emit);
  29. ObjectDefineProperty(er, kEnhanceStackBeforeInspector, {
  30. value: enhanceStackTrace.bind(this, er, capture),
  31. configurable: true
  32. });
  33. } catch {}
  34. throw er; // Unhandled 'error' event
  35. }
  36. let stringifiedEr;
  37. const { inspect } = require('internal/util/inspect');
  38. try {
  39. stringifiedEr = inspect(er);
  40. } catch {
  41. stringifiedEr = er;
  42. }
  43. const err = new ERR_UNHANDLED_ERROR(stringifiedEr);
  44. err.context = er;
  45. throw err; // Unhandled 'error' event
  46. }
  47. // 获取type事件对应的处理函数
  48. const handler = events[type];
  49. // 没有则不处理
  50. if (handler === undefined)
  51. return false;
  52. // 等于函数说明只有一个
  53. if (typeof handler === 'function') {
  54. // 直接执行
  55. const result = ReflectApply(handler, this, args);
  56. // 非空判断是不是promise并且是否需要处理,见addCatch
  57. if (result !== undefined && result !== null) {
  58. addCatch(this, result, type, args);
  59. }
  60. } else {
  61. // 多个处理函数,同上
  62. const len = handler.length;
  63. const listeners = arrayClone(handler, len);
  64. for (let i = 0; i < len; ++i) {
  65. const result = ReflectApply(listeners[i], this, args);
  66. if (result !== undefined && result !== null) {
  67. addCatch(this, result, type, args);
  68. }
  69. }
  70. }
  71. return true;
  72. }

我们看到在Node.js中,对于error事件是特殊处理的,如果用户没有注册error事件的处理函数,可能会导致程序挂掉,另外我们看到有一个addCatch的逻辑,addCatch是为了支持事件处理函数为异步模式的情况,比如async函数或者返回Promise的函数。

  1. function addCatch(that, promise, type, args) {
  2. // 没有开启捕获则不需要处理
  3. if (!that[kCapture]) {
  4. return;
  5. }
  6. // that throws on second use.
  7. try {
  8. const then = promise.then;
  9. if (typeof then === 'function') {
  10. // 注册reject的处理函数
  11. then.call(promise, undefined, function(err) {
  12. process.nextTick(emitUnhandledRejectionOrErr, that, err, type, args);
  13. });
  14. }
  15. } catch (err) {
  16. that.emit('error', err);
  17. }
  18. }
  19. function emitUnhandledRejectionOrErr(ee, err, type, args) {
  20. // 用户实现了kRejection则执行
  21. if (typeof ee[kRejection] === 'function') {
  22. ee[kRejection](err, type, ...args);
  23. } else {
  24. // 保存当前值
  25. const prev = ee[kCapture];
  26. try {
  27. /*
  28. 关闭然后触发error事件,意义
  29. 1 防止error事件处理函数也抛出error,导致死循环
  30. 2 如果用户处理了error,则进程不会退出,所以需要恢复kCapture的值
  31. 如果用户没有处理error,则nodejs会触发uncaughtException,如果用户
  32. 处理了uncaughtException则需要灰度kCapture的值
  33. */
  34. ee[kCapture] = false;
  35. ee.emit('error', err);
  36. } finally {
  37. ee[kCapture] = prev;
  38. }
  39. }
  40. }

22.4 取消订阅

我们接着看一下删除事件处理函数的逻辑。

  1. function removeAllListeners(type) {
  2. const events = this._events;
  3. if (events === undefined)
  4. return this;
  5. // 没有注册removeListener事件,则只需要删除数据,否则还需要触发removeListener事件
  6. if (events.removeListener === undefined) {
  7. // 等于0说明是删除全部
  8. if (arguments.length === 0) {
  9. this._events = ObjectCreate(null);
  10. this._eventsCount = 0;
  11. } else if (events[type] !== undefined) { // 否则是删除某个类型的事件,
  12. // 是唯一一个处理函数,则重置_events,否则删除对应的事件类型
  13. if (--this._eventsCount === 0)
  14. this._events = ObjectCreate(null);
  15. else
  16. delete events[type];
  17. }
  18. return this;
  19. }
  20. // 说明注册了removeListener事件,arguments.length === 0说明删除所有类型的事件
  21. if (arguments.length === 0) {
  22. // 逐个删除,除了removeListener事件,这里删除了非removeListener事件
  23. for (const key of ObjectKeys(events)) {
  24. if (key === 'removeListener') continue;
  25. this.removeAllListeners(key);
  26. }
  27. // 这里删除removeListener事件,见下面的逻辑
  28. this.removeAllListeners('removeListener');
  29. // 重置数据结构
  30. this._events = ObjectCreate(null);
  31. this._eventsCount = 0;
  32. return this;
  33. }
  34. // 删除某类型事件
  35. const listeners = events[type];
  36. if (typeof listeners === 'function') {
  37. this.removeListener(type, listeners);
  38. } else if (listeners !== undefined) {
  39. // LIFO order
  40. for (let i = listeners.length - 1; i >= 0; i--) {
  41. this.removeListener(type, listeners[i]);
  42. }
  43. }
  44. return this;
  45. }

removeAllListeners函数主要的逻辑有两点,第一个是removeListener事件需要特殊处理,这类似一个钩子,每次用户删除事件处理函数的时候都会触发该事件。第二是removeListener函数。removeListener是真正删除事件处理函数的实现。removeAllListeners是封装了removeListener的逻辑。

  1. function removeListener(type, listener) {
  2. let originalListener;
  3. const events = this._events;
  4. // 没有东西可删除
  5. if (events === undefined)
  6. return this;
  7. const list = events[type];
  8. // 同上
  9. if (list === undefined)
  10. return this;
  11. // list是函数说明只有一个处理函数,否则是数组,如果list.listener === listener说明是once注册的
  12. if (list === listener || list.listener === listener) {
  13. // type类型的处理函数就一个,并且也没有注册其他类型的事件,则初始化_events
  14. if (--this._eventsCount === 0)
  15. this._events = ObjectCreate(null);
  16. else {
  17. // 就一个执行完删除type对应的属性
  18. delete events[type];
  19. // 注册了removeListener事件,则先注册removeListener事件
  20. if (events.removeListener)
  21. this.emit('removeListener', type, list.listener || listener);
  22. }
  23. } else if (typeof list !== 'function') {
  24. // 多个处理函数
  25. let position = -1;
  26. // 找出需要删除的函数
  27. for (let i = list.length - 1; i >= 0; i--) {
  28. if (list[i] === listener || list[i].listener === listener) {
  29. // 保存原处理函数,如果有的话
  30. originalListener = list[i].listener;
  31. position = i;
  32. break;
  33. }
  34. }
  35. if (position < 0)
  36. return this;
  37. // 第一个则出队,否则删除一个
  38. if (position === 0)
  39. list.shift();
  40. else {
  41. if (spliceOne === undefined)
  42. spliceOne = require('internal/util').spliceOne;
  43. spliceOne(list, position);
  44. }
  45. // 如果只剩下一个,则值改成函数类型
  46. if (list.length === 1)
  47. events[type] = list[0];
  48. // 触发removeListener
  49. if (events.removeListener !== undefined)
  50. this.emit('removeListener', type, originalListener || listener);
  51. }
  52. return this;
  53. };

以上就是events模块的核心逻辑,另外还有一些工具函数就不一一分析。