目的在于提升性能。
Node利用JavaScript及其内部异步库,将异步直接提升到业务层面,这是一种创新。

函数式编程

函数是编程是作为异步编程的基础。

高阶函数

普通函数只能接受基本数据类型作为输入或输出的函数,而高阶函数是可以接受另一个函数作为输入或输出的函数。

  1. // 普通函数
  2. function foo (x) {
  3. return x
  4. }
  5. // 高阶函数
  6. function foo(x) {
  7. return function(x) {
  8. return x
  9. }
  10. }
  11. function foo(x, bar) {
  12. return bar(x)
  13. }

例如:数组排序sort

  1. const arr = [1,3,5,6,2,0,8];
  2. arr.sort(function(a, d) {
  3. return a - b
  4. })
  5. // 这里的sort即是一个高阶函数

高阶函数在JavaScript中比比皆是,其中ECMAScript5中提供的一些数组方法(forEach()、map()、reduce()、reduceRight()、filter()、every()、some())十分典型。

偏函数的用法

偏函数的用法是指创建一个调用另外一个部分—参数或变量已经预置的函数—的函数的用法。

例如:

  1. const isType = function (type) {
  2. return function (obj) {
  3. return toString.call(obj) === `[object ${type}]`
  4. }
  5. }
  6. const isString = isType('String');
  7. const isFunc = ifType('Function');
  8. // 这种通过指定部分参数来产生一个新的定制函数的形式就是偏函数

偏函数在异步编程中是十分常见的。

优势与难点

优势

Node最大的优势莫过于是基于事件驱动,非阻塞I/O模型。非阻塞I/O可以使cpu与I/O不相互依赖等待执行,让资源能够得到更好的利用。

难点

  1. 异常处理 ```javascript const async = function (callback) { process.nextTick(callback) }

try { async(callbakc) }catch(e) { // TODO } //调用async()方法后,callback被存放起来,直到下一个事件循环(Tick)才会取出来执行, 此时try…catch对于异常处理也无能为力。

  1. node在异常处理上形成一种约定,将异常作为回调函数的第一个参数返回,若为空值则表示没有异常抛出。
  2. ```javascript
  3. async(function (err, results) {
  4. //TODO
  5. })
  6. /* 我们自行编写的异步方法上应遵循这种一些原则
  7. 1. 必须执行调用者传入的回调函数
  8. 2. 正确传递回异常供调用者判断
  9. */
  10. // 例子
  11. const async = function(callback){
  12. process.nextTick(function () {
  13. const results = something;
  14. if (error) {
  15. return callback(error)
  16. }
  17. return callback(null, results)
  18. })
  19. }

在编写异步方法时,只要将异常正确地传递给用户的回调方法即可,无须过多处理。

  1. 函数嵌套过深
  2. 阻塞代码

javascript并没有sleep()这样的线程沉睡的功能,仅仅只有setTimeout,setInterval这两个延迟函数,但是这两个函数并不会阻塞后面代码的运行。
遇见这样的需求时,在统一规划业务逻辑之后,调用setTimeout()的效果会更好。

  1. 多线程编程

Web Workers是一个利用消息机制合理使用多核CPU的理想模型。

  1. 异步转同步

异步编程解决方案

异步编程的解决方案主要有三种

  • 事件发布/订阅模式
  • Promise/Deferred模式
  • 流程控制库

事件发布/订阅模式

事件监听模式是一直广泛的异步编程的模式,是回调函数的事件化,又称为发布/订阅模式。
NodeJs自身提供的events模块是这种模式的简单实现。它具有addListener/on()、once()、removeListener()、removeAllListeners()和emit()等基本的事件监听模式的方法实现。

  1. const events = require('events')
  2. const emmiter = new events.Emitter()
  3. // 订阅
  4. emmiter.on('event1',(msg) => {
  5. console.log('触发事件', msg)
  6. })
  7. // 发布
  8. emmiter.emit('event1')

订阅/发布自身并无同步异步调用的问题,但在Node中,emit()调用多半是伴随事件循环而异步触发的,所以我们说事件发布/订阅广泛应用于异步编程。

订阅/发布模式通常用来解耦业务逻辑,发布者无需关注订阅的侦听器如何实现逻辑,更无需关注有多少个订阅侦听器,数据通过消息的方式可以很灵活地传递。

例如:

  1. const http = reuqire('http')
  2. const option = {
  3. host: 'www.google.com',
  4. port: 80,
  5. path: '/update',
  6. method: 'post'
  7. }
  8. const req = http.request(option, function(res) {
  9. console.log("statusCode", res.statusCode);
  10. res.setEncoding('utf-8')
  11. res.on('data', function(chunk) {
  12. console.log('data chunk', chunk)
  13. })
  14. res.on('end', function() {
  15. console.log('end')
  16. })
  17. })
  18. req.on('error', (e) => {
  19. console.log('error', e)
  20. })
  21. req.write('data')
  22. req.end()

在这段HTTP请求的代码中,程序员只需要将视线放在error、data、end这些业务事件点上即可,至于内部的流程如何,无需过于关注

多类型异步协作

可以使用到eventproxy。

  1. npm i --save eventproxy // 安装
  2. // 使用
  3. const EventProxy = require('eventproxy')
  4. const proxy = new EventProxy()
  5. proxy.all('tpl', 'data', function (tpl, data) {
  6. // TODO
  7. // 在所有指定事件触发后将会被执行
  8. // 参数对应各自的时间名
  9. })
  10. fs.readFile(file_path, 'utf-8', function(err, template) {
  11. proxy.emit('tpl', template)
  12. })
  13. db.query(sql, function(err, data) {
  14. proxy.emit('data', data)
  15. })

EventProxy
all():订阅多个事件,当所有的事件都被触发后,侦听器才会被执行。
tail(): 订阅多个事件,当侦听器满足条件时执行一次之后,如果组合事件中的某个事件被再次触发,侦听器会用最新的数据继续执行。

异步场景下,我们常常需要用一个接口多次获取数据,此时触发的事件名或许是相同的。
after(): 实现侦听器在执行多少次之后执行侦听器的单一事件组合订阅方式。
例如

  1. const proxy = new EventProxy();
  2. proxy.after(data, 10, function(data) {
  3. // 表示执行10次data事件之后执行
  4. })

EventProxy模块除了可以应用于Node中外,还可以用在前端浏览器中。

EventProxy异常处理
**
fail()

  1. const proxy = new EventProxy();
  2. proxy.fail(callback)
  3. // 等价于
  4. proxy.fail(function(err) {
  5. callback(err)
  6. })
  7. proxy.bind('error', function(err) {
  8. proxy.unbind() // 卸载所有处理函数
  9. callback(err)
  10. })

done()

  1. const proxy = new EventProxy();
  2. proxy.done('data')
  3. // 等价于
  4. function (err, content) {
  5. if (err) return proxy.emit('error', err);
  6. proxy.emit('data', content)
  7. }
  8. // done 可以接受函数作为一个参数
  9. proxy.done(function(content) {
  10. // TODO
  11. // 这里无需考虑异常
  12. proxy.emit('data', content)
  13. })
  14. // 等价于
  15. function(err, content) {
  16. if (err) return proxy.emit('error', err);
  17. (function(content) {
  18. proxy.emit('data', content)
  19. }(content))
  20. }

当只传入一个回调函数时,需要手工调用emit()触发事件

Promise/Deferred模式

使用事件的方式时,执行流程需要被预先设定。即便是分支,也是需要预先设定,这是由于发布/订阅的运行机制决定的。
而Promise/Deferred模式是一种先执行异步调用,延迟传递处理的方式。

  1. // 普通ajax调用
  2. $.get('/api', {
  3. success: onSuccess,
  4. error: onError,
  5. complete: onComplete
  6. })
  7. // 使用了Promise/Deferred模式的ajax
  8. $.get('api')
  9. .success(onSuccess)
  10. .error(onError)
  11. .complete(onComplete)
  12. // 这使得即使不调用success, error等方法,ajax依然会被执行。

