https://www.dartcn.com/guides/libraries/library-tour#stream

在 Dart API 中 Stream 对象随处可见,Stream 用来表示一些列数据。 例如,HTML 中的按钮点击就是通过 stream 传递的。 同样也可以将文件作为数据流来读取。

什么是Stream

stream 是Dart语言中的所谓异步数据序列的东西,简单理解,就是一个异步数据队列。
我们知道队列的特点是先进先出,stream正式如此。

Stream - 图1 Stream - 图2

更形象的比喻,stream 就像一个传送带,可以将一侧的物品自动运送到另一侧。
如上图,在另一侧,如果没人抓取,物品就会掉落消失。
但如果我们在末尾设置一个监听,当物品到达末端时,就可以出发相应的响应行为。

在dart中,Stream 有两种类型:点对点的单订阅流(Single-subscription)、广播流。

单订阅流

单订阅流的特点是只允许存在一个监听器,即使该监听器被取消后,也不允许再次注册监听器。

创建Stream

创建一个Stream有9个构造方法,其中一个是构造广播流的,这里主要看下其中5个构造单订阅流的方法。

periodic

该方法从整数0开始,在指定的间隔时间内生成一个自然数列,以下设置为每一秒生成1次,callback 函数用于对生成的整数进行处理,处理后再放入 Stream 中。下面并未处理,直接返回了。要注意,这个流是无限的,它没有任何一个约束条件使之停止。在后面会介绍如何流设置条件。

  1. void main() {
  2. // 可以在回调函数中对值进行处理,这里直接返回了
  3. int callback(value) {
  4. return value;
  5. }
  6. test() async {
  7. // 使用 periodic 创建单订阅流,第一个参数为间隔时间,第二个参数为回调函数。
  8. Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), callback);
  9. // await for 循环从流中读取
  10. // 类似js中 “for await...of” 循环,则是用于遍历异步的 Iterator 接口。 for await (let k of it){}
  11. await for (var i in s1) {
  12. print(i);
  13. }
  14. }
  15. test();
  16. }
  17. // 间隔1秒,依次输出 0 1 2 3 4 5 ...

fromFuture

该方法从一个Future创建Stream,当Future执行完成时,就会放入Stream中,而后从Stream中将任务完成的结果取出。这种用法,很像异步队列。

  1. void main() {
  2. print('开始');
  3. test() async {
  4. Future<String> fut = Future(() {
  5. return 'async task';
  6. });
  7. // 从Future创建Stream
  8. Stream<String> s1 = Stream<String>.fromFuture(fut);
  9. await for (var i in s1) {
  10. print(i);
  11. }
  12. }
  13. test();
  14. print('结束');
  15. }
  16. // 开始
  17. // 结束
  18. // async task

fromFutures

从多个Future创建Stream,即将一系列的异步任务放入Stream中,每个Future按顺序执行,执行完成后放入Stream。

  1. import 'dart:io';
  2. void main() {
  3. print('开始');
  4. test() async {
  5. Future<String> fut1 = Future(() {
  6. // 模拟耗时3秒
  7. sleep(Duration(seconds: 3));
  8. return 'async task1';
  9. });
  10. Future<String> fut2 = Future(() {
  11. return 'async task2';
  12. });
  13. // 将多个Future放入一个列表中,将该列表传入
  14. Stream<String> s1 = Stream<String>.fromFutures([fut1, fut2]);
  15. await for (var i in s1) {
  16. print(i);
  17. }
  18. }
  19. test();
  20. print('结束');
  21. }
  22. // 开始
  23. // 结束
  24. // 3秒后输出:
  25. // async task1
  26. // async task2

fromIterable

该方法从一个集合中创建Stream,用法与fromFutures大致相同。

  1. void main() {
  2. test() async {
  3. // 将多个Future放入一个列表中,将该列表传入
  4. Stream<int> s1 = Stream<int>.fromIterable([1, 2, 3]);
  5. await for (var i in s1) {
  6. print(i);
  7. }
  8. }
  9. test();
  10. }
  11. // 依次输出 1 2 3

value

该方法是dart2.5新增的方法,用于从单个值创建Stream。

  1. void main() {
  2. test() async {
  3. Stream<int> s1 = Stream<int>.value(1);
  4. await for (var i in s1) {
  5. print(i);
  6. }
  7. }
  8. test();
  9. }
  10. // 1

监听 Stream

监听Stream,并从中获取数据方式也有三种方式:

  • await for 循环
  • forEach
  • listen

await for循环、forEach循环

  1. void main() {
  2. test() async {
  3. Stream<int> s1 = Stream<int>.fromIterable([1, 2, 3]);
  4. await for (var i in s1) {
  5. print(i);
  6. }
  7. }
  8. test();
  9. }
  10. // 依次输出 1 2 3

forEach

  1. void main() {
  2. test() async {
  3. Stream<int> s1 = Stream<int>.fromIterable([1, 2, 3]);
  4. s1.forEach((x) {
  5. print(x);
  6. });
  7. }
  8. test();
  9. }
  10. // 依次输出 1 2 3

listen

StreamSubscription listen(void onData(T event), {Function onError, void onDone(), bool cancelOnError})

参数:

  • onData 必填参数,收到数据时触发。
  • onError 发生Error时触发。
  • onDone 完成时触发。
  • cancelOnError 遇到第一个Error时是否取消监听,默认为false。

    1. void main() {
    2. test() async {
    3. Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), (val) => val);
    4. s1 = s1.take(5);
    5. s1.listen((x) => print(x),
    6. onError: (e) => print('error -- $e'),
    7. onDone: () => print('done'),
    8. cancelOnError: false);
    9. }
    10. test();
    11. }
    12. // 依次输出 0 1 2 3 4 done

    监听事件

    ```dart // 通过 ID 获取 button 并添加事件处理函数。 querySelector(‘#submitInfo’).onClick.listen((e) { // 当 button 被点击时,该代码会执行。 submitData(); });

//ID 为 “submitInfo” button 提供的 onClick 属性是一个 Stream 对象。

//如果只关心其中一个事件,可以使用,例如,first, last,或 single 属性来获取。 //要在处理时间前对事件进行测试,可以使用,例如 firstWhere(), lastWhere(), 或 singleWhere() 方法。

//如果只关心事件中的一个子集,可以使用,例如,skip(), skipWhile(),take(),takeWhile(), 和 where()。

  1. <a name="ck0Ir"></a>
  2. ## 传递 Stream
  3. 常常,在使用流数据前需要改变数据的格式。 使用 `transform()` 方法生成具有不同类型数据的流:
  4. ```dart
  5. var lines = inputStream
  6. .transform(utf8.decoder)
  7. .transform(LineSplitter());

上面例子中使用了两个 transformer 。 第一个使用 utf8.decoder 将整型流转换为字符串流。 接着,使用了 LineSplitter 将字符串流转换为多行字符串流。 这些 transformer 来自 dart:convert 库 (参考dart:convert section)。

处理错误和完成

处理错误和完成代码方式, 取决于使用的是 异步 for 循环(await for)还是 Stream API 。
如果使用的是异步 for 循环, 那么通过 try-catch 来处理错误。 代码位于异步 for 循环之后, 会在 stream 被关闭后执行。

  1. Future readFileAwaitFor() async {
  2. var config = File('config.txt');
  3. Stream<List<int>> inputStream = config.openRead();
  4. var lines = inputStream
  5. .transform(utf8.decoder)
  6. .transform(LineSplitter());
  7. try {
  8. await for (var line in lines) {
  9. print('Got ${line.length} characters from stream');
  10. }
  11. print('file is now closed');
  12. } catch (e) {
  13. print(e);
  14. }
  15. }

如果使用的是 Stream API, 那么通过注册 onError 监听来处理错误。 代码位于注册的 onDone 中, 会在 stream 被关闭后执行。

  1. var config = File('config.txt');
  2. Stream<List<int>> inputStream = config.openRead();
  3. inputStream
  4. .transform(utf8.decoder)
  5. .transform(LineSplitter())
  6. .listen((String line) {
  7. print('Got ${line.length} characters from stream');
  8. }, onDone: () {
  9. print('file is now closed');
  10. }, onError: (e) {
  11. print(e);
  12. });

Stream 方法

