https://nodejs.org/zh-cn/docs/guides/backpressuring-in-streams/
背景:
以流的方式响应数据最核心的实现就是使用 pipe 方法来实现的输入、输出
参数 readable.pipe(destination[, options])
https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options
readerStream.pipe(writerStream);
例如压缩(边读边写)
var fs = require("fs");var zlib = require('zlib');// 压缩 input.txt 文件为 input.txt.gzfs.createReadStream('input.txt').pipe(zlib.createGzip()).pipe(fs.createWriteStream('input.txt.gz'));
支持链式调用
v8.x之后独立函数 stream.pipeline(streams, callback)
https://nodejs.org/api/stream.html#stream_stream_pipeline_streams_callback
const { pipeline } = require('stream');const fs = require('fs');const zlib = require('zlib');pipeline(fs.createReadStream('archive.tar'),zlib.createGzip(),fs.createWriteStream('archive.tar.gz'),(err) => { });
原理
https://cloud.tencent.com/developer/article/1630068
- 可读流监听
data事件- 首先判断
dest.Writable,当写完时会赋值为false - 此时如果消费者消费速度慢,这时产生了一个现象,叫做
背压。背压问题即是外部的生产者和消费者速度差造成的,此时我们需要暂停写入.pause()
- 首先判断
- 可写流监听
drain事件- 消费者完成消费,可以触发
drain事件,此时我们可以继续向流中写入数据。执行.resume()
- 消费者完成消费,可以触发
- 触发
pipe事件,通知有流写入 - 返回
dest流,可以进行链式调用
简单实现(手写)
https://github.com/nodejs/node/blob/v14.x/lib/internal/streams/readable.js#L645
stream.prototype.pipe = function(dest, options) {this.on('data', (chunk) => {if (dest.writable) {if (false === dest.write(chunk) && this.pause) {this.pause();}}});dest.on('drain', () => {this.resume();});dest.emit('pipe', this);return dest;};
