异步生成器
生成器 + 回调函数
通过使用生成器,可以实现同步顺序流程表达异步,异步生成器函数的行为与生成器函数类似:生成器函数返回一个具有next()函数的对象,并且调用next()执行生成器函数直到下一个yield。 区别在于异步迭代器的next()函数返回一个promise。
使用回调的方式
function foo(x, y, cb) {
ajax(
"http://some.url.1/?x=" + x + "&y=" + y, cb
);
}
foo(11, 31, function(err, text) {
if (err) {
console.error(err);
} else {
console.log(text);
}
});
使用生成器改造回调
function foo(x, y) {
ajax(
"http://some.url.1/?x=" + x + "&y=" + y,
function(err, data) {
if (err) {
// 向*main()抛出一个错误
it.throw( err );
} else {
// 用收到的data恢复*main()
it.next(data);
}
});
}
function* main() {
try {
var text = yield foo(11, 31);
console.log(text);
} catch (err) {
console.error(err);
}
}
var it = main();
// 这里启动!
it.next();
生成器 + promise
- 最大问题是代码冗余,原来的任务被 Promise 包装了一下,不管什么操作,一眼看去都是一堆
then
,原来的语义变得很不清楚,代码不能很好地表示执行流程。 - 首先,无法取消
Promise
,一旦新建它就会立即执行,无法中途取消。 - 其次,如果不设置回调函数,
Promise
内部抛出的错误,不会反应到外部。 - 第三,当处于
pending
状态时,无法得知目前进展到哪一个阶段(刚刚开始还是即将完成)。promise 的版本
fetch('https://www.geekbang.org')
.then((response) => {
console.log(response)
return fetch('https://www.geekbang.org/test')
}).then((response) => {
console.log(response)
}).catch((error) => {
console.log(error)
})
生成器 + promise 的版本
相对于 promise 的版本有以下几个优点:
- then 里面就是异步的回调处理,执行流程清晰
- 可以控制开始,中途可以取消
- promise 发生的错误只能在promise.catch 中捕获,可以在生成中抛出错误
- 可以控制执行流程
问题:
异步生成器失去了Promise 的可信任性和可组合性
//foo 函数
function* foo() {
let response1 = yield fetch('https://www.geekbang.org')
console.log('response1')
console.log(response1)
let response2 = yield fetch('https://www.geekbang.org/test')
console.log('response2')
console.log(response2)
}
// 执行 foo 函数的代码
let gen = foo()
function getGenPromise(gen) {
return gen.next().value
}
getGenPromise(gen).then((response) => {
console.log('response1')
console.log(response)
return getGenPromise(gen)
}).then((response) => {
console.log('response2')
console.log(response)
})
- 首先执行的是
let gen = foo()
,创建了 gen 协程。 - 然后在父协程中通过执行 gen.next 把主线程的控制权交给 gen 协程。
- gen 协程获取到主线程的控制权后,就调用 fetch 函数创建了一个 Promise 对象 response1,然后通过 yield 暂停 gen 协程的执行,并将 response1 返回给父协程。
- 父协程恢复执行后,调用 response1.then 方法等待请求结果。
- 等通过 fetch 发起的请求完成之后,会调用 then 中的回调函数,then 中的回调函数拿到结果之后,通过调用 gen.next 放弃主线程的控制权,将控制权交 gen 协程继续执行下个请求。
虽然 Generator 函数将异步操作表示得很简洁,但是流程管理却不方便(即何时执行第一阶段、何时执行第二阶段)。
自动流程异步生成器
把上面说的执行生成器代码封装成一个函数,并把这个执行生成器代码的函数称为执行器
封装
function* foo() {
let response1 = yield fetch('https://www.geekbang.org')
console.log('response1')
console.log(response1)
let response2 = yield fetch('https://www.geekbang.org/test')
console.log('response2')
console.log(response2)
}
co(foo());
function run(gen) {
var args = [].slice.call(arguments, 1);
var it;
// 在当前上下文中初始化生成器
it = gen.apply(this, args);
// 返回一个promise用于生成器完成
return Promise.resolve()
.then(function handleNext(value) { // 对下一个yield出的值运行
var next = it.next(value);
return (function handleResult(next) { // 生成器运行完毕了吗?
if (next.done) {
return next.value;
}
// 否则继续运行
else {
return Promise.resolve(next.value).then(
// 成功就恢复异步循环,把决议的值发回生成器 handleNext,
// 如果value是被拒绝的 promise,
// 就把错误传回生成器进行出错处理 function handleErr(err) {
return Promise.resolve(it.throw(err))
.then(handleResult);
});
}
})(next);
});
}
function* main() { // ..
}
run(main);
thunk
传名调用
这就是 Thunk 函数的定义,它是“传名调用”的一种实现策略,用来替换某个表达式。
function f(a, b) {
// ...
return a * b;
}
let x = 5;
f(x + 5, 2);
// 传值调用的话,在进入函数体之前就计算了 x + 5;
// 等同于
f(10, 2)
// 传名调用的话,等同于
// 这个临时函数就叫做 Thunk 函数
// 用函数替换表达式,为了呀
var thunk = function () {
return x + 5;
};
function f(thunk, b) {
// ...
return thunk() * b;
}
const x = 5;
f(thunk, 2)
// 真正执行到 thunk()的时候才计算 x+5;
js中的 thunk
指一个用于调用另外一个函数的函数
// 正常版本的readFile(多参数版本)
fs.readFile(fileName, callback);
// Thunk版本的readFile(单参数版本)
var Thunk = function (fileName) {
return function (callback) {
return fs.readFile(fileName, callback);
};
};
var readFileThunk = Thunk(fileName);
readFileThunk(callback);
Thunk 函数转换器
// ES5版本
var Thunk = function(fn){
return function (){
var args = Array.prototype.slice.call(arguments);
return function (callback){
args.push(callback);
return fn.apply(this, args);
}
};
};
// ES6版本
const Thunk = function(fn) {
return function (...args) {
return function (callback) {
return fn.call(this, ...args, callback);
}
};
};
使用上面的转换器,生成fs.readFile
的 Thunk 函数。
var readFileThunk = Thunk(fs.readFile);
readFileThunk(fileA)(callback);
// 生产环境写法
var thunkify = require('thunkify');
var fs = require('fs');
var read = thunkify(fs.readFile);
read('package.json')(function(err, str){
// ...
});
下面是另一个完整的例子。
function f(a, cb) {
cb(a);
}
const ft = Thunk(f);
ft(1)(console.log) // 1
//或者
ft(1)(function (data) {
console.log(data)
})
基于 thunk 的Generator 函数的自动流程管理
Generator 函数可以自动执行。同步的情况
function* gen() {
// ...
}
var g = gen();
var res = g.next();
while(!res.done){
console.log(res.value);
res = g.next();
}
异步的情况
var fs = require('fs');
var thunkify = require('thunkify');
var readFileThunk = thunkify(fs.readFile);
var gen = function* (){
var r1 = yield readFileThunk('/etc/fstab');
console.log(r1.toString());
var r2 = yield readFileThunk('/etc/shells');
console.log(r2.toString());
};
var g = gen();
var r1 = g.next();
r1.value(function (err, data) {
if (err) throw err;
var r2 = g.next(data);
r2.value(function (err, data) {
if (err) throw err;
g.next(data);
});
});
自动流程递归改写
function run(fn) {
var gen = fn();
function next(err, data) {
var result = gen.next(data);
if (result.done) return;
result.value(next);
}
next();
}
var g = function* (){
var f1 = yield readFileThunk('fileA');
var f2 = yield readFileThunk('fileB');
// ...
var fn = yield readFileThunk('fileN');
};
run(g);
co 的流程管理
Generator 就是一个异步操作的容器。它的自动执行需要一种机制,当异步操作有了结果,能够自动交回执行权。
两种方法可以做到这一点。
(1)回调函数。将异步操作包装成 Thunk 函数,在回调函数里面交回执行权。
(2)Promise 对象。将异步操作包装成 Promise 对象,用then
方法交回执行权。
co 模块其实就是将两种自动执行器(Thunk 函数和 Promise 对象),包装成一个模块。使用 co 的前提条件是,Generator 函数的yield
命令后面,只能是 Thunk 函数或 Promise 对象。如果数组或对象的成员,全部都是 Promise 对象,也可以使用 co,详见后文的例子。
var gen = function* () {
var f1 = yield readFile('/etc/fstab');
var f2 = yield readFile('/etc/shells');
console.log(f1.toString());
console.log(f2.toString());
};
// co 模块可以让你不用编写 Generator 函数的执行器。
var co = require('co');
co(gen).then(function (){
console.log('Generator 函数执行完成');
});
基于 promise 对象自动执行
var fs = require('fs');
var readFile = function (fileName){
return new Promise(function (resolve, reject){
fs.readFile(fileName, function(error, data){
if (error) return reject(error);
resolve(data);
});
});
};
var gen = function* (){
var f1 = yield readFile('/etc/fstab');
var f2 = yield readFile('/etc/shells');
console.log(f1.toString());
console.log(f2.toString());
};
// 手动执行
var g = gen();
g.next().value.then(function(data){
g.next(data).value.then(function(data){
g.next(data);
});
});
function run(gen){
var g = gen();
function next(data){
var result = g.next(data);
if (result.done) return result.value;
result.value.then(function(data){
next(data);
});
}
next();
}
run(gen);
源码
co 就是上面那个自动执行器的扩展,它的源码只有几十行,非常简单。
首先,co 函数接受 Generator 函数作为参数,返回一个 Promise 对象。
function co(gen) {
var ctx = this;
return new Promise(function(resolve, reject) {
});
}
在返回的 Promise 对象里面,co 先检查参数gen
是否为 Generator 函数。如果是,就执行该函数,得到一个内部指针对象;如果不是就返回,并将 Promise 对象的状态改为resolved
。
function co(gen) {
var ctx = this;
return new Promise(function(resolve, reject) {
if (typeof gen === 'function') gen = gen.call(ctx);
if (!gen || typeof gen.next !== 'function') return resolve(gen);
});
}
接着,co 将 Generator 函数的内部指针对象的next
方法,包装成onFulfilled
函数。这主要是为了能够捕捉抛出的错误。
function co(gen) {
var ctx = this;
return new Promise(function(resolve, reject) {
if (typeof gen === 'function') gen = gen.call(ctx);
if (!gen || typeof gen.next !== 'function') return resolve(gen);
onFulfilled();
function onFulfilled(res) {
var ret;
try {
ret = gen.next(res);
} catch (e) {
return reject(e);
}
next(ret);
}
});
}
最后,就是关键的next
函数,它会反复调用自身。
function next(ret) {
if (ret.done) return resolve(ret.value);
var value = toPromise.call(ctx, ret.value);
if (value && isPromise(value)) return value.then(onFulfilled, onRejected);
return onRejected(
new TypeError(
'You may only yield a function, promise, generator, array, or object, '
+ 'but the following object was passed: "'
+ String(ret.value)
+ '"'
)
);
}
上面代码中,next
函数的内部代码,一共只有四行命令。
第一行,检查当前是否为 Generator 函数的最后一步,如果是就返回。
第二行,确保每一步的返回值,是 Promise 对象。
第三行,使用then
方法,为返回值加上回调函数,然后通过onFulfilled
函数再次调用next
函数。
第四行,在参数不符合要求的情况下(参数非 Thunk 函数和 Promise 对象),将 Promise 对象的状态改为rejected
,从而终止执行。
处理并发的异步操作
co 支持并发的异步操作,即允许某些操作同时进行,等到它们全部完成,才进行下一步。
这时,要把并发的操作都放在数组或对象里面,跟在yield
语句后面。
// 数组的写法
co(function* () {
var res = yield [
Promise.resolve(1),
Promise.resolve(2)
];
console.log(res);
}).catch(onerror);
// 对象的写法
co(function* () {
var res = yield {
1: Promise.resolve(1),
2: Promise.resolve(2),
};
console.log(res);
}).catch(onerror);
下面是另一个例子。
co(function* () {
var values = [n1, n2, n3];
yield values.map(somethingAsync);
});
function* somethingAsync(x) {
// do something async
return y
}
上面的代码允许并发三个somethingAsync
异步操作,等到它们全部完成,才会进行下一步。
Async/await
async
async 是一个通过异步执行并隐式返回 Promise 作为结果的函数。async
函数完全可以看作多个异步操作,包装成的一个 Promise 对象
async函数的状态变化
async
函数返回的 Promise 对象,必须等到内部所有await
命令后面的 Promise 对象执行完,才会发生状态改变,除非遇到return
语句或者抛出错误。也就是说,只有async
函数内部的异步操作执行完,才会执行then
方法指定的回调函数。
下面是一个例子。
async function getTitle(url) {
let response = await new Promise((resolve) => {
setTimeout(resolve, 100);
})
let html = await new Promise((resolve) => {
setTimeout(resolve, 200);
});
return 'suncess'
}
getTitle('https://tc39.github.io/ecma262/').then(console.log)
// "ECMAScript 2017 Language Specification"
上面代码中,函数getTitle
内部有三个操作:抓取网页、取出文本、匹配页面标题。只有这三个操作全部完成,才会执行then
方法里面的console.log
。
async 可以当做 await 的参数
async function timeout(ms) {
await new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
async function asyncPrint(value, ms) {
await timeout(ms);
console.log(value);
}
asyncPrint('hello world', 50);
async 函数 return
async
函数内部return
语句返回的值,会成为then
方法回调函数的参数。
async function f() {
return 'hello world';
}
f().then(v => console.log(v))
// "hello world"
上面代码中,函数f
内部return
命令返回的值,会被then
方法回调函数接收到。
async 函数 throw
async
函数内部抛出错误,会导致返回的 Promise 对象变为reject
状态。抛出的错误对象会被catch
方法回调函数接收到。
async function f() {
throw new Error('出错了');
}
f().then(
v => console.log(v),
e => console.log(e)
)
// Error: 出错了
await
我觉得就是 yield如果await
命令后面是一个 Promise 对象,返回该对象的结果。
如果不是 Promise 对象,则默认创建一个 Promise 对象
await 后面的 promise 如果 reject
任何一个await
语句后面的 Promise 对象变为reject
状态,那么整个async
函数都会中断执行。
async function f() {
await Promise.reject('出错了');
await Promise.resolve('hello world'); // 不会执行
}
await 做了什么
async function foo() {
console.log(1)
let a = await 100
console.log(a)
console.log(2)
}
console.log(0)
foo()
console.log(3)
async/await 执行流程图
首先,执行console.log(0)
这个语句,打印出来 0。
紧接着就是执行 foo 函数,由于 foo 函数是被 async 标记过的,所以当进入该函数的时候,JavaScript 引擎会保存当前的调用栈等信息,然后执行 foo 函数中的console.log(1)
语句,并打印出 1。
接下来就执行到 foo 函数中的await 100
这个语句了,这里是我们分析的重点,因为在执行await 100
这个语句时,JavaScript 引擎在背后为我们默默做了太多的事情,那么下面我们就把这个语句拆开,来看看 JavaScript 到底都做了哪些事情。
当执行到await 100
时,
首先会默认创建一个 Promise 对象,然后执行executor 函数,调用了 resolve 函数,JavaScript 引擎会将该任务提交给微任务队列JavaScript 引擎会暂停当前协程的执行,将主线程的控制权转交给父协程执行,同时会将 promise_ 对象返回给父协程。
接下来继续执行父协程的流程,这里我们执行console.log(3)
,并打印出来 3。随后父协程将执行结束,在结束之前,会进入微任务的检查点,然后执行微任务队列,async 函数对这里做了封装,把promise.then 中的值传给 a,然后调用 next 方法。传resolve 的值
回到 foo 协程,继续执行
我修改的流程代码
function fetch (data) {
return new Promise((resolve, reject) => {
console.log('fetch start')
resolve(data)
})
}
async function foo() {
console.log(1)
let a = await fetch(100)
console.log(a)
let b = await fetch(a + 100)
console.log(b)
}
console.log(0)
foo()
console.log(3)
0
1
fetch start
3
100
fetch start
200
为什么引入 async/await
这种方式能够彻底告别执行器和生成器,实现更加直观简洁的代码,async/await 技术背后的秘密就是 Promise 和生成器应用,往低层说就是微任务和协程应用
使用 Async 和 await 改写生成器加 promise 的方式
async function foo(){
try{
let response1 = await fetch('https://www.geekbang.org')
console.log('response1')
console.log(response1)
let response2 = await fetch('https://www.geekbang.org/test')
console.log('response2')
console.log(response2)
}catch(err) {
console.error(err)
}
}
foo()
异步函数使用async 关键字标识,内部用 await 表示异步操作
async 函数是什么?一句话,它就是 Generator 函数的语法糖。
function foo(x, y) {
return request(
"http://some.url.1/?x=" + x + "&y=" + y);
}
// 这里也不需要声明为生成器函数
async function main() {
try {
// 把 yield替换成await
var text = await foo(11, 31);
console.log(text);
} catch (err) {
console.error(err);
}
}
// 这里不需要 run 函数
main();//
如果你 await 了一个 Promise,async 函数就会自动获知要做什么,它会暂停这个函数(就 像生成器一样),直到 Promise 决议。
const fs = require('fs');
const readFile = function (fileName) {
return new Promise(function (resolve, reject) {
fs.readFile(fileName, function(error, data) {
if (error) return reject(error);
resolve(data);
});
});
};
const gen = function* () {
const f1 = yield readFile('/etc/fstab');
const f2 = yield readFile('/etc/shells');
console.log(f1.toString());
console.log(f2.toString());
};
上面代码的函数gen
可以写成async
函数,就是下面这样。
const asyncReadFile = async function () {
const f1 = await readFile('/etc/fstab');
const f2 = await readFile('/etc/shells');
console.log(f1.toString());
console.log(f2.toString());
};
一比较就会发现,async
函数就是将 Generator 函数的星号(*
)替换成async
,将yield
替换成await
,仅此而已。
async
函数对 Generator 函数的改进,体现在以下四点。
(1)内置执行器。
Generator 函数的执行必须靠执行器,所以才有了co
模块,而async
函数自带执行器。也就是说,async
函数的执行,与普通函数一模一样,只要一行。
asyncReadFile();
上面的代码调用了asyncReadFile
函数,然后它就会自动执行,输出最后结果。这完全不像 Generator 函数,需要调用next
方法,或者用co
模块,才能真正执行,得到最后结果。
(2)更好的语义。async
和await
,比起星号和yield
,语义更清楚了。async
表示函数里有异步操作,await
表示紧跟在后面的表达式需要等待结果。
(3)更广的适用性。co
模块约定,yield
命令后面只能是 Thunk 函数或 Promise 对象,而async
函数的await
命令后面,可以是 Promise 对象和原始类型的值(数值、字符串和布尔值,但这时会自动转成立即 resolved 的 Promise 对象)。
(4)返回值是 Promise。async
函数的返回值是 Promise 对象,这比 Generator 函数的返回值是 Iterator 对象方便多了。你可以用then
方法指定下一步的操作。
进一步说,async
函数完全可以看作多个异步操作,包装成的一个 Promise 对象,而await
命令就是内部then
命令的语法糖。
使用
function timeout(ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
async function asyncPrint(value, ms) {
await timeout(ms);
console.log(value);
}
asyncPrint('hello world', 50);
上面代码指定 50 毫秒以后,输出hello world
。
由于async
函数返回的是 Promise 对象,可以作为await
命令的参数。所以,上面的例子也可以写成下面的形式。
async function timeout(ms) {
await new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
async function asyncPrint(value, ms) {
await timeout(ms);
console.log(value);
}
asyncPrint('hello world', 50);
错误处理
async 函数的实现原理
async 函数的实现原理,就是将 Generator 函数和自动执行器,包装在一个函数里。
async function fn(args) {
// ...
}
// 等同于
function fn(args) {
return spawn(function* () {
// ...
});
}
所有的async
函数都可以写成上面的第二种形式,其中的spawn
函数就是自动执行器。
下面给出spawn
函数的实现,基本就是前文自动执行器的翻版。
function spawn(genF) {
return new Promise(function(resolve, reject) {
const gen = genF();
function step(nextF) {
let next;
try {
next = nextF();
} catch(e) {
return reject(e);
}
if(next.done) {
return resolve(next.value);
}
Promise.resolve(next.value).then(function(v) {
step(function() { return gen.next(v); });
}, function(e) {
step(function() { return gen.throw(e); });
});
}
step(function() { return gen.next(undefined); });
});
}
习题
async function foo() {
console.log('foo')
}
async function bar() {
console.log('bar start')
await foo()
console.log('bar end')
}
console.log('script start')
setTimeout(function () {
console.log('setTimeout')
}, 0)
bar();
new Promise(function (resolve) {
console.log('promise executor')
resolve();
}).then(function () {
console.log('promise then')
})
console.log('script end')
script start
bar start
promise executor
script end
foo // 错误!foo 在 bar start 后面
bar end
promise then
setTimeout
实例:
继发操作
async function logInOrder(urls) {
// 并发读取远程URL
const textPromises = urls.map(async url => {
const response = await fetch(url);
return response.text();
});
// 按次序输出
for (const textPromise of textPromises) {
console.log(await textPromise);
}
}
问题是所有远程操作都是继发。只有前一个 URL 返回结果,才会去读取下一个 URL,这样做效率很差,非常浪费时间。我们需要的是并发发出远程请求。
按顺序完成并发异步操作
实际开发中,经常遇到一组异步操作,需要按照顺序完成。比如,依次远程读取一组 URL,然后按照读取的顺序输出结果。
Promise 的写法如下。
function logInOrder(urls) {
// 远程读取所有URL
const textPromises = urls.map(url => {
return fetch(url).then(response => response.text());
});
// 按次序输出
textPromises.reduce((chain, textPromise) => {
return chain.then(() => textPromise)
.then(text => console.log(text));
}, Promise.resolve());
}
上面代码使用fetch
方法,同时远程读取一组 URL。每个fetch
操作都返回一个 Promise 对象,放入textPromises
数组。然后,reduce
方法依次处理每个 Promise 对象,然后使用then
,将所有 Promise 对象连起来,因此就可以依次输出结果。
这种写法不太直观,可读性比较差。下面是 async 函数实现。
async function logInOrder(urls) {
// 并发读取远程URL
const textPromises = urls.map(async url => {
const response = await fetch(url);
return response.text();
});
// 按次序输出
for (const textPromise of textPromises) {
console.log(await textPromise);
}
}
上面代码中,虽然map
方法的参数是async
函数,但它是并发执行的,因为只有async
函数内部是继发执行,外部不受影响。后面的for..of
循环内部使用了await
,因此实现了按顺序输出。
处理 Stream
Node 提供 Stream 模式读写数据,特点是一次只处理数据的一部分,数据分成一块块依次处理,就好像“数据流”一样。这对于处理大规模数据非常有利。Stream 模式使用 EventEmitter API,会释放三个事件。
data
事件:下一块数据块已经准备好了。end
事件:整个“数据流”处理完了。error
事件:发生错误。
使用Promise.race()
函数,可以判断这三个事件之中哪一个最先发生,只有当data
事件最先发生时,才进入下一个数据块的处理。从而,我们可以通过一个while
循环,完成所有数据的读取。
const co = require('co');
const fs = require('fs');
const stream = fs.createReadStream('./les_miserables.txt');
let valjeanCount = 0;
co(function*() {
while(true) {
const res = yield Promise.race([
new Promise(resolve => stream.once('data', resolve)),
new Promise(resolve => stream.once('end', resolve)),
new Promise((resolve, reject) => stream.once('error', reject))
]);
if (!res) {
break;
}
stream.removeAllListeners('data');
stream.removeAllListeners('end');
stream.removeAllListeners('error');
valjeanCount += (res.toString().match(/valjean/ig) || []).length;
}
console.log('count:', valjeanCount); // count: 1120
});
上面代码采用 Stream 模式读取《悲惨世界》的文本文件,对于每个数据块都使用stream.once
方法,在data
、end
、error
三个事件上添加一次性回调函数。变量res
只有在data
事件发生时才有值,然后累加每个数据块之中valjean
这个词出现的次数。