https://www.dartcn.com/guides/libraries/library-tour#stream
在 Dart API 中 Stream 对象随处可见,Stream 用来表示一些列数据。 例如,HTML 中的按钮点击就是通过 stream 传递的。 同样也可以将文件作为数据流来读取。
什么是Stream
stream 是Dart语言中的所谓异步数据序列的东西,简单理解,就是一个异步数据队列。
我们知道队列的特点是先进先出,stream正式如此。
更形象的比喻,stream 就像一个传送带,可以将一侧的物品自动运送到另一侧。
如上图,在另一侧,如果没人抓取,物品就会掉落消失。
但如果我们在末尾设置一个监听,当物品到达末端时,就可以出发相应的响应行为。
在dart中,Stream 有两种类型:点对点的单订阅流(Single-subscription)、广播流。
单订阅流
单订阅流的特点是只允许存在一个监听器,即使该监听器被取消后,也不允许再次注册监听器。
创建Stream
创建一个Stream有9个构造方法,其中一个是构造广播流的,这里主要看下其中5个构造单订阅流的方法。
periodic
该方法从整数0开始,在指定的间隔时间内生成一个自然数列,以下设置为每一秒生成1次,callback 函数用于对生成的整数进行处理,处理后再放入 Stream 中。下面并未处理,直接返回了。要注意,这个流是无限的,它没有任何一个约束条件使之停止。在后面会介绍如何流设置条件。
void main() {
// 可以在回调函数中对值进行处理,这里直接返回了
int callback(value) {
return value;
}
test() async {
// 使用 periodic 创建单订阅流,第一个参数为间隔时间,第二个参数为回调函数。
Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), callback);
// await for 循环从流中读取
// 类似js中 “for await...of” 循环,则是用于遍历异步的 Iterator 接口。 for await (let k of it){}
await for (var i in s1) {
print(i);
}
}
test();
}
// 间隔1秒,依次输出 0 1 2 3 4 5 ...
fromFuture
该方法从一个Future创建Stream,当Future执行完成时,就会放入Stream中,而后从Stream中将任务完成的结果取出。这种用法,很像异步队列。
void main() {
print('开始');
test() async {
Future<String> fut = Future(() {
return 'async task';
});
// 从Future创建Stream
Stream<String> s1 = Stream<String>.fromFuture(fut);
await for (var i in s1) {
print(i);
}
}
test();
print('结束');
}
// 开始
// 结束
// async task
fromFutures
从多个Future创建Stream,即将一系列的异步任务放入Stream中,每个Future按顺序执行,执行完成后放入Stream。
import 'dart:io';
void main() {
print('开始');
test() async {
Future<String> fut1 = Future(() {
// 模拟耗时3秒
sleep(Duration(seconds: 3));
return 'async task1';
});
Future<String> fut2 = Future(() {
return 'async task2';
});
// 将多个Future放入一个列表中,将该列表传入
Stream<String> s1 = Stream<String>.fromFutures([fut1, fut2]);
await for (var i in s1) {
print(i);
}
}
test();
print('结束');
}
// 开始
// 结束
// 3秒后输出:
// async task1
// async task2
fromIterable
该方法从一个集合中创建Stream,用法与fromFutures大致相同。
void main() {
test() async {
// 将多个Future放入一个列表中,将该列表传入
Stream<int> s1 = Stream<int>.fromIterable([1, 2, 3]);
await for (var i in s1) {
print(i);
}
}
test();
}
// 依次输出 1 2 3
value
该方法是dart2.5新增的方法,用于从单个值创建Stream。
void main() {
test() async {
Stream<int> s1 = Stream<int>.value(1);
await for (var i in s1) {
print(i);
}
}
test();
}
// 1
监听 Stream
监听Stream,并从中获取数据方式也有三种方式:
- await for 循环
- forEach
- listen
await for循环、forEach循环
void main() {
test() async {
Stream<int> s1 = Stream<int>.fromIterable([1, 2, 3]);
await for (var i in s1) {
print(i);
}
}
test();
}
// 依次输出 1 2 3
forEach
void main() {
test() async {
Stream<int> s1 = Stream<int>.fromIterable([1, 2, 3]);
s1.forEach((x) {
print(x);
});
}
test();
}
// 依次输出 1 2 3
listen
StreamSubscription
参数:
onData
必填参数,收到数据时触发。onError
发生Error时触发。onDone
完成时触发。cancelOnError
遇到第一个Error时是否取消监听,默认为false。void main() {
test() async {
Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), (val) => val);
s1 = s1.take(5);
s1.listen((x) => print(x),
onError: (e) => print('error -- $e'),
onDone: () => print('done'),
cancelOnError: false);
}
test();
}
// 依次输出 0 1 2 3 4 done
监听事件
```dart // 通过 ID 获取 button 并添加事件处理函数。 querySelector(‘#submitInfo’).onClick.listen((e) { // 当 button 被点击时,该代码会执行。 submitData(); });
//ID 为 “submitInfo” button 提供的 onClick 属性是一个 Stream 对象。
//如果只关心其中一个事件,可以使用,例如,first, last,或 single 属性来获取。 //要在处理时间前对事件进行测试,可以使用,例如 firstWhere(), lastWhere(), 或 singleWhere() 方法。
//如果只关心事件中的一个子集,可以使用,例如,skip(), skipWhile(),take(),takeWhile(), 和 where()。
<a name="ck0Ir"></a>
## 传递 Stream
常常,在使用流数据前需要改变数据的格式。 使用 `transform()` 方法生成具有不同类型数据的流:
```dart
var lines = inputStream
.transform(utf8.decoder)
.transform(LineSplitter());
上面例子中使用了两个 transformer 。 第一个使用 utf8.decoder 将整型流转换为字符串流。 接着,使用了 LineSplitter 将字符串流转换为多行字符串流。 这些 transformer 来自 dart:convert 库 (参考dart:convert section)。
处理错误和完成
处理错误和完成代码方式, 取决于使用的是 异步 for 循环(await for
)还是 Stream API 。
如果使用的是异步 for 循环, 那么通过 try-catch 来处理错误。 代码位于异步 for 循环之后, 会在 stream 被关闭后执行。
Future readFileAwaitFor() async {
var config = File('config.txt');
Stream<List<int>> inputStream = config.openRead();
var lines = inputStream
.transform(utf8.decoder)
.transform(LineSplitter());
try {
await for (var line in lines) {
print('Got ${line.length} characters from stream');
}
print('file is now closed');
} catch (e) {
print(e);
}
}
如果使用的是 Stream API, 那么通过注册 onError
监听来处理错误。 代码位于注册的 onDone
中, 会在 stream 被关闭后执行。
var config = File('config.txt');
Stream<List<int>> inputStream = config.openRead();
inputStream
.transform(utf8.decoder)
.transform(LineSplitter())
.listen((String line) {
print('Got ${line.length} characters from stream');
}, onDone: () {
print('file is now closed');
}, onError: (e) {
print(e);
});
Stream 方法
take、takeWhile
Stream<T>.take(int count)
用于限制Stream
中的元素数量。Stream<T>.takeWhile(bool test(T element))
与take
作用相似,只是它的参数是一个函数类型,且返回值必须是一个bool
值。void main() {
test() async {
Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), (val) => val);
// 当放入3个元素后,监听会停止,Stream会关闭
// s1 = s1.take(3);
// 对当前元素进行判断,不满足条件的取消监听
s1 = s1.takeWhile((x) {
return x < 3;
// return x < 3 && x > 1;
//如果0都不满足,那么在0那就停止监听了,Stream会关闭,后续的即使满足条件,也没用了。
});
await for (var i in s1) {
print(i);
}
}
test();
}
// 依次输出 0 1 2
skip、skipWhile
请注意,该方法只是从Stream
中获取元素时跳过,被跳过的元素依然是被执行了的,所耗费的时间依然存在,其实只是跳过了执行完的结果而已。
void main() {
test() async {
Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), (val) => val);
// 表示从Stream中跳过两个元素
s1 = s1.skip(2);
s1 = s1.take(5);
await for (var i in s1) {
print(i);
}
}
test();
}
// 停滞2秒(即 0 1 耗费的时间),再依次输出 2 3 4 5 6,然后监听停止
Stream<T> skipWhile(bool test(T element))
方法与takeWhile
用法是相同的,传入一个函数对结果进行判断,表示跳过满足条件的。
toList
Future<List<T>> toList()
表示将Stream
中所有数据存储在List中。
void main() {
test() async {
Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), (val) => val);
s1 = s1.take(5);
List<int> list = await s1.toList();
print(list); //[0, 1, 2, 3, 4]
}
test();
}
length
等待并获取流中所有数据的数量。
void main() {
test() async {
Stream<int> s1 = Stream<int>.periodic(Duration(seconds: 1), (val) => val);
s1 = s1.take(5);
int n1 = await s1.length;
print(n1); //5
}
test();
}
StreamController
Stream
的一个帮助类,可用于整个 Stream
过程的控制。
使用该类时,需要导入'dart:async'
,其add
方法和sink.add
方法是相同的,都是用于放入一个元素,addError
方法用于产生一个错误,监听方法中的onError
可获取错误。
import "dart:async";
void main() {
test() async {
// 创建
StreamController s1 = StreamController();
// 放入事件
s1.add('element_1');
s1.addError('this is a error');
s1.sink.add('element_2');
s1.stream.listen(
(val) => print('val -- $val'),
onError: (e) => print('error -- $e'),
onDone: () => print('done')
);
}
test();
}
// val -- element_1
// error -- this is a error
// val -- element_2
还可以在 StreamController
中传入一个指定的 stream
。
import "dart:async";
void main() {
test() async {
var sm = Stream.periodic(Duration(seconds: 1), (e) => e);
sm = sm.take(3);
// 创建
var s1 = StreamController();
// 传入 Stream
s1.addStream(sm);
s1.stream.listen((val) => print('val -- $val'),
onError: (e) => print('error -- $e'), onDone: () => print('done'));
}
test();
}
// val -- 0
// val -- 1
// val -- 2
现在来看一下StreamController
的原型,它有5个可选参数
factory StreamController(
{void onListen(),
void onPause(),
void onResume(),
onCancel(),
bool sync: false})
onListen
注册监听时回调onPause
当流暂停时回调onResume
当流恢复时回调onCancel
当监听器被取消时回调sync
当值为true
时表示同步控制器SynchronousStreamController
,默认值为false
,表示异步控制器
import "dart:async";
void main() {
test() async {
// 创建
StreamController sc = StreamController(
onListen: () => print("onListen"),
onPause: () => print("onPause"),
onResume: () => print("onResume"),
onCancel: () => print("onCancel"),
sync: false);
StreamSubscription ss = sc.stream.listen(print);
sc.add('element_1');
// 暂停
ss.pause();
// 恢复
ss.resume();
// 取消
ss.cancel();
// 关闭流
sc.close();
}
test();
}
// onListen
// onPause
// onCancel
// 因为监听器被取消了,且关闭了流,导致"element_1"未被输出,"onResume"亦未输出
广播流
在普通的单订阅流中调用两次listen
会报错
void main() {
test() async {
var sm = Stream.periodic(Duration(seconds: 1), (e) => e);
sm = sm.take(5);
sm.listen(print);
sm.listen(print);
}
test();
}
// 报错:Stream has already been listened to.
前面已经说了单订阅流的特点,而广播流则可以允许多个监听器存在,就如同广播一样,凡是监听了广播流,每个监听器都能获取到数据。要注意,如果在触发事件时将监听者正添加到广播流,则该监听器将不会接收当前正在触发的事件。如果取消监听,监听者会立即停止接收事件。
有两种方式创建广播流,一种直接从Stream
创建,另一种使用StreamController
创建
void main() {
test() async {
var sm =
Stream.periodic(Duration(seconds: 1), (e) => e).asBroadcastStream();
sm = sm.take(2);
sm.listen((e) => print('监听1 -- $e'));
sm.listen((e) => print('监听2 -- $e'));
}
test();
}
// 监听1 -- 0
// 监听2 -- 0
// 监听1 -- 1
// 监听2 -- 1
使用StreamController
import 'dart:async';
void main() {
test() async {
var sc = StreamController.broadcast();
sc.stream.listen((e) => print('监听1 -- $e'));
sc.stream.listen((e) => print('监听2 -- $e'));
sc.add(1);
sc.add(2);
}
test();
}
// 监听1 -- 1
// 监听2 -- 1
// 监听1 -- 2
// 监听2 -- 2
StreamTransformer
该类可以使我们在Stream
上执行数据转换。然后,这些转换被推回到流中,以便该流注册的所有监听器可以接收。
构造方法原型
handleData
:响应从流中发出的任何数据事件。提供的参数是来自发出事件的数据,以及EventSink<T>
,表示正在进行此转换的当前流的实例handleError
:响应从流中发出的任何错误事件handleDone
:当流不再有数据要处理时调用。通常在流的close()
方法被调用时回调
factory StreamTransformer.fromHandlers({
void handleData(S data, EventSink<T> sink),
void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
void handleDone(EventSink<T> sink)
})
import 'dart:async';
void main() {
test() async {
StreamController sc = StreamController<int>();
// 创建 StreamTransformer对象
StreamTransformer stf = StreamTransformer<int, double>.fromHandlers(
handleData: (int data, EventSink sink) {
// 操作数据后,转换为 double 类型
sink.add((data * 2).toDouble());
},
handleError: (error, stacktrace, sink) {
sink.addError('wrong: $error');
},
handleDone: (sink) {
sink.close();
},
);
// 调用流的transform方法,传入转换对象
Stream stream = sc.stream.transform(stf);
stream.listen(print);
// 添加数据,这里的类型是int
sc.add(1);
sc.add(2);
sc.add(3);
// 调用后,触发handleDone回调
// sc.close();
}
test();
}
// 2.0
// 4.0
// 6.0
总结
与流相关的操作,主要有四个类
Stream
StreamController
StreamSink
StreamSubscription
Stream
是基础,为了更方便控制和管理Stream
,出现了StreamController
类。在StreamController
类中, 提供了StreamSink
作为事件输入口,当我们调用add
时,实际上是调用的sink.add
,通过sink
属性可以获取StreamController
类中的StreamSink
,而StreamSubscription
类则用于管理事件的注册、暂停与取消等,通过调用stream.listen
方法返回一个StreamSubscription
对象。
更多内容
更多在 command-line 应用中使用 Future 和 Stream 的实例,参考 dart:io tour. 也可以参考下列文章和教程: