https://blog.csdn.net/weixin_33936401/article/details/88986365

场景说明

Streams API 是为了解决一个简单但又基础的问题而生的:Web 应用如何消费有序的小信 息块而不是大块信息?这种能力主要有两种应用场景。

1、大块数据可能不会一次性都可用。
网络请求的响应就是一个典型的例子。网络负载是以连续信 息包形式交付的,而流式处理可以让应用在数据一到达就能使用,而不必等到所有数据都加载 完毕。

2、大块数据可能需要分小部分处理。
视频处理、数据压缩、图像编码和 JSON 解析都是可以分成小 部分进行处理,而不必等到所有数据都在内存中时再处理的例子。

提到流,可以把数据想像成某种通过管道输送的液体。

JavaScript 中的流借用了管道相关的概念, 因为原理是相通的。

根据规范,“这些 API 实际是为映射低级 I/O 原语而设计,包括适当时候对字节流 的规范化”。

Stream API 直接解决的问题是处理网络请求和读写磁盘。

种类

Stream API 定义了三种流。
1、 可读流:可以通过某个公共接口读取数据块的流。数据在内部从底层源进入流,然后由消费者 (consumer)进行处理。

2、可写流:可以通过某个公共接口写入数据块的流。生产者(producer)将数据写入流,数据在内 部传入底层数据槽(sink)。

3、转换流:由两种流组成,可写流用于接收数据(可写端),可读流用于输出数据(可读端)。
这 两个流之间是转换程序(transformer),可以根据需要检查和修改流内容。

流的基本单位是块(chunk)。块可是任意数据类型,但通常是定型数组。

每个块都是离散的流片段, 可以作为一个整体来处理。

更重要的是,块不是固定大小的,也不一定按固定间隔到达。

在理想的流当 中,块的大小通常近似相同,到达间隔也近似相等。

流平衡

1、流出口处理数据的速度比入口提供数据的速度快。
流出口经常空闲(可能意味着流入口效率较 低),但只会浪费一点内存或计算资源,因此这种流的不平衡是可以接受的。

2、流入和流出均衡。这是理想状态。
这个内部队列中会有零个或少量排队的块,因为流出口块出列的速 度与流入口块入列的速度近似相等。
这种流的内部队列所占用的内存相对比较小。

3、流入口提供数据的速度比出口处理数据的速度快。
这种流不平衡是固有的问题。此时一定会在 某个地方出现数据积压,流必须相应做出处理。

如果块入列速度快于出列速度,则内部队列会不断增大。

流不能允许其内部队列无限增大,因此它 会使用反压(backpressure)通知流入口停止发送数据,直到队列大小降到某个既定的阈值之下。

这个阈 值由排列策略决定,这个策略定义了内部队列可以占用的最大内存,即高水位线(high water mark)

流不平衡是常见问题,但流也提供了解决这个问题的工具。

所有流都会为已进入流但尚未离开流的 块提供一个内部队列。

===============

可读流

可读流是对底层数据源的封装。

底层数据源可以将数据填充到流中,允许消费者通过流的公共接口 读取数据。

  1. // 1、这个生成器的值可以通过可读流的控制器传入可读流。
  2. async function* ints() {
  3. // 每 1000 毫秒生成一个递增的整数
  4. for (let i = 0; i < 5; ++i) {
  5. yield await new Promise((resolve) => setTimeout(resolve, 1000, i));
  6. }
  7. }
  8. // 2、创建读取流
  9. const readableStream = new ReadableStream({
  10. // 3、定义开始方法,方法使用作为参数传入的 controller。
  11. async start(controller) {
  12. for await (let chunk of ints()) {
  13. // 4、调用控制器的 enqueue()方法可以把值传入控制器。
  14. controller.enqueue(chunk);
  15. }
  16. // 5、所有值都传完之后,调用 close()关闭流
  17. controller.close();
  18. }
  19. });
  20. //前面的例子把 5 个值加入了流的队列,但没有把它们从队列中读出来
  21. console.log(readableStream.locked); // false
  22. // 6、需要一个 ReadableStreamDefaultReader 的实例,该实例可以通过流的 getReader()方法获取
  23. const readableStreamDefaultReader = readableStream.getReader();
  24. // 7、调用这个方法会获得流的锁,保证只有这个读取器可以从流中读取值
  25. console.log(readableStream.locked); // true
  26. // 8、消费者使用这个读取器实例的 read()方法可以读出值
  27. (async function() {
  28. while(true) {
  29. const { done, value } = await readableStreamDefaultReader.read();
  30. if (done) {
  31. break;
  32. } else {
  33. console.log(value);
  34. }
  35. }
  36. })();
  37. // 0
  38. // 1
  39. // 2
  40. // 3
  41. // 4

可写流

可写流是底层数据槽的封装。

底层数据槽处理通过流的公共接口写入的数据。

  1. async function* ints() {
  2. // 每 1000 毫秒生成一个递增的整数
  3. for (let i = 0; i < 5; ++i) {
  4. yield await new Promise((resolve) => setTimeout(resolve, 1000, i));
  5. }
  6. }
  7. const writableStream = new WritableStream({
  8. write(value) {
  9. console.log(value);
  10. }
  11. });
  12. console.log(writableStream.locked); // false
  13. const writableStreamDefaultWriter = writableStream.getWriter();
  14. console.log(writableStream.locked); // true
  15. // 生产者
  16. (async function() {
  17. for await (let chunk of ints()) {
  18. await writableStreamDefaultWriter.ready;
  19. writableStreamDefaultWriter.write(chunk);
  20. }
  21. writableStreamDefaultWriter.close();
  22. })();

转换流

转换流用于组合可读流和可写流。数据块在两个流之间的转换是通过 transform()方法完成的。

  1. async function* ints() {
  2. // 每 1000 毫秒生成一个递增的整数
  3. for (let i = 0; i < 5; ++i) {
  4. yield await new Promise((resolve) => setTimeout(resolve, 1000, i));
  5. }
  6. }
  7. const { writable, readable } = new TransformStream({
  8. transform(chunk, controller) {
  9. controller.enqueue(chunk * 2);
  10. }
  11. });
  12. const readableStreamDefaultReader = readable.getReader();
  13. const writableStreamDefaultWriter = writable.getWriter();
  14. // 消费者
  15. (async function() {
  16. while (true) {
  17. const { done, value } = await readableStreamDefaultReader.read();
  18. if (done) {
  19. break;
  20. } else {
  21. console.log(value);
  22. }
  23. }
  24. })();
  25. // 生产者
  26. (async function() {
  27. for await (let chunk of ints()) {
  28. await writableStreamDefaultWriter.ready;
  29. writableStreamDefaultWriter.write(chunk);
  30. }
  31. writableStreamDefaultWriter.close();
  32. })();

通过管道连接流

流可以通过管道连接成一串。

最常见的用例是使用 pipeThrough()方法把 ReadableStream 接入 TransformStream。

从内部看,ReadableStream 先把自己的值传给 TransformStream 内部的 WritableStream,然后执行转换,接着转换后的值又在新的 ReadableStream 上出现。