视频讲解
这儿B站上有个大神的Stream的讲解视频
点击查看【bilibili】
文字讲解
Stream字面上可以理解成流,类似水滴
final stream = Stream.periodic(Duration(seconds: 1), (_) => 43 );
使用场景
StreamBuilder
File file = new File('/test.txt');
Stream fileStream = file.openRead();
StreamBuilder(
stream: fileStream,
builder: (context, snapshot) {
switch (snapshot.connectionState) {
case ConnectionState.none:
return Text('没有数据流');
break;
case ConnectionState.waiting:
return Text('等待数据流');
break;
case ConnectionState.active:
if (snapshot.hasError) {
return Text('数据流错误');
}else {
return Text('数据流正常'+snapshot.data);
}
break;
case ConnectionState.done:
return Text('数据流已经关闭');
break;
default:
}
return Container();
},
)
然后需要我们手动的去添加流,那就需要用到
StreamController controller = StreamController();
void onTap() {
controller.sink.add(10);
}
void addErrorStream() {
controller.sink.addError('error');
}
StreamBuilder(
stream: controller.stream,
builder: (context, snapshot) {
switch (snapshot.connectionState) {
case ConnectionState.none:
return Text('没有数据流');
break;
case ConnectionState.waiting:
return Text('等待数据流');
break;
case ConnectionState.active:
if (snapshot.hasError) {
return Text('数据流错误');
}else {
return Text('数据流正常'+snapshot.data);
}
break;
case ConnectionState.done:
return Text('数据流已经关闭');
break;
default:
}
return Container();
},
)
想要流结束时可以用
controller.sink.close();
页面注销掉时
controller.close();
这是通过StreamBuilder的方式去处理数据流,但有的时候并不能满足业务需求,比如流事件延时触发,这个时候就需要通过监听的方式去处理。
listen
void listenStream() {
Future.delayed(Duration(seconds: 1), () {
controller.stream.listen((event) {},
onError: (err) {},
onDone: () {}
);
});
}
EventBus
对象的创建
EventBus({bool sync = false})
: _streamController = StreamController.broadcast(sync: sync);
EventBus.customController(StreamController controller)
: _streamController = controller;
两种方式对象的创建,最终的目的就是创建一个流控制器StreamController,也就是用数据流的思维去解决事件监听。
事件的开端
void fire(event) {
streamController.add(event);
}
每一个事件event当作一个数据流,添加到controller中,这是事件的触发点,类似发布者。
事件的监听和响应
Stream<T> on<T>() {
if (T == dynamic) {
return streamController.stream as Stream<T>;
} else {
return streamController.stream.where((event) => event is T).cast<T>();
}
}
就是获取流控制器中的stream,接着通过listen的方式,做出监听后的响应
eventBus.on().listen((event) {
//do something
});
当然如果再往深层去看listen和add的方法实现,可以看看StreamController的代码实现
StreamController源码
对象创建
factory StreamController(
{void onListen()?,
void onPause()?,
void onResume()?,
FutureOr<void> onCancel()?,
bool sync = false}) {
return sync
? _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}
说明对象有几个方法,但listen的触发是stream对象的,那么就要去Stream中看看
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
发现这里面确实有几个回调方法,但stream的回调怎么和controller的add方法有什么联系呢?
子对象
class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T>
implements SynchronousStreamController<T> {
_SyncBroadcastStreamController(void onListen()?, void onCancel()?)
: super(onListen, onCancel);
void _sendData(T data) {
if (_isEmpty) return;
if (_hasOneListener) {
_state |= _BroadcastStreamController._STATE_FIRING;
_BroadcastSubscription<T> firstSubscription =
_firstSubscription as dynamic;
firstSubscription._add(data);
_state &= ~_BroadcastStreamController._STATE_FIRING;
if (_isEmpty) {
_callOnCancel();
}
return;
}
_forEachListener((_BufferingStreamSubscription<T> subscription) {
subscription._add(data);
});
}
}
//在_BroadcastStreamController发现
void add(T data) {
if (!_mayAddEvent) throw _addEventError();
_sendData(data);
}
这样就发现两者就联系起来了,通过_BroadcastStreamController中的add方法添加事件,然后通过_sendData方法对应到controller中的此方法,之后就是把这个事件action了
任务的调度
void _forEachListener(
void action(_BufferingStreamSubscription<T> subscription)) {
if (_isFiring) {
throw new StateError(
"Cannot fire new event. Controller is already firing an event");
}
if (_isEmpty) return;
int id = (_state & _STATE_EVENT_ID);
_state ^= _STATE_EVENT_ID | _STATE_FIRING;
_BroadcastSubscription<T>? subscription = _firstSubscription;
while (subscription != null) {
if (subscription._expectsEvent(id)) {
subscription._eventState |= _BroadcastSubscription._STATE_FIRING;
action(subscription);
subscription._toggleEventId();
_BroadcastSubscription<T>? next = subscription._next;
if (subscription._removeAfterFiring) {
_removeListener(subscription);
}
subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;
subscription = next;
} else {
subscription = subscription._next;
}
}
_state &= ~_STATE_FIRING;
if (_isEmpty) {
_callOnCancel();
}
}
会发现action(subscription)这行代码,说明在执行上面的subscription._add(data);,然后在看看这个_add(data);
void _add(T data) {
assert(!_isClosed);
if (_isCanceled) return;
if (_canFire) {
_sendData(data);
} else {
_addPending(new _DelayedData<T>(data));
}
}
void _addPending(_DelayedEvent event) {
_StreamImplEvents<T>? pending = _pending as dynamic;
pending ??= _StreamImplEvents<T>();
_pending = pending;
pending.add(event);
if (!_hasPending) {
_state |= _STATE_HAS_PENDING;
if (!_isPaused) {
pending.schedule(this);
}
}
}
void schedule(_EventDispatch<T> dispatch) {
if (isScheduled) return;
assert(!isEmpty);
if (_eventScheduled) {
assert(_state == _STATE_CANCELED);
_state = _STATE_SCHEDULED;
return;
}
scheduleMicrotask(() {
int oldState = _state;
_state = _STATE_UNSCHEDULED;
if (oldState == _STATE_CANCELED) return;
handleNext(dispatch);
});
_state = _STATE_SCHEDULED;
}
void handleNext(_EventDispatch<T> dispatch);
会发现scheduleMicrotask把这个dispatch放到微任务日程队列中,剩下的就是系统去执行那个任务了。