take、takeWhile

  • Stream<T>.take(int count) 用于限制Stream中的元素数量。
  • Stream<T>.takeWhile(bool test(T element))take作用相似,只是它的参数是一个函数类型,且返回值必须是一个bool值。

    1. void main() {
    2. test() async {
    3. Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), (val) => val);
    4. // 当放入3个元素后,监听会停止,Stream会关闭
    5. // s1 = s1.take(3);
    6. // 对当前元素进行判断,不满足条件的取消监听
    7. s1 = s1.takeWhile((x) {
    8. return x < 3;
    9. // return x < 3 && x > 1;
    10. //如果0都不满足,那么在0那就停止监听了,Stream会关闭,后续的即使满足条件,也没用了。
    11. });
    12. await for (var i in s1) {
    13. print(i);
    14. }
    15. }
    16. test();
    17. }
    18. // 依次输出 0 1 2

skip、skipWhile

请注意,该方法只是从Stream中获取元素时跳过,被跳过的元素依然是被执行了的,所耗费的时间依然存在,其实只是跳过了执行完的结果而已。

  1. void main() {
  2. test() async {
  3. Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), (val) => val);
  4. // 表示从Stream中跳过两个元素
  5. s1 = s1.skip(2);
  6. s1 = s1.take(5);
  7. await for (var i in s1) {
  8. print(i);
  9. }
  10. }
  11. test();
  12. }
  13. // 停滞2秒(即 0 1 耗费的时间),再依次输出 2 3 4 5 6,然后监听停止

Stream<T> skipWhile(bool test(T element)) 方法与takeWhile用法是相同的,传入一个函数对结果进行判断,表示跳过满足条件的。

toList

Future<List<T>> toList() 表示将Stream中所有数据存储在List中。

  1. void main() {
  2. test() async {
  3. Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), (val) => val);
  4. s1 = s1.take(5);
  5. List<int> list = await s1.toList();
  6. print(list); //[0, 1, 2, 3, 4]
  7. }
  8. test();
  9. }

length

等待并获取流中所有数据的数量。

  1. void main() {
  2. test() async {
  3. Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), (val) => val);
  4. s1 = s1.take(5);
  5. int n1 = await s1.length;
  6. print(n1); //5
  7. }
  8. test();
  9. }

StreamController

Stream的一个帮助类,可用于整个 Stream 过程的控制。

使用该类时,需要导入'dart:async',其add方法和sink.add方法是相同的,都是用于放入一个元素,addError方法用于产生一个错误,监听方法中的onError可获取错误。

  1. import "dart:async";
  2. void main() {
  3. test() async {
  4. // 创建
  5. StreamController s1 = StreamController();
  6. // 放入事件
  7. s1.add('element_1');
  8. s1.addError('this is a error');
  9. s1.sink.add('element_2');
  10. s1.stream.listen(
  11. (val) => print('val -- $val'),
  12. onError: (e) => print('error -- $e'),
  13. onDone: () => print('done')
  14. );
  15. }
  16. test();
  17. }
  18. // val -- element_1
  19. // error -- this is a error
  20. // val -- element_2

还可以在 StreamController 中传入一个指定的 stream
  1. import "dart:async";
  2. void main() {
  3. test() async {
  4. var sm = Stream.periodic(Duration(seconds: 1), (e) => e);
  5. sm = sm.take(3);
  6. // 创建
  7. var s1 = StreamController();
  8. // 传入 Stream
  9. s1.addStream(sm);
  10. s1.stream.listen((val) => print('val -- $val'),
  11. onError: (e) => print('error -- $e'), onDone: () => print('done'));
  12. }
  13. test();
  14. }
  15. // val -- 0
  16. // val -- 1
  17. // val -- 2


现在来看一下StreamController的原型,它有5个可选参数
  1. factory StreamController(
  2. {void onListen(),
  3. void onPause(),
  4. void onResume(),
  5. onCancel(),
  6. bool sync: false})
  • onListen 注册监听时回调
  • onPause 当流暂停时回调
  • onResume 当流恢复时回调
  • onCancel 当监听器被取消时回调
  • sync 当值为true时表示同步控制器SynchronousStreamController,默认值为false,表示异步控制器
  1. import "dart:async";
  2. void main() {
  3. test() async {
  4. // 创建
  5. StreamController sc = StreamController(
  6. onListen: () => print("onListen"),
  7. onPause: () => print("onPause"),
  8. onResume: () => print("onResume"),
  9. onCancel: () => print("onCancel"),
  10. sync: false);
  11. StreamSubscription ss = sc.stream.listen(print);
  12. sc.add('element_1');
  13. // 暂停
  14. ss.pause();
  15. // 恢复
  16. ss.resume();
  17. // 取消
  18. ss.cancel();
  19. // 关闭流
  20. sc.close();
  21. }
  22. test();
  23. }
  24. // onListen
  25. // onPause
  26. // onCancel
  27. // 因为监听器被取消了,且关闭了流,导致"element_1"未被输出,"onResume"亦未输出

