视频讲解

这儿B站上有个大神的Stream的讲解视频
点击查看【bilibili】

文字讲解

Stream字面上可以理解成流,类似水滴

  1. final stream = Stream.periodic(Duration(seconds: 1), (_) => 43 );

在stream对象中每一秒中会返回43的数据流

使用场景

例如读取文件中的每一行的数据,用流的方式可以这样写

StreamBuilder

  1. File file = new File('/test.txt');
  2. Stream fileStream = file.openRead();
  3. StreamBuilder(
  4. stream: fileStream,
  5. builder: (context, snapshot) {
  6. switch (snapshot.connectionState) {
  7. case ConnectionState.none:
  8. return Text('没有数据流');
  9. break;
  10. case ConnectionState.waiting:
  11. return Text('等待数据流');
  12. break;
  13. case ConnectionState.active:
  14. if (snapshot.hasError) {
  15. return Text('数据流错误');
  16. }else {
  17. return Text('数据流正常'+snapshot.data);
  18. }
  19. break;
  20. case ConnectionState.done:
  21. return Text('数据流已经关闭');
  22. break;
  23. default:
  24. }
  25. return Container();
  26. },
  27. )

然后需要我们手动的去添加流,那就需要用到

  1. StreamController controller = StreamController();
  2. void onTap() {
  3. controller.sink.add(10);
  4. }
  5. void addErrorStream() {
  6. controller.sink.addError('error');
  7. }
  8. StreamBuilder(
  9. stream: controller.stream,
  10. builder: (context, snapshot) {
  11. switch (snapshot.connectionState) {
  12. case ConnectionState.none:
  13. return Text('没有数据流');
  14. break;
  15. case ConnectionState.waiting:
  16. return Text('等待数据流');
  17. break;
  18. case ConnectionState.active:
  19. if (snapshot.hasError) {
  20. return Text('数据流错误');
  21. }else {
  22. return Text('数据流正常'+snapshot.data);
  23. }
  24. break;
  25. case ConnectionState.done:
  26. return Text('数据流已经关闭');
  27. break;
  28. default:
  29. }
  30. return Container();
  31. },
  32. )

想要流结束时可以用

  1. controller.sink.close();

页面注销掉时

  1. controller.close();

这是通过StreamBuilder的方式去处理数据流,但有的时候并不能满足业务需求,比如流事件延时触发,这个时候就需要通过监听的方式去处理。

listen

  1. void listenStream() {
  2. Future.delayed(Duration(seconds: 1), () {
  3. controller.stream.listen((event) {},
  4. onError: (err) {},
  5. onDone: () {}
  6. );
  7. });
  8. }

再看看上篇中的EventBus怎么实现的

EventBus

对象的创建

  1. EventBus({bool sync = false})
  2. : _streamController = StreamController.broadcast(sync: sync);
  3. EventBus.customController(StreamController controller)
  4. : _streamController = controller;

两种方式对象的创建,最终的目的就是创建一个流控制器StreamController,也就是用数据流的思维去解决事件监听。

事件的开端

  1. void fire(event) {
  2. streamController.add(event);
  3. }

每一个事件event当作一个数据流,添加到controller中,这是事件的触发点,类似发布者。

事件的监听和响应

  1. Stream<T> on<T>() {
  2. if (T == dynamic) {
  3. return streamController.stream as Stream<T>;
  4. } else {
  5. return streamController.stream.where((event) => event is T).cast<T>();
  6. }
  7. }

就是获取流控制器中的stream,接着通过listen的方式,做出监听后的响应

  1. eventBus.on().listen((event) {
  2. //do something
  3. });

当然如果再往深层去看listen和add的方法实现,可以看看StreamController的代码实现

StreamController源码

对象创建

  1. factory StreamController(
  2. {void onListen()?,
  3. void onPause()?,
  4. void onResume()?,
  5. FutureOr<void> onCancel()?,
  6. bool sync = false}) {
  7. return sync
  8. ? _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
  9. : _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
  10. }

说明对象有几个方法,但listen的触发是stream对象的,那么就要去Stream中看看

  1. StreamSubscription<T> listen(void onData(T event)?,
  2. {Function? onError, void onDone()?, bool? cancelOnError});

