Stream

Example 1

```javascript
const fs = require('fs')
const stream = fs.createWriteStream('./big_file.txt')
for (let i = 0; i < 1000000; i++) {
  stream.write(`This is line ${i}\n`)
}
stream.end()
console.log('done')
```
  • A stream is like a flow of water, but by default there is no water in it
  • stream.write puts water (data) into the flow
  • Each small piece of data written is called a chunk
  • The end that produces the data is called the source
  • The end that receives the data is called the sink


Example 2

```javascript
const fs = require('fs')
const http = require('http')
const server = http.createServer()
server.on('request', (request, response) => {
  fs.readFile('./big_file.txt', (error, data) => {
    if (error) throw error
    response.end(data)
    console.log('done')
  })
})
server.listen(8888)
```
  • Watch the Node.js process in Task Manager: memory usage is around 130 MB, because readFile loads the whole file into memory before responding

Example 3

```javascript
const fs = require('fs')
const http = require('http')
const server = http.createServer()
server.on('request', (request, response) => {
  const stream = fs.createReadStream('./big_file.txt')
  stream.pipe(response)
  stream.on('end', () => console.log('done'))
})
server.listen(8888)
console.log('8888')
```
  • Watch the Node.js memory usage: it rarely goes above 30 MB
  • The file stream and the response stream are connected by a pipe

    Pipes

    Two streams can be connected with a pipe
    The end of stream1 is connected to the start of stream2
    As long as stream1 has data, it flows into stream2
    stream1.pipe(stream2)

a.pipe(b).pipe(c)
is equivalent to
a.pipe(b)
b.pipe(c)

  • A pipe can also be implemented by hand with events, but pipe() is simpler, so the manual version is rarely used

```javascript
// Whenever stream1 has data, push it into stream2
stream1.on('data', (chunk) => {
  stream2.write(chunk)
})
// When stream1 ends, end stream2 as well
stream1.on('end', () => {
  stream2.end()
})
```

    The prototype chain of a Stream object

    s = fs.createReadStream(path)

  • The object hierarchy of s:

    • own properties (set up by the fs.ReadStream constructor)
    • prototype: stream.Readable.prototype
    • second-level prototype: stream.Stream.prototype
    • third-level prototype: events.EventEmitter.prototype
    • fourth-level prototype: Object.prototype
  • Every Stream object inherits from EventEmitter

drain (a favorite interview question)
Emitted when the write buffer becomes empty. It can be used to throttle uploads.

```javascript
stream1.on('data', (chunk) => {
  const ok = stream2.write(chunk)
  if (!ok) {
    // stream2 is backed up: stop reading until it drains
    stream1.pause()
    stream2.once('drain', () => stream1.resume()) // go on writing
  }
})
```

finish
Emitted after [stream.end()](http://nodejs.cn/api/stream.html#stream_writable_end_chunk_encoding_callback) has been called and all buffered data has been flushed to the underlying system.

```javascript
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`write #${i}!\n`);
}
writer.on('finish', () => {
  console.error('All writes are now complete.');
});
writer.end('This is the end\n');
```

cork
The writable.cork() method forces all written data to be buffered in memory. The buffered data is flushed only when [stream.uncork()](http://nodejs.cn/s/6wPsns) or [stream.end()](http://nodejs.cn/s/hYaxt3) is called.
The primary intent of writable.cork() is to handle the situation where many small chunks are written to the stream in rapid succession, which would otherwise bypass the internal buffer and hurt performance. For this case, streams that implement writable._writev() can flush the buffered writes in a more optimized way.

Stream types

(figures: the four stream types: Readable, Writable, Duplex, Transform)

Readable

The paused state and the flowing state

  • A readable stream starts in the paused state
  • Adding a 'data' event listener switches it to flowing
  • Removing the 'data' listener switches it back to paused (note: in recent Node versions this alone does not automatically pause the stream)
  • pause() switches it to paused
  • resume() switches it to flowing

    Writable

    The drain ("drained dry") event

  • It means the stream can take more water now

  • When we call stream.write(chunk), it may return false
  • false means "you are writing too fast, the data is backing up"
  • At that point we must stop calling write and listen for drain
  • Only after the drain event fires can we continue writing

```javascript
// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
const fs = require('fs')

function writeOneMillionTimes(writer, data) {
  let i = 1000000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // Last time!
        writer.write(data);
      } else {
        // See if we should continue, or wait.
        // Don't pass the callback, because we're not done yet.
        ok = writer.write(data);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // Had to stop early!
      // Write some more once it drains.
      writer.once('drain', () => {
        console.log('drained')
        write()
      });
    }
  }
}

const writer = fs.createWriteStream('./big_file.txt')
writeOneMillionTimes(writer, 'hello world')
```

Custom Streams

Creating a Writable Stream

```javascript
const { Writable } = require('stream')
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString())
    callback()
  }
})
process.stdin.pipe(outStream)
```

Creating a Readable Stream

```javascript
const { Readable } = require("stream");
const inStream = new Readable();
inStream.push("ABCDEFGHIJKLM");
inStream.push("NOPQRSTUVWXYZ");
inStream.push(null); // No more data
inStream.pipe(process.stdout);
```

```javascript
const { Readable } = require("stream");
const inStream = new Readable({
  read(size) {
    const char = String.fromCharCode(this.currentCharCode++);
    this.push(char);
    console.log(`Pushed ${char}`);
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);
// This time the data is supplied on demand: each read() call pushes one chunk
```

Duplex Stream

```javascript
const { Duplex } = require("stream");
const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});
inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);
```

Transform Stream

```javascript
const { Transform } = require("stream");
const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});
process.stdin.pipe(upperCaseTr).pipe(process.stdout);
```

Built-in Transform Streams

gzip compression

```javascript
const fs = require("fs");
const zlib = require("zlib");
const file = process.argv[2];
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + ".gz"));
```

Recording how many writes happened …

```javascript
const fs = require("fs");
const zlib = require("zlib");
const file = process.argv[2];
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .on("data", () => process.stdout.write("."))
  .pipe(fs.createWriteStream(file + ".zz"))
  .on("finish", () => console.log("Done"));
```

```javascript
const fs = require("fs");
const zlib = require("zlib");
const file = process.argv[2];
const { Transform } = require("stream");
const reportProgress = new Transform({
  transform(chunk, encoding, callback) {
    process.stdout.write(".");
    callback(null, chunk);
  }
});
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + ".zz"))
  .on("finish", () => console.log("Done"));
```
```javascript
const fs = require("fs");
const zlib = require("zlib");
const crypto = require("crypto");
// ..
fs.createReadStream(file)
  .pipe(crypto.createCipher("aes192", "123456"))
  .pipe(zlib.createGzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + ".zz"))
  .on("finish", () => console.log("Done"));
// encrypt first, then compress
```

Streams in Node.js


The backlog problem in data streams

Back pressure