广播流

在普通的单订阅流中调用两次listen会报错

  1. void main() {
  2. test() async {
  3. var sm = Stream.periodic(Duration(seconds: 1), (e) => e);
  4. sm = sm.take(5);
  5. sm.listen(print);
  6. sm.listen(print);
  7. }
  8. test();
  9. }
  10. // 报错:Stream has already been listened to.

前面已经说了单订阅流的特点,而广播流则可以允许多个监听器存在,就如同广播一样,凡是监听了广播流,每个监听器都能获取到数据。要注意,如果在触发事件时将监听者正添加到广播流,则该监听器将不会接收当前正在触发的事件。如果取消监听,监听者会立即停止接收事件。

有两种方式创建广播流,一种直接从Stream创建,另一种使用StreamController创建

  1. void main() {
  2. test() async {
  3. var sm =
  4. Stream.periodic(Duration(seconds: 1), (e) => e).asBroadcastStream();
  5. sm = sm.take(2);
  6. sm.listen((e) => print('监听1 -- $e'));
  7. sm.listen((e) => print('监听2 -- $e'));
  8. }
  9. test();
  10. }
  11. // 监听1 -- 0
  12. // 监听2 -- 0
  13. // 监听1 -- 1
  14. // 监听2 -- 1

使用StreamController

  1. import 'dart:async';
  2. void main() {
  3. test() async {
  4. var sc = StreamController.broadcast();
  5. sc.stream.listen((e) => print('监听1 -- $e'));
  6. sc.stream.listen((e) => print('监听2 -- $e'));
  7. sc.add(1);
  8. sc.add(2);
  9. }
  10. test();
  11. }
  12. // 监听1 -- 1
  13. // 监听2 -- 1
  14. // 监听1 -- 2
  15. // 监听2 -- 2

StreamTransformer

该类可以使我们在Stream上执行数据转换。然后,这些转换被推回到流中,以便该流注册的所有监听器可以接收。

构造方法原型
  • handleData:响应从流中发出的任何数据事件。提供的参数是来自发出事件的数据,以及EventSink<T>,表示正在进行此转换的当前流的实例
  • handleError:响应从流中发出的任何错误事件
  • handleDone:当流不再有数据要处理时调用。通常在流的close()方法被调用时回调
  1. factory StreamTransformer.fromHandlers({
  2. void handleData(S data, EventSink<T> sink),
  3. void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
  4. void handleDone(EventSink<T> sink)
  5. })
  1. import 'dart:async';
  2. void main() {
  3. test() async {
  4. StreamController sc = StreamController<int>();
  5. // 创建 StreamTransformer对象
  6. StreamTransformer stf = StreamTransformer<int, double>.fromHandlers(
  7. handleData: (int data, EventSink sink) {
  8. // 操作数据后,转换为 double 类型
  9. sink.add((data * 2).toDouble());
  10. },
  11. handleError: (error, stacktrace, sink) {
  12. sink.addError('wrong: $error');
  13. },
  14. handleDone: (sink) {
  15. sink.close();
  16. },
  17. );
  18. // 调用流的transform方法,传入转换对象
  19. Stream stream = sc.stream.transform(stf);
  20. stream.listen(print);
  21. // 添加数据,这里的类型是int
  22. sc.add(1);
  23. sc.add(2);
  24. sc.add(3);
  25. // 调用后,触发handleDone回调
  26. // sc.close();
  27. }
  28. test();
  29. }
  30. // 2.0
  31. // 4.0
  32. // 6.0

总结

与流相关的操作,主要有四个类

  • Stream
  • StreamController
  • StreamSink
  • StreamSubscription

Stream是基础,为了更方便控制和管理Stream,出现了StreamController类。在StreamController类中, 提供了StreamSink 作为事件输入口,当我们调用add时,实际上是调用的sink.add,通过sink属性可以获取StreamController类中的StreamSink ,而StreamSubscription类则用于管理事件的注册、暂停与取消等,通过调用stream.listen方法返回一个StreamSubscription对象。

更多内容

更多在 command-line 应用中使用 Future 和 Stream 的实例,参考 dart:io tour. 也可以参考下列文章和教程: