Stream 诞生
2018年,谷歌推出了Dart中最重要的一个特性— Stream。官方对其介绍是:
Widgets + Streams = Reactive Flutter App
简单来说就是,Stream被创建的初衷就是为了实现Flutter的响应式编程。
Stream 简介
Stream(流) 的作用与RxJava、RxJS,或者是Android中 Live Data 非常相似。就如它的名字一样,它代表的是一个异步流,我们可以在代码中的任何地方定义这个流,然后在代码中其它任意位置添加数据到这个流中, Stream 会监听到数据的改变,并将改变后的数据传递给监听者。
在Dart中,代表流的类是 Stream
Stream 使用
StreamController
其中一个最常用的工具类就是 StreamController 。StreamController 就如同一个管道,在这个管道中封装了一个 Stream ,并向我们提供了两个接口来操作 Stream 。分别是:
sink - 从Stream中的一端插入数据
stream - 从Stream的另一外弹出数据
创建 Stream
StreamController<String> controller = new StreamController<String>();
向 Stream 中添加数据
通过StreamController中的sink,向Stream中添加数据:
controller.sink.add("Item1");
controller.sink.add("Item2");
controller.sink.add("Item3");
创建 Stream 监听器
通过StreamController中的stream.listen(),设置监听Stream弹出的数据。listen中传入的是一个函数
controller.stream.listen((item) => print(item));
Stream监听器的其它方法
在 Stream.dart 中listen方法的完整声明如下:
/**
* Adds a subscription to this stream.
*/StreamSubscription<T> listen(void onData(T event),
{Function onError, void onDone(), bool cancelOnError});
可以看出,除了一个必选参数 onData 函数之外,还有两个可选参数分别是: onError 和 onDone 。这就意味着,除了简答的接收Stream内弹出的数据data之外,当 Stream 内部处理逻辑发生错误或者处理逻辑已经全部执行完毕,Stream分别会调用 onError 和 onDone 这两个函数来通知监听者。
我们可以调用sink中的 addError 和 close 方法来触发 onError和 onDone 回调。
// 向Stream中添加error
controller.sink.addError('there is a problem!');
controller.sink.close(); // 调用close方法,结束Stream中的逻辑处理
Stream 使用完整代码
以下是通过StreamController来操作Stream的完整示例代码:
import 'dart:async';
void main() {
// 初始化StreamController
final StreamController controller = StreamController();
// 设置Stream的监听器
controller.stream.listen(
(data) => print('data is $data'),
onError: (err) => print('error happen: $err'),
onDone: () => print('Mission complete!')
);
// 向Stream中添加数据
controller.sink.add("Item1");
controller.sink.add("Item2");
controller.sink.add("Item3");
// 向Stream中添加error信息
controller.sink.addError('something wrong here!');
// 关闭StreamController,释放资源
controller.close();
}
打印结果如下:
data is Item1
data is Item2
data is Item3
error happen: something wrong here!
Mission complete!
Stream 种类
Dart中的Stream有两类:
- single-subscription Stream
- broadcast Stream
这两者的使用基本相同,唯一的区别就是 single-subscription Streams 只允许被listen一次,而 broadcast stream 可以被listen多次。
single-subscription Stream
之前我们介绍的都属于 single-subscription Stream。如果调用第二次listen,则会报错。比如:
final StreamController controller = StreamController();
// 第一次listen
controller.stream.listen((data) => print('data is $data'));
// 第二次listen会报错
controller.stream.listen((data) => print('data is $data'));
controller.sink.add("Item1");
上述代码会报错:
Unhandled exception:
Bad state: Stream has already been listened to.
#0 _StreamController._subscribe (dart:async/stream_controller.dart:668:7)
#1 _ControllerStream._createSubscription (dart:async/stream_controller.dart:818:19)
#2 _StreamImpl.listen (dart:async/stream_impl.dart:472:9)
#3 main (file:///Users/danny.jiang/Documents/flutter_course/StreamDemo.dart:9:21)
#4 _startIsolate.<anonymous closure> (dart:isolate-patch/isolate_patch.dart:305:19)
#5 _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:172:12)
broadcast Stream
创建 broadcast Stream 的方式如下:
StreamController<int> controller = StreamController<int>.broadcast();
创建好之后,可以多次listen并多次向Stream中添加数据
StreamSubscription sub1 = controller.stream.listen((value) => print('sub1 value is $value'));
controller.sink.add(0);
controller.sink.add(1);
StreamSubscription sub2 = controller.stream.listen((value) => print('sub2 value is $value'));
controller.sink.add(2);
controller.sink.add(3);
输出结果如下:
sub1 value is 0
sub2 value is 2
sub1 value is 1
sub2 value is 3
sub1 value is 2
sub1 value is 3
可以看出,sub1 可以接收到 Stream 中所有的数据,而 sub2 只能接收到从订阅这个 Stream 之后发送的数据
Stream构造方法
在Stream中有3个工厂构造函数:
- fromIterable
- fromFuture
- periodic
这三个构造函数分别通过一个Iterable、 Future、或定时触发动作作为 Stream 的事件源构造对象。
fromIterable 示例
var data = [1,2,3,4,5]; // 集合数据
var stream = new Stream.fromIterable(data); // 根据集合创建Stream
// 设置监听者
stream.listen((value) => print("Received: $value"));
打印:
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
fromFuture 示例
main() {
print("正在创建 Stream...");
Stream<String> stream = new Stream.fromFuture(getData());
print("Stream 创建成功");
stream.listen((data) {
print("收到: " + data);
}, onDone: () {
print("任务完成");
}, onError: (error) {
print("Error 发生");
});
print("同步代码执行中。。。");
}
Future<String> getData() async {
await Future.delayed(Duration(seconds: 5)); //模拟延时操作
print("获取网络数据成功");
return "This a test data";
}
打印:
正在创建 Stream...
Stream 创建成功
同步代码执行中。。。
// 5秒之后打印如下日志
获取网络数据成功
收到: This a test data
任务完成
periodic 示例
void main(){
test();
}
test() async{
// 使用 periodic 创建流,第一个参数为间隔时间,第二个参数为回调函数
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);
stream.forEach((item) => print(item));
}
// 可以在回调函数中对值进行处理,这里直接返回了
int callback(int value){
return value;
}
打印:
// 每隔1秒钟打印一行日志
0
1
2
3
4
Stream的一些数据处理
截取Stream中的部分data
在Stream中有一些很方便的方法,可以实现从Stream中提取出我们所需要的数据集合。我还是用上文中创建的stream来举例:
var data = [1,2,3,4,5]; // 集合数据
var stream = new Stream.fromIterable(data);
stream
.where((value) => value % 2 == 0) // 判断value对2取余等于0
.listen((value) => print("where: $value"));
stream
.take(3) // 取出Stream中前3个元素
.listen((value) => print("take: $value"));
stream
.takeWhile((value) => value < 3) // 取出Stream中小于3的所有数据
.listen((value) => print("takeWhile: $value"));
stream
.skip(3) // 跳过Stream中的前3个数据
.listen((value) => print("skip: $value"));
stream
.skipWhile((value) => value < 3) // 跳过所有值小于3的数据
.listen((value) => print("skipWhile: $value"));
清除重复数据
StreamController streamController = StreamController();
streamController.sink.add(1);
streamController.sink.add(2);
streamController.sink.add(2);
streamController.sink.add(3);
Stream newStream = streamController.stream.distinct();
newStream.listen((value) {
print("distinct: $value");
});
将Stream中的data进行转换
有两种方式可以实现data的转换
- 直接使用 Steam.map
- 通过 StreamTransformer.fromHandlers ```dart // 在元数据的基值上做 +1 操作 Stream newStream = stream.map((value) => value + 1);
newStream.listen((value) { print(“map : $value”); });
StreamTransformer.fromHandlers(…) 当需要转换数据的逻辑比较复杂,我们可以考虑调用Stream.transform()方法传入一个StreamTransformer对象来进行数据转换。其中数据转草操作都在StreamTransformer中fromHandlers的方法中
```dart
// define a stream transformer
var transformer = new StreamTransformer.fromHandlers(handleData: (value, sink) {
// 用传入的data分别狗粮Message和Body字符串,并重新添加到sink中
sink.add("Message: $value");
sink.add("Body: $value");
});
// transform the stream and listen to its output
stream.transform(transformer).listen((value) => print("transform : $value"));
校验Stream中的data
有时候,我们需要对Stream中的数据进行相关条件的校验,Stream提供了一些方法来实现这方面的需要,主要包括以下几个方法:
stream
.any((value) => value < 5)
.then((result) => print("Stream中有数据 < 5 返回 $result"));
broadcastStream
.every((value) => value < 5)
.then((result) => print("所有数据都 < 5 返回 $result"));
broadcastStream
.contains(4)
.then((result) => print("Stream中包含 4 返回 $result")); // true