发现这里面确实有几个回调方法,但stream的回调怎么和controller的add方法有什么联系呢?

子对象

  1. class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T>
  2. implements SynchronousStreamController<T> {
  3. _SyncBroadcastStreamController(void onListen()?, void onCancel()?)
  4. : super(onListen, onCancel);
  5. void _sendData(T data) {
  6. if (_isEmpty) return;
  7. if (_hasOneListener) {
  8. _state |= _BroadcastStreamController._STATE_FIRING;
  9. _BroadcastSubscription<T> firstSubscription =
  10. _firstSubscription as dynamic;
  11. firstSubscription._add(data);
  12. _state &= ~_BroadcastStreamController._STATE_FIRING;
  13. if (_isEmpty) {
  14. _callOnCancel();
  15. }
  16. return;
  17. }
  18. _forEachListener((_BufferingStreamSubscription<T> subscription) {
  19. subscription._add(data);
  20. });
  21. }
  22. }
  23. //在_BroadcastStreamController发现
  24. void add(T data) {
  25. if (!_mayAddEvent) throw _addEventError();
  26. _sendData(data);
  27. }

这样就发现两者就联系起来了,通过_BroadcastStreamController中的add方法添加事件,然后通过_sendData方法对应到controller中的此方法,之后就是把这个事件action了

任务的调度

  1. void _forEachListener(
  2. void action(_BufferingStreamSubscription<T> subscription)) {
  3. if (_isFiring) {
  4. throw new StateError(
  5. "Cannot fire new event. Controller is already firing an event");
  6. }
  7. if (_isEmpty) return;
  8. int id = (_state & _STATE_EVENT_ID);
  9. _state ^= _STATE_EVENT_ID | _STATE_FIRING;
  10. _BroadcastSubscription<T>? subscription = _firstSubscription;
  11. while (subscription != null) {
  12. if (subscription._expectsEvent(id)) {
  13. subscription._eventState |= _BroadcastSubscription._STATE_FIRING;
  14. action(subscription);
  15. subscription._toggleEventId();
  16. _BroadcastSubscription<T>? next = subscription._next;
  17. if (subscription._removeAfterFiring) {
  18. _removeListener(subscription);
  19. }
  20. subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;
  21. subscription = next;
  22. } else {
  23. subscription = subscription._next;
  24. }
  25. }
  26. _state &= ~_STATE_FIRING;
  27. if (_isEmpty) {
  28. _callOnCancel();
  29. }
  30. }

会发现action(subscription)这行代码,说明在执行上面的subscription._add(data);,然后在看看这个_add(data);

  1. void _add(T data) {
  2. assert(!_isClosed);
  3. if (_isCanceled) return;
  4. if (_canFire) {
  5. _sendData(data);
  6. } else {
  7. _addPending(new _DelayedData<T>(data));
  8. }
  9. }
  10. void _addPending(_DelayedEvent event) {
  11. _StreamImplEvents<T>? pending = _pending as dynamic;
  12. pending ??= _StreamImplEvents<T>();
  13. _pending = pending;
  14. pending.add(event);
  15. if (!_hasPending) {
  16. _state |= _STATE_HAS_PENDING;
  17. if (!_isPaused) {
  18. pending.schedule(this);
  19. }
  20. }
  21. }
  22. void schedule(_EventDispatch<T> dispatch) {
  23. if (isScheduled) return;
  24. assert(!isEmpty);
  25. if (_eventScheduled) {
  26. assert(_state == _STATE_CANCELED);
  27. _state = _STATE_SCHEDULED;
  28. return;
  29. }
  30. scheduleMicrotask(() {
  31. int oldState = _state;
  32. _state = _STATE_UNSCHEDULED;
  33. if (oldState == _STATE_CANCELED) return;
  34. handleNext(dispatch);
  35. });
  36. _state = _STATE_SCHEDULED;
  37. }
  38. void handleNext(_EventDispatch<T> dispatch);

会发现scheduleMicrotask把这个dispatch放到微任务日程队列中,剩下的就是系统去执行那个任务了。