https://www.dartcn.com/guides/libraries/library-tour#stream
在 Dart API 中 Stream 对象随处可见,Stream 用来表示一些列数据。 例如,HTML 中的按钮点击就是通过 stream 传递的。 同样也可以将文件作为数据流来读取。
什么是Stream
stream 是Dart语言中的所谓异步数据序列的东西,简单理解,就是一个异步数据队列。
我们知道队列的特点是先进先出,stream正式如此。

更形象的比喻,stream 就像一个传送带,可以将一侧的物品自动运送到另一侧。
如上图,在另一侧,如果没人抓取,物品就会掉落消失。
但如果我们在末尾设置一个监听,当物品到达末端时,就可以出发相应的响应行为。
在dart中,Stream 有两种类型:点对点的单订阅流(Single-subscription)、广播流。
单订阅流
单订阅流的特点是只允许存在一个监听器,即使该监听器被取消后,也不允许再次注册监听器。
创建Stream
创建一个Stream有9个构造方法,其中一个是构造广播流的,这里主要看下其中5个构造单订阅流的方法。
periodic
该方法从整数0开始,在指定的间隔时间内生成一个自然数列,以下设置为每一秒生成1次,callback 函数用于对生成的整数进行处理,处理后再放入 Stream 中。下面并未处理,直接返回了。要注意,这个流是无限的,它没有任何一个约束条件使之停止。在后面会介绍如何流设置条件。
void main() {// 可以在回调函数中对值进行处理,这里直接返回了int callback(value) {return value;}test() async {// 使用 periodic 创建单订阅流,第一个参数为间隔时间,第二个参数为回调函数。Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), callback);// await for 循环从流中读取// 类似js中 “for await...of” 循环,则是用于遍历异步的 Iterator 接口。 for await (let k of it){}await for (var i in s1) {print(i);}}test();}// 间隔1秒,依次输出 0 1 2 3 4 5 ...
fromFuture
该方法从一个Future创建Stream,当Future执行完成时,就会放入Stream中,而后从Stream中将任务完成的结果取出。这种用法,很像异步队列。
void main() {print('开始');test() async {Future<String> fut = Future(() {return 'async task';});// 从Future创建StreamStream<String> s1 = Stream<String>.fromFuture(fut);await for (var i in s1) {print(i);}}test();print('结束');}// 开始// 结束// async task
fromFutures
从多个Future创建Stream,即将一系列的异步任务放入Stream中,每个Future按顺序执行,执行完成后放入Stream。
import 'dart:io';void main() {print('开始');test() async {Future<String> fut1 = Future(() {// 模拟耗时3秒sleep(Duration(seconds: 3));return 'async task1';});Future<String> fut2 = Future(() {return 'async task2';});// 将多个Future放入一个列表中,将该列表传入Stream<String> s1 = Stream<String>.fromFutures([fut1, fut2]);await for (var i in s1) {print(i);}}test();print('结束');}// 开始// 结束// 3秒后输出:// async task1// async task2
fromIterable
该方法从一个集合中创建Stream,用法与fromFutures大致相同。
void main() {test() async {// 将多个Future放入一个列表中,将该列表传入Stream<int> s1 = Stream<int>.fromIterable([1, 2, 3]);await for (var i in s1) {print(i);}}test();}// 依次输出 1 2 3
value
该方法是dart2.5新增的方法,用于从单个值创建Stream。
void main() {test() async {Stream<int> s1 = Stream<int>.value(1);await for (var i in s1) {print(i);}}test();}// 1
监听 Stream
监听Stream,并从中获取数据方式也有三种方式:
- await for 循环
- forEach
- listen
await for循环、forEach循环
void main() {test() async {Stream<int> s1 = Stream<int>.fromIterable([1, 2, 3]);await for (var i in s1) {print(i);}}test();}// 依次输出 1 2 3
forEach
void main() {test() async {Stream<int> s1 = Stream<int>.fromIterable([1, 2, 3]);s1.forEach((x) {print(x);});}test();}// 依次输出 1 2 3
listen
StreamSubscription
参数:
onData必填参数,收到数据时触发。onError发生Error时触发。onDone完成时触发。cancelOnError遇到第一个Error时是否取消监听,默认为false。void main() {test() async {Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), (val) => val);s1 = s1.take(5);s1.listen((x) => print(x),onError: (e) => print('error -- $e'),onDone: () => print('done'),cancelOnError: false);}test();}// 依次输出 0 1 2 3 4 done
监听事件
```dart // 通过 ID 获取 button 并添加事件处理函数。 querySelector(‘#submitInfo’).onClick.listen((e) { // 当 button 被点击时,该代码会执行。 submitData(); });
//ID 为 “submitInfo” button 提供的 onClick 属性是一个 Stream 对象。
//如果只关心其中一个事件,可以使用,例如,first, last,或 single 属性来获取。 //要在处理时间前对事件进行测试,可以使用,例如 firstWhere(), lastWhere(), 或 singleWhere() 方法。
//如果只关心事件中的一个子集,可以使用,例如,skip(), skipWhile(),take(),takeWhile(), 和 where()。
<a name="ck0Ir"></a>## 传递 Stream常常,在使用流数据前需要改变数据的格式。 使用 `transform()` 方法生成具有不同类型数据的流:```dartvar lines = inputStream.transform(utf8.decoder).transform(LineSplitter());
上面例子中使用了两个 transformer 。 第一个使用 utf8.decoder 将整型流转换为字符串流。 接着,使用了 LineSplitter 将字符串流转换为多行字符串流。 这些 transformer 来自 dart:convert 库 (参考dart:convert section)。
处理错误和完成
处理错误和完成代码方式, 取决于使用的是 异步 for 循环(await for)还是 Stream API 。
如果使用的是异步 for 循环, 那么通过 try-catch 来处理错误。 代码位于异步 for 循环之后, 会在 stream 被关闭后执行。
Future readFileAwaitFor() async {var config = File('config.txt');Stream<List<int>> inputStream = config.openRead();var lines = inputStream.transform(utf8.decoder).transform(LineSplitter());try {await for (var line in lines) {print('Got ${line.length} characters from stream');}print('file is now closed');} catch (e) {print(e);}}
如果使用的是 Stream API, 那么通过注册 onError 监听来处理错误。 代码位于注册的 onDone 中, 会在 stream 被关闭后执行。
var config = File('config.txt');Stream<List<int>> inputStream = config.openRead();inputStream.transform(utf8.decoder).transform(LineSplitter()).listen((String line) {print('Got ${line.length} characters from stream');}, onDone: () {print('file is now closed');}, onError: (e) {print(e);});
Stream 方法
take、takeWhile
Stream<T>.take(int count)用于限制Stream中的元素数量。Stream<T>.takeWhile(bool test(T element))与take作用相似,只是它的参数是一个函数类型,且返回值必须是一个bool值。void main() {test() async {Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), (val) => val);// 当放入3个元素后,监听会停止,Stream会关闭// s1 = s1.take(3);// 对当前元素进行判断,不满足条件的取消监听s1 = s1.takeWhile((x) {return x < 3;// return x < 3 && x > 1;//如果0都不满足,那么在0那就停止监听了,Stream会关闭,后续的即使满足条件,也没用了。});await for (var i in s1) {print(i);}}test();}// 依次输出 0 1 2
skip、skipWhile
请注意,该方法只是从Stream中获取元素时跳过,被跳过的元素依然是被执行了的,所耗费的时间依然存在,其实只是跳过了执行完的结果而已。
void main() {test() async {Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), (val) => val);// 表示从Stream中跳过两个元素s1 = s1.skip(2);s1 = s1.take(5);await for (var i in s1) {print(i);}}test();}// 停滞2秒(即 0 1 耗费的时间),再依次输出 2 3 4 5 6,然后监听停止
Stream<T> skipWhile(bool test(T element)) 方法与takeWhile用法是相同的,传入一个函数对结果进行判断,表示跳过满足条件的。
toList
Future<List<T>> toList() 表示将Stream中所有数据存储在List中。
void main() {test() async {Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), (val) => val);s1 = s1.take(5);List<int> list = await s1.toList();print(list); //[0, 1, 2, 3, 4]}test();}
length
等待并获取流中所有数据的数量。
void main() {test() async {Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), (val) => val);s1 = s1.take(5);int n1 = await s1.length;print(n1); //5}test();}
StreamController
Stream的一个帮助类,可用于整个 Stream 过程的控制。
使用该类时,需要导入'dart:async',其add方法和sink.add方法是相同的,都是用于放入一个元素,addError方法用于产生一个错误,监听方法中的onError可获取错误。
import "dart:async";void main() {test() async {// 创建StreamController s1 = StreamController();// 放入事件s1.add('element_1');s1.addError('this is a error');s1.sink.add('element_2');s1.stream.listen((val) => print('val -- $val'),onError: (e) => print('error -- $e'),onDone: () => print('done'));}test();}// val -- element_1// error -- this is a error// val -- element_2
还可以在 StreamController 中传入一个指定的 stream 。
import "dart:async";void main() {test() async {var sm = Stream.periodic(Duration(seconds: 1), (e) => e);sm = sm.take(3);// 创建var s1 = StreamController();// 传入 Streams1.addStream(sm);s1.stream.listen((val) => print('val -- $val'),onError: (e) => print('error -- $e'), onDone: () => print('done'));}test();}// val -- 0// val -- 1// val -- 2
现在来看一下StreamController的原型,它有5个可选参数
factory StreamController({void onListen(),void onPause(),void onResume(),onCancel(),bool sync: false})
onListen注册监听时回调onPause当流暂停时回调onResume当流恢复时回调onCancel当监听器被取消时回调sync当值为true时表示同步控制器SynchronousStreamController,默认值为false,表示异步控制器
import "dart:async";void main() {test() async {// 创建StreamController sc = StreamController(onListen: () => print("onListen"),onPause: () => print("onPause"),onResume: () => print("onResume"),onCancel: () => print("onCancel"),sync: false);StreamSubscription ss = sc.stream.listen(print);sc.add('element_1');// 暂停ss.pause();// 恢复ss.resume();// 取消ss.cancel();// 关闭流sc.close();}test();}// onListen// onPause// onCancel// 因为监听器被取消了,且关闭了流,导致"element_1"未被输出,"onResume"亦未输出
广播流
在普通的单订阅流中调用两次listen会报错
void main() {test() async {var sm = Stream.periodic(Duration(seconds: 1), (e) => e);sm = sm.take(5);sm.listen(print);sm.listen(print);}test();}// 报错:Stream has already been listened to.
前面已经说了单订阅流的特点,而广播流则可以允许多个监听器存在,就如同广播一样,凡是监听了广播流,每个监听器都能获取到数据。要注意,如果在触发事件时将监听者正添加到广播流,则该监听器将不会接收当前正在触发的事件。如果取消监听,监听者会立即停止接收事件。
有两种方式创建广播流,一种直接从Stream创建,另一种使用StreamController创建
void main() {test() async {var sm =Stream.periodic(Duration(seconds: 1), (e) => e).asBroadcastStream();sm = sm.take(2);sm.listen((e) => print('监听1 -- $e'));sm.listen((e) => print('监听2 -- $e'));}test();}// 监听1 -- 0// 监听2 -- 0// 监听1 -- 1// 监听2 -- 1
使用StreamController
import 'dart:async';void main() {test() async {var sc = StreamController.broadcast();sc.stream.listen((e) => print('监听1 -- $e'));sc.stream.listen((e) => print('监听2 -- $e'));sc.add(1);sc.add(2);}test();}// 监听1 -- 1// 监听2 -- 1// 监听1 -- 2// 监听2 -- 2
StreamTransformer
该类可以使我们在Stream上执行数据转换。然后,这些转换被推回到流中,以便该流注册的所有监听器可以接收。
构造方法原型
handleData:响应从流中发出的任何数据事件。提供的参数是来自发出事件的数据,以及EventSink<T>,表示正在进行此转换的当前流的实例handleError:响应从流中发出的任何错误事件handleDone:当流不再有数据要处理时调用。通常在流的close()方法被调用时回调
factory StreamTransformer.fromHandlers({void handleData(S data, EventSink<T> sink),void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),void handleDone(EventSink<T> sink)})
import 'dart:async';void main() {test() async {StreamController sc = StreamController<int>();// 创建 StreamTransformer对象StreamTransformer stf = StreamTransformer<int, double>.fromHandlers(handleData: (int data, EventSink sink) {// 操作数据后,转换为 double 类型sink.add((data * 2).toDouble());},handleError: (error, stacktrace, sink) {sink.addError('wrong: $error');},handleDone: (sink) {sink.close();},);// 调用流的transform方法,传入转换对象Stream stream = sc.stream.transform(stf);stream.listen(print);// 添加数据,这里的类型是intsc.add(1);sc.add(2);sc.add(3);// 调用后,触发handleDone回调// sc.close();}test();}// 2.0// 4.0// 6.0
总结
与流相关的操作,主要有四个类
StreamStreamControllerStreamSinkStreamSubscription
Stream是基础,为了更方便控制和管理Stream,出现了StreamController类。在StreamController类中, 提供了StreamSink 作为事件输入口,当我们调用add时,实际上是调用的sink.add,通过sink属性可以获取StreamController类中的StreamSink ,而StreamSubscription类则用于管理事件的注册、暂停与取消等,通过调用stream.listen方法返回一个StreamSubscription对象。
更多内容
更多在 command-line 应用中使用 Future 和 Stream 的实例,参考 dart:io tour. 也可以参考下列文章和教程:
