Callback (回调)模式
JS 适合与回调搭配的原因:
- 函数在 JS 中属于头等对象(first-class-object),可以把它赋给某个变量或传给某个参数,也可以从函数中返回一个函数,或是将其保存到某种数据结构中
- 闭包(closure):通过闭包可以引用某个函数在刚刚创建时候所处的那套环境,即可以把程序请求执行异步操作时所处的情景(context,上下文)保留起来,无论系统在什么时间和场合触发回调,都能得知程序当初发起这项异步操作时的情景。
continuation-passing pattern (连续传递模式)
在 JS 中回调就是传入作为参数传入另外一个函数中的函数,并且在操作完成后调用。在函数式编程中,这种传递结果的方式被称为 continuation-passing style(CPS)。这是个一般概念,并不是针对异步操作。实际上,它只是通过将结果作为参数传递给另外一个函数(回调函数)来传递结果,然后在主体逻辑中调用回调函数拿到操作结果,而不是直接将其返回给调用者。
synchronous continuation-passing style(同步连续传递风格)
在回调函数执行完毕才返回值
function add(a, b, callback) {
callback(a + b)
}
asynchronous continuation-passing style(异步连续传递风格)
function additionAsync(a, b, callback) {
setTimeout(() => callback(a + b), 100)
}
console.log('before');
additionAsync(1, 2, result => console.log(`result: ${result}`));
console.log('after');
setTimeout() 触发一个异步操作,不需要等待回调函数执行完就会返回 additionAsync() 的控制权,然后再回到执行 additionAsync 的调用者。
当一个异步的请求发出后会立即回到事件循环中,因而允许队列中新的事件被处理。
Synchronous or asynchronous?(同步还是异步)
代码的执行顺序会因同步或异步的执行方式产生根本性的改变。
总的来说,API 在同步或异步上,必须一致,要么把整个函数设计成纯碎的同步函数,要么全采用异步方式来执行,而不能混用这两种范式,那样会导致许多难以排查而且难以重现的问题。
An unpredictable function(一个不可预测的函数)
最危险的情况之一就是使一个 API 在某种特定情况下是同步执行的但是在另一种情况却是异步执行的:
import { readFile } from "fs";
const cache = new Map();
function inconsistentRead(filename, cb) {
if (cache.has(filename)) {
// 同步执行
cb(cache.get(filename));
} else {
// 异步函数
readFile(filename, 'utf8', (err, data) => {
cache.set(filename, data);
cb(data);
})
}
}
function createFileReader(filename) {
const listeners = []
inconsistentRead(filename, value => {
listeners.forEach(listener => listener(value))
})
return {
onDataReady: listener => listeners.push(listener)
}
}
const reader1 = createFileReader('data.txt')
reader1.onDataReady(data => {
console.log('First call data: ' + data)
// 之后再次通过fs读取同一个文件
const reader2 = createFileReader('data.txt')
reader2.onDataReady(data => {
console.log('Second call data: ' + data)
})
})
结果:
First call data: some data
- 创建 reader1 的时候所触发的这次 inconsistentRead 调用,会以异步方式执行,因为此时缓存中没有内容,该函数需要以异步的方式读取文件内容,并执行回调。这意味着,如果执行回调的时候,用户已经通过 onDataReady 向 listeners 里面注册了监听器,那么这些监听器就会在事件循环的某个周期里面触发。即用户在这种情况下,有足够的事件在系统通知 listeners 之前,先把监听器注册进去
- 创建 reader2 时触发的 inconsistentRead 因为存在缓存,所以以同步方式执行,这时会直接触发回调,这意味着它只能通知此时已经位于 listeners 中的那些监听器。而用户是在拿到 reader2 之后,才通过 onDataReady 注册监听器的,所以没有机会赶在 inconsistentRead 触发这些监听器之前就将其注册好。
使用同步 API
```typescript import { readFileSync } from “fs”;
const cache = new Map();
function consistentReadSync(filename) { if (cache.has(filename)) { return cache.get(filename); } else { const data = readFileSync(filename, ‘utf8’); cache.set(filename, data); return data; } }
function createFileReader(filename) { const listeners = [] return { onDataReady: listener => { listeners.push(listener); let data = consistentReadSync(filename); listeners.forEach(listener => listener(data)); } } }
使用同步 API 替代异步 API 存在的问题:
- 某些特定的功能,或许没有同步版的 API
- 同步 API 会阻塞事件循环,把同时出现的其他请求给卡住,这会打破 Node.js 的并发模型,降低应用性能
<a name="EpPBE"></a>
### 通过延迟执行机制(deffered execution)确保异步执行
```typescript
import { readFile } from "fs";
const cache = new Map();
function consistentReadAsync (filename, cb) {
if (cache.has(filename)) {
// 推迟回调的执行时机
process.nextTick(() => cb(cache.get(filename)));
} else {
// 异步函数
readFile(filename, 'utf8', (err, data) => {
cache.set(filename, data);
cb(data);
})
}
}
function createFileReader(filename) {
const listeners = []
consistentReadAsync(filename, value => {
listeners.forEach(listener => listener(value))
})
return {
onDataReady: listener => listeners.push(listener)
}
}
process.nextTick() 产生的是微任务(microtask),会在任何 I/O 事件触发前执行,而 setImmediate() 在队列中任何的 I/O 事件执行之后执行
Node.js callback conventions(定义回调的习惯)
callbacks come last(回调函数在最后)
fs.readFile(filename, [options], callback);
Error comes first (错误处理放在最前)
编写回调逻辑时,总是应该先判断这次回调有没有出错,有利于 debug 发现问题
fs.readFile('foo.txt', 'utf8', (err, data) => {
if (err) handleError(err)
else processData(data)
})
propagating errors (传递错误)
在以 CPS (连续传递风格)编写异步函数时,比较好的错误处理方式是将错误传递到回调链中的下一个回调函数中
import { readFile } from "fs";
function readJSON (filename, callback) {
readFile(filename, 'utf8', (err, data) => {
let parsed;
if (err) {
// 播报错误并退出当前函数
return callback(err);
}
try {
// 解析文件内容
parsed = JSON.parse(data);
} catch (err) {
// 捕获解析时的错误
return callback(err);
}
// 没有出错,因此只播报解析好的数据即可
callback(null, parsed);
})
}
uncaught exceptions(未捕获的异常)
在异步回调过程中,错误是难以被捕获的
const fs = require('fs')
function readJSONThrows(filename, callback) {
fs.readFile(filename, 'utf8', (err, data) => {
if (err) {
return callback(err)
}
callback(null, JSON.parse(data))
})
}
在上面的函数中,如果JSON.parse(data)异常的话是没有办法捕获的,这个异常从回调函数开始,沿着调用栈向上传播,并且直接传播到事件循环,事件循环会把自己看到的这个异常打印到控制台,令程序突然终止。
try {
readJSONThrows('nonJSON.txt', function(err, result) {
// ...
})
} catch (err) {
console.log('This will not catch the JSON parsing exception')
}
上面catch语句将捕获不到错误,因为错误是在回调函数中产生的。然而,我们仍然有机会在应用程序终止之前执行一些清理或日志记录。事实上,当这种情况发生时,Node.js 会在退出进程之前发出一个名为uncaughtException的特殊事件
process.on('uncaughtException', err => {
console.error(
'This will catch at last the ' + 'JSON parsing exception: ' + err.message
)
// Terminates the application with 1 (error) as exit code:
// without the following line, the application would continue
process.exit(1)
})
需要注意的是,uncaughtException 会使得应用处于一个不能保证一致的状态,而这可能导致不可预见的错误。比如还有未完成的 I/O 请求正在运行或关闭,这可能导致不一致。所以建议,尤其是在生产环境,在收到任何 uncaught exception 之后停止应用的运行
Observer pattern(观察者模式)
Observer 模式定义了一个对象(subject),它会在状态改变的时候通知一组观察者(监听器)
和回调的主要区别:Observer 模式可以通知多个监听器,而采用 CPS (连续传递风格)所实现的普通 Callback 模式,通常只会把执行结果传给一个监听器,也就是用户在提交执行请求时传入的那个回调。
EventEmitter
这个类允许开发者把一个或多个函数注册成监听器,并在某种事件发生时触发
import { EventEmitter } from 'events';
const emitter = new EventEmitter();
- on(event, listener):可以为某种事件注册一个新的监听器
- once(event, listener):也是为一个事件注册监听器,但是在事件第一次被触发后监听器被移除。
- emit(event, [arg1], […]):生成一个新事件,并附带参数给监听器。
- removeListener(event, listener):删除指定事件的一个监听器
这些方法都会返回 EventEmitter 实例,可以链式调用
creating and using eventEmitter
import { EventEmitter } from 'events';
import { readFile } from 'fs';
function findRegex (files, regex) {
const emitter = new EventEmitter();
for (const file of files) {
readFile(file, 'utf8', (err, content) => {
if (err) {
return emitter.emit('error', err);
}
emitter.emit('fileread', file);
const match = content.match(regex);
if (match) {
match.forEach(elem => emitter.emit('found', file, elem));
}
})
}
return emitter;
}
findRegex(
['fileA.txt', 'fileB.json'],
/hello/g
).on('fileread', file => console.log(`${file} was read`))
.on('found', (file, match) => console.log(`matched ${match} in ${file}`))
.on('error', err => console.error(`error emitted ${err.message}`));
播报错误信息
EventEmitter 会用特殊的方式对待 error 事件,如果它发出这样的一个事件,却没有找到对应的事件监听器,会自动抛出异常,导致应用程序退出,所以推荐要为 error 事件注册监听器。
making any object observable(使任意对象可观察)
扩展 EventEmitter
import { EventEmitter } from 'events';
import { readFile } from 'fs';
class FindRegex extends EventEmitter {
constructor (regex) {
super();
this.regex = regex;
this.files = [];
}
addFile (file) {
this.files.push(file);
return this;
}
find () {
for (const file of this.files) {
readFile(file, 'utf8', (err, content) => {
if (err) {
return this.emit('error', err);
}
this.emit('fileread', file);
const match = content.match(this.regex);
if (match) {
match.forEach(elem => this.emit('found', file, elem));
}
})
}
return this;
}
}
const findRegexInstance = new FindRegex(/hello/g);
findRegexInstance
.addFile('fileA.txt')
.addFile('fileB.json')
.find()
.on('fileread', file => console.log(`${file} was read`))
.on('found', (file, match) => console.log(`matched ${match} in ${file}`))
.on('error', err => console.error(`error emitted ${err.message}`));
EventEmitter 与内存泄漏
在用不到监听器时及时取消订阅(unsubscribe),防止内存泄漏(memory leak)。
未能及时释放 EventEmitter 的监听器,是 Node.js 平台发生内存泄漏的主要原因。
synchronous and asynchronous events
将 find () 任务改用同步方式触发
find () {
for (const file of this.files) {
let content;
try {
content = readFileSync(file, 'utf8');
} catch (err) {
this.emit('error', err);
}
this.emit('fileread', file);
const match = content.match(this.regex);
if (match) {
match.forEach(elem => this.emit('found', file, elem));
}
}
return this;
}