在原始的API中,一个事件只能处理一个回调,而通过Deferred对象,可以对事件加入任意的业务处理逻辑。

  1. $.get('api')
  2. .success(onSuccess_1)
  3. .success(onSuccess_2)

Promise/Deferred模式在2009年时被Kris Zyp抽象为一个提议草案,发布在CommonJS规范中。随着使用Promise/Deferred模式的应用逐渐增多,CommonJS草案目前已经抽象出了Promises/A、Promises/B、Promises/D这样典型的异步Promise/Deferred模型,这使得异步操作可以以一种优雅的方式出现

PromiseA

Promise/Deferred模式其实包含两部分,即Promise和Deferred。

Promise
PromiseA对异步操作做出了这样的抽象定义

  • Promise操作只会处在3种状态的一种:未完成态、完成态、失败态。
  • Promise的状态只会由未完成态->完成态或未完成态->失败态。不能逆反,完成态和失败态不能相互转换。
  • Promise状态一旦改变就不能再被更改。

Promise状态转换示意图.png

Api定义上,Promises/A提议是比较简单的。一个Promise只需要具备一个then()方法即可。

  • 接受完成态、错误态的回调方法。在操作完成或出现错误时,将会调用对应方法。
  • 可选地支持progress事件回调作为第三个方法。
  • then发放只接受function对象,其余对象将被忽略。
  • then方法返回Promise对象,可以继续链式调用。

.then(fulfillHandler, errorHandler, progressHandler)
_
Deferred
then()方法所做的事情是将回调函数存放起来。为了完成整个流程,还需要触发执行这些回调函数的地方,实现这些功能的对象通常被称为Deferred,即延迟对象。

Promise/Deferred模型简单实现

  1. // Promise
  2. const Promise = function() {
  3. EventEmitter.call(this)
  4. }
  5. util.inherits(Promise, EventEmitter)
  6. Promise.prototype.then = function(fulfillHandler, errorHandler, progressHandler) {
  7. if (type fulfillHandler === 'function') {
  8. // 利用once,保证函数只执行一次
  9. this.once('success', fulfillHandler)
  10. }
  11. if (type errorHandler === 'function') {
  12. // 利用once,保证函数只执行一次
  13. this.once('error', errorHandler)
  14. }
  15. if (type progressHandler === 'function') {
  16. // 利用once,保证函数只执行一次
  17. this.on('progress', progressHandler)
  18. }
  19. return this
  20. }
  21. // Deferred
  22. const Deferred = function () {
  23. this.state = 'unfulfilled';
  24. this.promise = new Promise();
  25. }
  26. Deferred.prototype.resolve = function(obj) {
  27. this.state = 'unfulfilled';
  28. this.promise.emit('success', obj)
  29. }
  30. Deferred.prototype.reject = function(err) {
  31. this.state = 'failed';
  32. this.promise.emit('error', err)
  33. }
  34. Deferred.prototype.progress = function(data) {
  35. this.promise.progress('progress', data)
  36. }

在实际的业务中若想使用Promise/Deferred模型可以不用封装,可以使用Q或when模块来实现。

  1. // q安装
  2. npm i --save q
  3. //q模块将数据封装成promise
  4. /***************************************************************************************/
  5. // 利用q实现一个readFile
  6. const q = require('q');
  7. const readFile = function(path, encoding) {
  8. const deferred = q.defer();
  9. fs.readFile(path, encoding, deferred.makeNodeResolves());
  10. return deferred.promise
  11. }
  12. // 调用readFile
  13. readFile('./index.txt', 'utf-8').then((data) => {
  14. // TODO
  15. }, (err) => {
  16. console.log(err)
  17. })
  18. //Promise通过封装异步调用,实现了正向用例和反向用例的分离以及逻辑处理延迟,这使得回调函数相对优雅

