readableStream的使用及实现:

  1. const readableStream = require('./readableStream');
  2. //继承自EventEmitter
  3. readableStream.on('data', (data) => {
  4. console.log(data.toString());
  5. });

./readableStream:

const Readable = require('./Readable');
//水井
let cell = ['1', '2', '3', '4', '5'];
let idx = 0;
const readableStream = new Readable({
    read() {
        if (idx >= cell.length) {
            this.push(null);
        } else {
            this.push(cell[idx]);
        }
        idx++;
    }
});
module.exports = readableStream;

./Readable:

const Stream = require('stream');
const { inherits } = require('util');
function Readable(options = {}) {
    Stream.call(this, options);
    this._readableState = {
        ended: false,//水井是否抽取结束,水井是否已经干涸
        buffer: [],//水箱 水泵把水从井时抽出来放到水箱里
        flowing: false//开关是否打开,如果打开会持续抽水,并且发送给用户
    };
    //把传递过来的read方法存放到_read上,用来向数据源读取数据
    if (options.read) this._read = options.read;
}
inherits(Readable, Stream);
//ES6 Class最终其实就是构建函数
//EventEmitter on
Readable.prototype.on = function (event, fn) {
    Stream.prototype.on.call(this, event, fn);
    if (event === 'data') {
        this.resume();//恢复 读取数据,其实就是把开关打开了,让水可以流动下来了
    }
}
Readable.prototype.resume = function () {
    this._readableState.flowing = true;//开关已经打开了,水可以流下来了
    while (this.read());
}
Readable.prototype.read = function () {
    //如果开关打开,并且井里有水才会抽水
    if (this._readableState.flowing && !this._readableState.ended) {
        this._read();
    }
    let data = this._readableState.buffer.shift();
    if (data) {
        this.emit('data', data);
    }
    return data;
}
Readable.prototype.push = function (chunk) {
    //如果本次抽水,抽来的是个空,没抽上来,说明已经没水了,
    if (chunk === null) {
        this._readableState.ended = true;//结束
    } else {
        this._readableState.buffer.push(chunk);
    }
}
Readable.prototype.pipe = function (dest) {
    this.on('data', function (data) {
        let lessThanMark = dest.write(data);
        if (!lessThanMark) {
            this.pause();
        }
    });
    this.on('drain', function () {
        this.resume();
    });
    return dest;
};
module.exports = Readable;

writableStream的使用及实现:

let writableStream = require('./writableStream');

writableStream.write('1');
writableStream.write('2');
writableStream.write('3');
writableStream.write('4');
writableStream.write('5');
writableStream.end();

./writableStream:

const Writable = require('./Writable');
const writableStream = new Writable({
    write(data, encoding, next) {
        //这里模拟的就是写入硬盘的过程,或者说真正吃馒头的过程
        console.log(data.toString());
        setTimeout(next, 1000);
    }
});
module.exports = writableStream;

./Writable:


const Stream = require('./Stream');
const { inherits } = require('util');
function Writable(options) {
    Stream.call(this, options);
    this._writableState = {
        ended: false,//是否已经写完了 是否已经把所有的馒头吃完了,吃撑了,吃不下了
        writing: false,//是否正在写 是否嘴里正在吃馒头
        buffer: [],// 缓存区,用来存放将要写入的数据,也就是放馒头的桌子
        bufferSize: 0 //正在处理中的数据字节数等正在吃的和桌子上放的,也就是正在向底层系统写入的+缓存区里的
    };
    if (options.write) this._write = options.write;
}
inherits(Writable, Stream);
Writable.prototype.write = function (chunk) {
    if (this._writableState.ended) {
        //wite header after end 可写流已经关闭了,就不能往里写东西了
        return false;
    }
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
    //不管你的数据是放在缓存区里,还是直接写入底层文件系统,都会累加到bufferSize
    //当一个数据被真正写入硬盘后,或者说处理完了,才会减少
    this._writableState.bufferSize += chunk.length;
    let lessThanMark = this.options.highWaterMark > this._writableState.bufferSize;
    //如果当前正在写入2
    if (this._writableState.writing) {
        this._writableState.buffer.push(chunk);
    } else {//如果当前没有正在写入1
        //_write 是真正的写入方法,比如说写入硬盘,吃馒头
        this._writableState.writing = true;
        this._write(chunk, 'utf8', () => {
            this._writableState.bufferSize -= chunk.length;
            this.next();
        });
    }
    return lessThanMark;
}
Writable.prototype.next = function () {
    this._writableState.writing = false;
    if (this._writableState.buffer.length > 0) {
        let chunk = this._writableState.buffer.shift();
        this._write(chunk, 'utf8', () => {
            this._writableState.bufferSize -= chunk.length;
            this.next();
        });
    } else {
        this.emit('drain');
    }
}
Writable.prototype.end = function () {
    this._writableState.ended = true;
}
module.exports = Writable;

上面代码中,还实现了水位线(highWaterMark)的功能,用来确保生产和消费之间的平衡
实例:

//const { Writable } = require('stream');
const Writable = require('./Writable');
const ws = new Writable({
    //如果highWaterMark=1说明正在处理的数据只能有1个字节,正在处理的数据包括已经 发给底层系统的数据加上缓存区的数据
    highWaterMark: 3,//最高水位线。其实就是指的能缓存的数据大小
    write(data, encoding, next) {
        //模拟的是真实写入的过程
        console.log('writing ', data.toString());
        setTimeout(next, 1000);
    }
});
//如果缓存区满了,那就返回false,如果没有满,那就返回true
//正在处理的数据如果大于等于最高水位线,就是false,
//返回当前正在处理的数据(向硬盘写入的+缓存的)=当前的水位小于最高水位线
let lessThanMark = ws.write('1');
console.log("lessThanMark:1", lessThanMark);
lessThanMark = ws.write('2');
console.log("lessThanMark:2", lessThanMark);
lessThanMark = ws.write('3');
console.log("lessThanMark:3", lessThanMark);
//如果lessThanMark如果为false,按理说就不要再写了
//监听 可写流的排干事件
ws.once('drain', () => {
    let lessThanMark = ws.write('4');
    console.log("lessThanMark:4", lessThanMark);
    lessThanMark = ws.write('5');
    console.log("lessThanMark:5", lessThanMark);
    lessThanMark = ws.write('6');
    console.log("lessThanMark:6", lessThanMark);
});

管道

管道就是从文件中读取数据出来,然后在写到另一个地方,下面的pipe方法中dest参数就是要写到的目的地

Readable.prototype.pipe = function (dest) {
    this.on('data', function (data) {
        let lessThanMark = dest.write(data);
        if (!lessThanMark) {
            this.pause();
        }
    });
    this.on('drain', function () {
        this.resume();
    });
    return dest;
};

双工流:

双工流,既可以读,也可以写,读和写之间没有关系

const Readable = require('./Readable');
const Writable = require('./Writable');
const { inherits } = require('util');
inherits(Duplex, Readable);
function Duplex(options) {
    Readable.call(this, options);
    Writable.call(this, options);
}

const keys = Object.keys(Writable.prototype);
for (let v = 0; v < keys.length; v++) {
    const method = keys[v];
    Duplex.prototype[method] = Writable.prototype[method];
}
module.exports = Duplex;

注:由于Duplex需要同时继承Readable和Writable,而inherits只能继承一个类上的成员,我们通过inherits(Duplex, Readable);继承了Readable,然后在最后通过for循环手动将Writable上的方法拷贝到Duplex上去

转换流:

从Readable读出来的数据,经过转化流转换之后,再写到Writable里面

const readableStream = require('./readableStream');
const transformStream = require('./transformStream');
const writableStream = require('./writableStream');
readableStream.pipe(transformStream).pipe(writableStream);