Stream 诞生
2018年,谷歌推出了Dart中最重要的一个特性— Stream。官方对其介绍是:
Widgets + Streams = Reactive Flutter App
简单来说就是,Stream被创建的初衷就是为了实现Flutter的响应式编程。
Stream 简介
Stream(流) 的作用与RxJava、RxJS,或者是Android中 Live Data 非常相似。就如它的名字一样,它代表的是一个异步流,我们可以在代码中的任何地方定义这个流,然后在代码中其它任意位置添加数据到这个流中, Stream 会监听到数据的改变,并将改变后的数据传递给监听者。
在Dart中,代表流的类是 Stream
Stream 使用
StreamController
其中一个最常用的工具类就是 StreamController 。StreamController 就如同一个管道,在这个管道中封装了一个 Stream ,并向我们提供了两个接口来操作 Stream 。分别是:
sink - 从Stream中的一端插入数据
stream - 从Stream的另一外弹出数据
创建 Stream
StreamController<String> controller = new StreamController<String>();
向 Stream 中添加数据
通过StreamController中的sink,向Stream中添加数据:
controller.sink.add("Item1");controller.sink.add("Item2");controller.sink.add("Item3");
创建 Stream 监听器
通过StreamController中的stream.listen(),设置监听Stream弹出的数据。listen中传入的是一个函数
controller.stream.listen((item) => print(item));
Stream监听器的其它方法
在 Stream.dart 中listen方法的完整声明如下:
/*** Adds a subscription to this stream.*/StreamSubscription<T> listen(void onData(T event),{Function onError, void onDone(), bool cancelOnError});
可以看出,除了一个必选参数 onData 函数之外,还有两个可选参数分别是: onError 和 onDone 。这就意味着,除了简答的接收Stream内弹出的数据data之外,当 Stream 内部处理逻辑发生错误或者处理逻辑已经全部执行完毕,Stream分别会调用 onError 和 onDone 这两个函数来通知监听者。
我们可以调用sink中的 addError 和 close 方法来触发 onError和 onDone 回调。
// 向Stream中添加errorcontroller.sink.addError('there is a problem!');controller.sink.close(); // 调用close方法,结束Stream中的逻辑处理
Stream 使用完整代码
以下是通过StreamController来操作Stream的完整示例代码:
import 'dart:async';void main() {// 初始化StreamControllerfinal StreamController controller = StreamController();// 设置Stream的监听器controller.stream.listen((data) => print('data is $data'),onError: (err) => print('error happen: $err'),onDone: () => print('Mission complete!'));// 向Stream中添加数据controller.sink.add("Item1");controller.sink.add("Item2");controller.sink.add("Item3");// 向Stream中添加error信息controller.sink.addError('something wrong here!');// 关闭StreamController,释放资源controller.close();}
打印结果如下:
data is Item1data is Item2data is Item3error happen: something wrong here!Mission complete!
Stream 种类
Dart中的Stream有两类:
- single-subscription Stream
- broadcast Stream
这两者的使用基本相同,唯一的区别就是 single-subscription Streams 只允许被listen一次,而 broadcast stream 可以被listen多次。
single-subscription Stream
之前我们介绍的都属于 single-subscription Stream。如果调用第二次listen,则会报错。比如:
final StreamController controller = StreamController();// 第一次listencontroller.stream.listen((data) => print('data is $data'));// 第二次listen会报错controller.stream.listen((data) => print('data is $data'));controller.sink.add("Item1");
上述代码会报错:
Unhandled exception:Bad state: Stream has already been listened to.#0 _StreamController._subscribe (dart:async/stream_controller.dart:668:7)#1 _ControllerStream._createSubscription (dart:async/stream_controller.dart:818:19)#2 _StreamImpl.listen (dart:async/stream_impl.dart:472:9)#3 main (file:///Users/danny.jiang/Documents/flutter_course/StreamDemo.dart:9:21)#4 _startIsolate.<anonymous closure> (dart:isolate-patch/isolate_patch.dart:305:19)#5 _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:172:12)
broadcast Stream
创建 broadcast Stream 的方式如下:
StreamController<int> controller = StreamController<int>.broadcast();
创建好之后,可以多次listen并多次向Stream中添加数据
StreamSubscription sub1 = controller.stream.listen((value) => print('sub1 value is $value'));controller.sink.add(0);controller.sink.add(1);StreamSubscription sub2 = controller.stream.listen((value) => print('sub2 value is $value'));controller.sink.add(2);controller.sink.add(3);
输出结果如下:
sub1 value is 0sub2 value is 2sub1 value is 1sub2 value is 3sub1 value is 2sub1 value is 3
可以看出,sub1 可以接收到 Stream 中所有的数据,而 sub2 只能接收到从订阅这个 Stream 之后发送的数据
Stream构造方法
在Stream中有3个工厂构造函数:
- fromIterable
- fromFuture
- periodic
这三个构造函数分别通过一个Iterable、 Future、或定时触发动作作为 Stream 的事件源构造对象。
fromIterable 示例
var data = [1,2,3,4,5]; // 集合数据var stream = new Stream.fromIterable(data); // 根据集合创建Stream// 设置监听者stream.listen((value) => print("Received: $value"));
打印:
Received: 1Received: 2Received: 3Received: 4Received: 5
fromFuture 示例
main() {print("正在创建 Stream...");Stream<String> stream = new Stream.fromFuture(getData());print("Stream 创建成功");stream.listen((data) {print("收到: " + data);}, onDone: () {print("任务完成");}, onError: (error) {print("Error 发生");});print("同步代码执行中。。。");}Future<String> getData() async {await Future.delayed(Duration(seconds: 5)); //模拟延时操作print("获取网络数据成功");return "This a test data";}
打印:
正在创建 Stream...Stream 创建成功同步代码执行中。。。// 5秒之后打印如下日志获取网络数据成功收到: This a test data任务完成
periodic 示例
void main(){test();}test() async{// 使用 periodic 创建流,第一个参数为间隔时间,第二个参数为回调函数Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);stream.forEach((item) => print(item));}// 可以在回调函数中对值进行处理,这里直接返回了int callback(int value){return value;}
打印:
// 每隔1秒钟打印一行日志01234
Stream的一些数据处理
截取Stream中的部分data
在Stream中有一些很方便的方法,可以实现从Stream中提取出我们所需要的数据集合。我还是用上文中创建的stream来举例:
var data = [1,2,3,4,5]; // 集合数据var stream = new Stream.fromIterable(data);
stream.where((value) => value % 2 == 0) // 判断value对2取余等于0.listen((value) => print("where: $value"));stream.take(3) // 取出Stream中前3个元素.listen((value) => print("take: $value"));stream.takeWhile((value) => value < 3) // 取出Stream中小于3的所有数据.listen((value) => print("takeWhile: $value"));stream.skip(3) // 跳过Stream中的前3个数据.listen((value) => print("skip: $value"));stream.skipWhile((value) => value < 3) // 跳过所有值小于3的数据.listen((value) => print("skipWhile: $value"));
清除重复数据
StreamController streamController = StreamController();streamController.sink.add(1);streamController.sink.add(2);streamController.sink.add(2);streamController.sink.add(3);Stream newStream = streamController.stream.distinct();newStream.listen((value) {print("distinct: $value");});
将Stream中的data进行转换
有两种方式可以实现data的转换
- 直接使用 Steam.map
- 通过 StreamTransformer.fromHandlers ```dart // 在元数据的基值上做 +1 操作 Stream newStream = stream.map((value) => value + 1);
newStream.listen((value) { print(“map : $value”); });
StreamTransformer.fromHandlers(…) 当需要转换数据的逻辑比较复杂,我们可以考虑调用Stream.transform()方法传入一个StreamTransformer对象来进行数据转换。其中数据转草操作都在StreamTransformer中fromHandlers的方法中```dart// define a stream transformervar transformer = new StreamTransformer.fromHandlers(handleData: (value, sink) {// 用传入的data分别狗粮Message和Body字符串,并重新添加到sink中sink.add("Message: $value");sink.add("Body: $value");});// transform the stream and listen to its outputstream.transform(transformer).listen((value) => print("transform : $value"));
校验Stream中的data
有时候,我们需要对Stream中的数据进行相关条件的校验,Stream提供了一些方法来实现这方面的需要,主要包括以下几个方法:
stream.any((value) => value < 5).then((result) => print("Stream中有数据 < 5 返回 $result"));broadcastStream.every((value) => value < 5).then((result) => print("所有数据都 < 5 返回 $result"));broadcastStream.contains(4).then((result) => print("Stream中包含 4 返回 $result")); // true
