https://nodejs.org/zh-cn/docs/guides/backpressuring-in-streams/
背景:
以流的方式响应数据最核心的实现就是使用 pipe 方法来实现的输入、输出
参数 readable.pipe(destination[, options])
https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options
readerStream.pipe(writerStream);
例如压缩(边读边写)
var fs = require("fs");
var zlib = require('zlib');
// 压缩 input.txt 文件为 input.txt.gz
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('input.txt.gz'));
支持链式调用
v8.x之后独立函数 stream.pipeline(streams, callback)
https://nodejs.org/api/stream.html#stream_stream_pipeline_streams_callback
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
(err) => { }
);
原理
https://cloud.tencent.com/developer/article/1630068
- 可读流监听
data
事件- 首先判断
dest.Writable
,当写完时会赋值为false - 此时如果消费者消费速度慢,这时产生了一个现象,叫做
背压
。背压问题即是外部的生产者和消费者速度差造成的,此时我们需要暂停写入.pause()
- 首先判断
- 可写流监听
drain
事件- 消费者完成消费,可以触发
drain
事件,此时我们可以继续向流中写入数据。执行.resume()
- 消费者完成消费,可以触发
- 触发
pipe
事件,通知有流写入 - 返回
dest
流,可以进行链式调用
简单实现(手写)
https://github.com/nodejs/node/blob/v14.x/lib/internal/streams/readable.js#L645
stream.prototype.pipe = function(dest, options) {
this.on('data', (chunk) => {
if (dest.writable) {
if (false === dest.write(chunk) && this.pause) {
this.pause();
}
}
});
dest.on('drain', () => {
this.resume();
});
dest.emit('pipe', this);
return dest;
};