Promise中的多异步协作

  1. // 简单实现all方式
  2. Deferred.prototype.all = function(promises) {
  3. let count = promises.lenth;
  4. const that = this;
  5. const results = [];
  6. promises.forEach((promise, i) => {
  7. promise.then((data) => {
  8. count--;
  9. results[i] = data;
  10. if (count === 0) {
  11. that.resolve(results)
  12. }
  13. },(err) => {
  14. that.reject(err)
  15. })
  16. })
  17. return this.promise
  18. }
  19. // 调用
  20. const promise_1 = readFile('a.txt', 'utf-8');
  21. const promise_2 = readFile('b.txt', 'utf-8');
  22. const deferred = new Deferred();
  23. deferred.all([promise_1, promise_2], (data) => {
  24. // TODO
  25. }, (err) => {
  26. // TODO error
  27. })

流程控制库

尾触发与next

尾触发目前应用最多的地方是Connect的中间件。
在Connect中,尾触发十分适合处理网络请求的场景。将复杂的处理逻辑拆解为简洁、单一的处理单元,逐层次地处理请求对象和响应对象。

  1. const connect = require('connect');
  2. const http = require('http');
  3. const app = connect();
  4. // gzip/deflate outgoing responses
  5. const compression = require('compression');
  6. app.use(compression());
  7. // store session state in browser cookie
  8. const cookieSession = require('cookie-session');
  9. app.use(cookieSession({
  10. keys: ['secret1', 'secret2']
  11. }));
  12. // parse urlencoded request bodies into req.body
  13. const bodyParser = require('body-parser');
  14. app.use(bodyParser.urlencoded({extended: false}));
  15. // respond to all requests
  16. app.use(function(req, res){
  17. res.end('Hello from Connect!\n');
  18. });
  19. //create node.js http server and listen on port
  20. http.createServer(app).listen(3000);

async

async模块提供了20多个方法用于处理异步的各种协作模式。
https://caolan.github.io/async/v3/

  1. // 安装
  2. yarn add async
  3. // 例子
  4. async.map(['file1','file2','file3'], fs.stat, function(err, results) {
  5. // results is now an array of stats for each file
  6. });
  7. async.filter(['file1','file2','file3'], function(filePath, callback) {
  8. fs.access(filePath, function(err) {
  9. callback(null, !err)
  10. });
  11. }, function(err, results) {
  12. // results now equals an array of the existing files
  13. });
  14. // 异步的并行执行
  15. async.parallel([
  16. function(callback) { ... },
  17. function(callback) { ... }
  18. ], function(err, results) {
  19. // optional callback
  20. });
  21. // 异步的串行执行
  22. async.series([
  23. function(callback) { ... },
  24. function(callback) { ... }
  25. ]);

setup

https://www.npmjs.com/package/setup

  1. // 安装
  2. yarn add setup

异步并发控制

bagpipe

https://github.com/JacksonTian/bagpipe/blob/master/README_CN.md

async

async也提供了一个方法用于处理异步调用的限制:parallelLimit()。
parallelLimit()方法的缺陷在于无法动态地增加并行任务,为此,async提供了queue()方法来满足该需求。

  1. async.parallelLimit([
  2. function(callback) {
  3. fs.readFile('a.txt', 'utf-8', callback)
  4. },
  5. function(callback) {
  6. fs.readFile('b.txt', 'utf-8', callback)
  7. }
  8. ], 1, function (err, results) {
  9. // TODO
  10. })
  11. // queue
  12. const q = async.queue(function(file, callback) {
  13. fs.readFile('a.txt', 'utf-8', callback)
  14. }, 2)
  15. q.drain = function() {
  16. // 完成了队列中的所有任务之后执行
  17. }
  18. fs.readdirSync('.').forEach(function(file) {
  19. q.push(file, function(err, data) {
  20. // TODO
  21. })
  22. })