Stream 诞生

2018年,谷歌推出了Dart中最重要的一个特性— Stream。官方对其介绍是:

Widgets + Streams = Reactive Flutter App

简单来说就是,Stream被创建的初衷就是为了实现Flutter的响应式编程。

Stream 简介

Stream(流) 的作用与RxJava、RxJS,或者是Android中 Live Data 非常相似。就如它的名字一样,它代表的是一个异步流,我们可以在代码中的任何地方定义这个流,然后在代码中其它任意位置添加数据到这个流中, Stream 会监听到数据的改变,并将改变后的数据传递给监听者。

在Dart中,代表流的类是 Stream , 但是我们并不会直接操作 Stream 这个类,而是通过SDK中提供的各种工具类来间接使用 Stream ,在这些工具类中提供了各种API来向 Sream 中插入数据,或者通知监听者数据已经处理完毕。

Stream 使用

StreamController

其中一个最常用的工具类就是 StreamController 。StreamController 就如同一个管道,在这个管道中封装了一个 Stream ,并向我们提供了两个接口来操作 Stream 。分别是:

sink - 从Stream中的一端插入数据
stream - 从Stream的另一外弹出数据
image.png

创建 Stream

  1. StreamController<String> controller = new StreamController<String>();

向 Stream 中添加数据

通过StreamController中的sink,向Stream中添加数据:

  1. controller.sink.add("Item1");
  2. controller.sink.add("Item2");
  3. controller.sink.add("Item3");

创建 Stream 监听器

通过StreamController中的stream.listen(),设置监听Stream弹出的数据。listen中传入的是一个函数

  1. controller.stream.listen((item) => print(item));

Stream监听器的其它方法

在 Stream.dart 中listen方法的完整声明如下:

  1. /**
  2. * Adds a subscription to this stream.
  3. */StreamSubscription<T> listen(void onData(T event),
  4. {Function onError, void onDone(), bool cancelOnError});

可以看出,除了一个必选参数 onData 函数之外,还有两个可选参数分别是: onError 和 onDone 。这就意味着,除了简答的接收Stream内弹出的数据data之外,当 Stream 内部处理逻辑发生错误或者处理逻辑已经全部执行完毕,Stream分别会调用 onError 和 onDone 这两个函数来通知监听者。

我们可以调用sink中的 addError 和 close 方法来触发 onError和 onDone 回调。

  1. // 向Stream中添加error
  2. controller.sink.addError('there is a problem!');
  3. controller.sink.close(); // 调用close方法,结束Stream中的逻辑处理

Stream 使用完整代码

以下是通过StreamController来操作Stream的完整示例代码:

  1. import 'dart:async';
  2. void main() {
  3. // 初始化StreamController
  4. final StreamController controller = StreamController();
  5. // 设置Stream的监听器
  6. controller.stream.listen(
  7. (data) => print('data is $data'),
  8. onError: (err) => print('error happen: $err'),
  9. onDone: () => print('Mission complete!')
  10. );
  11. // 向Stream中添加数据
  12. controller.sink.add("Item1");
  13. controller.sink.add("Item2");
  14. controller.sink.add("Item3");
  15. // 向Stream中添加error信息
  16. controller.sink.addError('something wrong here!');
  17. // 关闭StreamController,释放资源
  18. controller.close();
  19. }

打印结果如下:

  1. data is Item1
  2. data is Item2
  3. data is Item3
  4. error happen: something wrong here!
  5. Mission complete!

Stream 种类

Dart中的Stream有两类:

  • single-subscription Stream
  • broadcast Stream

这两者的使用基本相同,唯一的区别就是 single-subscription Streams 只允许被listen一次,而 broadcast stream 可以被listen多次。

single-subscription Stream

之前我们介绍的都属于 single-subscription Stream。如果调用第二次listen,则会报错。比如:

  1. final StreamController controller = StreamController();
  2. // 第一次listen
  3. controller.stream.listen((data) => print('data is $data'));
  4. // 第二次listen会报错
  5. controller.stream.listen((data) => print('data is $data'));
  6. controller.sink.add("Item1");

上述代码会报错:

  1. Unhandled exception:
  2. Bad state: Stream has already been listened to.
  3. #0 _StreamController._subscribe (dart:async/stream_controller.dart:668:7)
  4. #1 _ControllerStream._createSubscription (dart:async/stream_controller.dart:818:19)
  5. #2 _StreamImpl.listen (dart:async/stream_impl.dart:472:9)
  6. #3 main (file:///Users/danny.jiang/Documents/flutter_course/StreamDemo.dart:9:21)
  7. #4 _startIsolate.<anonymous closure> (dart:isolate-patch/isolate_patch.dart:305:19)
  8. #5 _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:172:12)

broadcast Stream

创建 broadcast Stream 的方式如下:

  1. StreamController<int> controller = StreamController<int>.broadcast();

创建好之后,可以多次listen并多次向Stream中添加数据

  1. StreamSubscription sub1 = controller.stream.listen((value) => print('sub1 value is $value'));
  2. controller.sink.add(0);
  3. controller.sink.add(1);
  4. StreamSubscription sub2 = controller.stream.listen((value) => print('sub2 value is $value'));
  5. controller.sink.add(2);
  6. controller.sink.add(3);

