readableStream的使用及实现:
const readableStream = require('./readableStream');//继承自EventEmitterreadableStream.on('data', (data) => {console.log(data.toString());});
./readableStream:
const Readable = require('./Readable');
//水井
let cell = ['1', '2', '3', '4', '5'];
let idx = 0;
const readableStream = new Readable({
read() {
if (idx >= cell.length) {
this.push(null);
} else {
this.push(cell[idx]);
}
idx++;
}
});
module.exports = readableStream;
./Readable:
const Stream = require('stream');
const { inherits } = require('util');
function Readable(options = {}) {
Stream.call(this, options);
this._readableState = {
ended: false,//水井是否抽取结束,水井是否已经干涸
buffer: [],//水箱 水泵把水从井时抽出来放到水箱里
flowing: false//开关是否打开,如果打开会持续抽水,并且发送给用户
};
//把传递过来的read方法存放到_read上,用来向数据源读取数据
if (options.read) this._read = options.read;
}
inherits(Readable, Stream);
//ES6 Class最终其实就是构建函数
//EventEmitter on
Readable.prototype.on = function (event, fn) {
Stream.prototype.on.call(this, event, fn);
if (event === 'data') {
this.resume();//恢复 读取数据,其实就是把开关打开了,让水可以流动下来了
}
}
Readable.prototype.resume = function () {
this._readableState.flowing = true;//开关已经打开了,水可以流下来了
while (this.read());
}
Readable.prototype.read = function () {
//如果开关打开,并且井里有水才会抽水
if (this._readableState.flowing && !this._readableState.ended) {
this._read();
}
let data = this._readableState.buffer.shift();
if (data) {
this.emit('data', data);
}
return data;
}
Readable.prototype.push = function (chunk) {
//如果本次抽水,抽来的是个空,没抽上来,说明已经没水了,
if (chunk === null) {
this._readableState.ended = true;//结束
} else {
this._readableState.buffer.push(chunk);
}
}
Readable.prototype.pipe = function (dest) {
this.on('data', function (data) {
let lessThanMark = dest.write(data);
if (!lessThanMark) {
this.pause();
}
});
this.on('drain', function () {
this.resume();
});
return dest;
};
module.exports = Readable;
writableStream的使用及实现:
let writableStream = require('./writableStream');
writableStream.write('1');
writableStream.write('2');
writableStream.write('3');
writableStream.write('4');
writableStream.write('5');
writableStream.end();
./writableStream:
const Writable = require('./Writable');
const writableStream = new Writable({
write(data, encoding, next) {
//这里模拟的就是写入硬盘的过程,或者说真正吃馒头的过程
console.log(data.toString());
setTimeout(next, 1000);
}
});
module.exports = writableStream;
./Writable:
const Stream = require('./Stream');
const { inherits } = require('util');
function Writable(options) {
Stream.call(this, options);
this._writableState = {
ended: false,//是否已经写完了 是否已经把所有的馒头吃完了,吃撑了,吃不下了
writing: false,//是否正在写 是否嘴里正在吃馒头
buffer: [],// 缓存区,用来存放将要写入的数据,也就是放馒头的桌子
bufferSize: 0 //正在处理中的数据字节数等正在吃的和桌子上放的,也就是正在向底层系统写入的+缓存区里的
};
if (options.write) this._write = options.write;
}
inherits(Writable, Stream);
Writable.prototype.write = function (chunk) {
if (this._writableState.ended) {
//wite header after end 可写流已经关闭了,就不能往里写东西了
return false;
}
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
//不管你的数据是放在缓存区里,还是直接写入底层文件系统,都会累加到bufferSize
//当一个数据被真正写入硬盘后,或者说处理完了,才会减少
this._writableState.bufferSize += chunk.length;
let lessThanMark = this.options.highWaterMark > this._writableState.bufferSize;
//如果当前正在写入2
if (this._writableState.writing) {
this._writableState.buffer.push(chunk);
} else {//如果当前没有正在写入1
//_write 是真正的写入方法,比如说写入硬盘,吃馒头
this._writableState.writing = true;
this._write(chunk, 'utf8', () => {
this._writableState.bufferSize -= chunk.length;
this.next();
});
}
return lessThanMark;
}
Writable.prototype.next = function () {
this._writableState.writing = false;
if (this._writableState.buffer.length > 0) {
let chunk = this._writableState.buffer.shift();
this._write(chunk, 'utf8', () => {
this._writableState.bufferSize -= chunk.length;
this.next();
});
} else {
this.emit('drain');
}
}
Writable.prototype.end = function () {
this._writableState.ended = true;
}
module.exports = Writable;
上面代码中,还实现了水位线(highWaterMark)的功能,用来确保生产和消费之间的平衡
实例:
//const { Writable } = require('stream');
const Writable = require('./Writable');
const ws = new Writable({
//如果highWaterMark=1说明正在处理的数据只能有1个字节,正在处理的数据包括已经 发给底层系统的数据加上缓存区的数据
highWaterMark: 3,//最高水位线。其实就是指的能缓存的数据大小
write(data, encoding, next) {
//模拟的是真实写入的过程
console.log('writing ', data.toString());
setTimeout(next, 1000);
}
});
//如果缓存区满了,那就返回false,如果没有满,那就返回true
//正在处理的数据如果大于等于最高水位线,就是false,
//返回当前正在处理的数据(向硬盘写入的+缓存的)=当前的水位小于最高水位线
let lessThanMark = ws.write('1');
console.log("lessThanMark:1", lessThanMark);
lessThanMark = ws.write('2');
console.log("lessThanMark:2", lessThanMark);
lessThanMark = ws.write('3');
console.log("lessThanMark:3", lessThanMark);
//如果lessThanMark如果为false,按理说就不要再写了
//监听 可写流的排干事件
ws.once('drain', () => {
let lessThanMark = ws.write('4');
console.log("lessThanMark:4", lessThanMark);
lessThanMark = ws.write('5');
console.log("lessThanMark:5", lessThanMark);
lessThanMark = ws.write('6');
console.log("lessThanMark:6", lessThanMark);
});
管道
管道就是从文件中读取数据出来,然后在写到另一个地方,下面的pipe方法中dest参数就是要写到的目的地
Readable.prototype.pipe = function (dest) {
this.on('data', function (data) {
let lessThanMark = dest.write(data);
if (!lessThanMark) {
this.pause();
}
});
this.on('drain', function () {
this.resume();
});
return dest;
};
双工流:
双工流,既可以读,也可以写,读和写之间没有关系
const Readable = require('./Readable');
const Writable = require('./Writable');
const { inherits } = require('util');
inherits(Duplex, Readable);
function Duplex(options) {
Readable.call(this, options);
Writable.call(this, options);
}
const keys = Object.keys(Writable.prototype);
for (let v = 0; v < keys.length; v++) {
const method = keys[v];
Duplex.prototype[method] = Writable.prototype[method];
}
module.exports = Duplex;
注:由于Duplex需要同时继承Readable和Writable,而inherits只能继承一个类上的成员,我们通过inherits(Duplex, Readable);继承了Readable,然后在最后通过for循环手动将Writable上的方法拷贝到Duplex上去
转换流:
从Readable读出来的数据,经过转化流转换之后,再写到Writable里面
const readableStream = require('./readableStream');
const transformStream = require('./transformStream');
const writableStream = require('./writableStream');
readableStream.pipe(transformStream).pipe(writableStream);