输出结果如下:

  1. sub1 value is 0
  2. sub2 value is 2
  3. sub1 value is 1
  4. sub2 value is 3
  5. sub1 value is 2
  6. sub1 value is 3

可以看出,sub1 可以接收到 Stream 中所有的数据,而 sub2 只能接收到从订阅这个 Stream 之后发送的数据

Stream构造方法

在Stream中有3个工厂构造函数:

  1. fromIterable
  2. fromFuture
  3. periodic

这三个构造函数分别通过一个Iterable、 Future、或定时触发动作作为 Stream 的事件源构造对象。

fromIterable 示例

  1. var data = [1,2,3,4,5]; // 集合数据
  2. var stream = new Stream.fromIterable(data); // 根据集合创建Stream
  3. // 设置监听者
  4. stream.listen((value) => print("Received: $value"));

打印:

  1. Received: 1
  2. Received: 2
  3. Received: 3
  4. Received: 4
  5. Received: 5

fromFuture 示例

  1. main() {
  2. print("正在创建 Stream...");
  3. Stream<String> stream = new Stream.fromFuture(getData());
  4. print("Stream 创建成功");
  5. stream.listen((data) {
  6. print("收到: " + data);
  7. }, onDone: () {
  8. print("任务完成");
  9. }, onError: (error) {
  10. print("Error 发生");
  11. });
  12. print("同步代码执行中。。。");
  13. }
  14. Future<String> getData() async {
  15. await Future.delayed(Duration(seconds: 5)); //模拟延时操作
  16. print("获取网络数据成功");
  17. return "This a test data";
  18. }

打印:

  1. 正在创建 Stream...
  2. Stream 创建成功
  3. 同步代码执行中。。。
  4. // 5秒之后打印如下日志
  5. 获取网络数据成功
  6. 收到: This a test data
  7. 任务完成

periodic 示例

  1. void main(){
  2. test();
  3. }
  4. test() async{
  5. // 使用 periodic 创建流,第一个参数为间隔时间,第二个参数为回调函数
  6. Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), callback);
  7. stream.forEach((item) => print(item));
  8. }
  9. // 可以在回调函数中对值进行处理,这里直接返回了
  10. int callback(int value){
  11. return value;
  12. }

打印:

  1. // 每隔1秒钟打印一行日志
  2. 0
  3. 1
  4. 2
  5. 3
  6. 4

Stream的一些数据处理

截取Stream中的部分data

在Stream中有一些很方便的方法,可以实现从Stream中提取出我们所需要的数据集合。我还是用上文中创建的stream来举例:

  1. var data = [1,2,3,4,5]; // 集合数据
  2. var stream = new Stream.fromIterable(data);
  1. stream
  2. .where((value) => value % 2 == 0) // 判断value对2取余等于0
  3. .listen((value) => print("where: $value"));
  4. stream
  5. .take(3) // 取出Stream中前3个元素
  6. .listen((value) => print("take: $value"));
  7. stream
  8. .takeWhile((value) => value < 3) // 取出Stream中小于3的所有数据
  9. .listen((value) => print("takeWhile: $value"));
  10. stream
  11. .skip(3) // 跳过Stream中的前3个数据
  12. .listen((value) => print("skip: $value"));
  13. stream
  14. .skipWhile((value) => value < 3) // 跳过所有值小于3的数据
  15. .listen((value) => print("skipWhile: $value"));

清除重复数据

  1. StreamController streamController = StreamController();
  2. streamController.sink.add(1);
  3. streamController.sink.add(2);
  4. streamController.sink.add(2);
  5. streamController.sink.add(3);
  6. Stream newStream = streamController.stream.distinct();
  7. newStream.listen((value) {
  8. print("distinct: $value");
  9. });

将Stream中的data进行转换

有两种方式可以实现data的转换

  1. 直接使用 Steam.map
  2. 通过 StreamTransformer.fromHandlers ```dart // 在元数据的基值上做 +1 操作 Stream newStream = stream.map((value) => value + 1);

newStream.listen((value) { print(“map : $value”); });

  1. StreamTransformer.fromHandlers(…) 当需要转换数据的逻辑比较复杂,我们可以考虑调用Stream.transform()方法传入一个StreamTransformer对象来进行数据转换。其中数据转草操作都在StreamTransformerfromHandlers的方法中
  2. ```dart
  3. // define a stream transformer
  4. var transformer = new StreamTransformer.fromHandlers(handleData: (value, sink) {
  5. // 用传入的data分别狗粮Message和Body字符串,并重新添加到sink中
  6. sink.add("Message: $value");
  7. sink.add("Body: $value");
  8. });
  9. // transform the stream and listen to its output
  10. stream.transform(transformer).listen((value) => print("transform : $value"));

校验Stream中的data

有时候,我们需要对Stream中的数据进行相关条件的校验,Stream提供了一些方法来实现这方面的需要,主要包括以下几个方法:

  1. stream
  2. .any((value) => value < 5)
  3. .then((result) => print("Stream中有数据 < 5 返回 $result"));
  4. broadcastStream
  5. .every((value) => value < 5)
  6. .then((result) => print("所有数据都 < 5 返回 $result"));
  7. broadcastStream
  8. .contains(4)
  9. .then((result) => print("Stream中包含 4 返回 $result")); // true