因此今天,我会通过几个小例子,循序渐进地向你介绍 Dart 语言的 Event Loop 处理机制、异步处理和并发编程的原理和使用方法,从语言设计和实践层面理解 Dart 单线程模型下的代码运行本质,从而懂得后续如何在工作中使用 Future 与 Isolate,优化我们的项目。

Event Loop 机制

  • Dart 是单线程的
  • Dart 当然也支持异步
  • 单线程和异步并不冲突

那为什么单线程也可以异步?

这里有一个大前提,那就是我们的 App 绝大多数时间都在等待。比如,等用户点击、等网络请求返回、等文件 IO 结果,等等。而这些等待行为并不是阻塞的。比如说,网络请求,Socket 本身提供了 select 模型可以异步查询;而文件 IO,操作系统也提供了基于事件的回调机制

所以,基于这些特点,单线程模型可以在等待的过程中做别的事情,等真正需要响应结果了,再去做对应的处理。因为等待过程并不是阻塞的,所以给我们的感觉就像是同时在做多件事情一样。但其实始终只有一个线程在处理你的事情。

内核做了很多事情.

等待这个行为是通过 Event Loop 驱动的。事件队列 Event Queue 会把其他平行世界(比如 Socket)完成的,需要主线程响应的事件放入其中。像其他语言一样,Dart 也有一个巨大的事件循环,在不断的轮询事件队列,取出事件(比如,键盘事件、I\O 事件、网络事件等),在主线程同步执行其回调函数,如下图所示:

image.png

图 1 简化版 Event Loop

异步任务

事实上,图 1 的 Event Loop 示意图只是一个简化版。在 Dart 中,实际上有两个队列,一个事件队列(Event Queue),另一个则是微任务队列(Microtask Queue)。在每一次事件循环中,Dart 总是先去第一个微任务队列中查询是否有可执行的任务,如果没有,才会处理后续的事件队列的流程。

image.png

图 2 Microtask Queue 与 Event Queue

微任务顾名思义,表示一个短时间内就会完成的异步任务。从上面的流程图可以看到,微任务队列在事件循环中的优先级是最高的,只要队列中还有任务,就可以一直霸占着事件循环。

微任务是由 scheduleMicroTask 建立的。如下所示,这段代码会在下一个事件循环中输出一段字符串:

  1. scheduleMicrotask(() => print('This is a microtask'));

Dart 为 Event Queue 的任务建立提供了一层封装,叫作 Future。从名字上也很容易理解,它表示一个在未来时间才会完成的任务。

  • 把一个函数体放入 Future,就完成了从同步任务到异步任务的包装。Future 还提供了链式调用的能力,可以在异步任务执行完毕后依次执行链路上的其他函数体。

我们看一个具体的代码示例:分别声明两个异步任务,在下一个事件循环中输出一段字符串。其中第二个任务执行完毕之后,还会继续输出另外两段字符串:

  1. Future(() => print('Running in Future 1'));//下一个事件循环输出字符串
  2. Future(() => print(‘Running in Future 2'))
  3. .then((_) => print('and then 1'))
  4. .then((_) => print('and then 2’));//上一个事件循环结束后,连续输出三段字符串

正常情况下,一个 Future 异步任务的执行是相对简单的:在我们声明一个 Future 时,Dart 会将异步任务的函数执行体放入事件队列,然后立即返回,后续的代码继续同步执行。而当同步执行的代码执行完毕后,事件队列会按照加入事件队列的顺序(即声明顺序),依次取出事件,最后同步执行 Future 的函数体及后续的 then。

如果 Future 执行体已经执行完毕了,但你又拿着这个 Future 的引用,往里面加了一个 then 方法体,这时 Dart 会如何处理呢?面对这种情况,Dart 会将后续加入的 then 方法体放入微任务队列,尽快执行。
**
下面的代码演示了 Future 的执行规则,即,先加入事件队列,或者先声明的任务先执行;then 在 Future 结束后立即执行。

  1. //f1比f2先执行
  2. Future(() => print('f1'));
  3. Future(() => print('f2'));
  4. //f3执行后会立刻同步执行then 3
  5. Future(() => print('f3')).then((_) => print('then 3'));
  6. //then 4会加入微任务队列,尽快执行
  7. Future(() => null).then((_) => print('then 4'));

更多使用例子:

  1. Future(() => print('f1'));//声明一个匿名Future
  2. Future fx = Future(() => null);//声明Future fx,其执行体为null
  3. //声明一个匿名Future,并注册了两个then。在第一个then回调里启动了一个微任务
  4. Future(() => print('f2')).then((_) {
  5. print('f3');
  6. scheduleMicrotask(() => print('f4'));
  7. }).then((_) => print('f5'));
  8. //声明了一个匿名Future,并注册了两个then。第一个then是一个Future
  9. Future(() => print('f6'))
  10. .then((_) => Future(() => print('f7')))
  11. .then((_) => print('f8'));
  12. //声明了一个匿名Future
  13. Future(() => print('f9'));
  14. //往执行体为null的fx注册了了一个then
  15. fx.then((_) => print('f10'));
  16. //启动一个微任务
  17. scheduleMicrotask(() => print('f11'));
  18. print('f12');

运行结果:

  1. f12
  2. f11
  3. f1
  4. f10
  5. f2
  6. f3
  7. f5
  8. f4
  9. f6
  10. f9
  11. f7
  12. f8

你只需要记住一点:then 会在 Future 函数体执行完毕后立刻执行,无论是共用同一个事件循环还是进入下一个微任务。
**

异步函数

对于一个异步函数来说,其返回时内部执行动作并未结束,因此需要返回一个 Future 对象,供调用者使用。调用者根据 Future 对象,来决定:是在这个 Future 对象上注册一个 then,等 Future 的执行体结束了以后再进行异步处理;还是一直同步等待 Future 执行体结束。

对于异步函数返回的 Future 对象,如果调用者决定同步等待,则需要在调用处使用 await 关键字,并且在调用处的函数体使用 async 关键字。

  1. //声明了一个延迟3秒返回Hello的Future,并注册了一个then返回拼接后的Hello 2019
  2. Future<String> fetchContent() =>
  3. Future<String>.delayed(Duration(seconds:3), () => "Hello")
  4. .then((x) => "$x 2019");
  5. main() async{
  6. print(await fetchContent());//等待Hello 2019的返回
  7. }

在等待语句的调用上下文函数 main 加上了 async 关键字。为什么要加这个关键字呢?

因为 Dart 中的 await 并不是阻塞等待,而是异步等待。Dart 会将调用体的函数也视作异步函数,将等待语句的上下文放入 Event Queue 中,一旦有了结果,Event Loop 就会把它从 Event Queue 中取出,等待代码继续执行。

Isolate

尽管 Dart 是基于单线程模型的,但为了进一步利用多核 CPU,将 CPU 密集型运算进行隔离,Dart 也提供了多线程机制,即 Isolate。在 Isolate 中,资源隔离做得非常好,每个 Isolate 都有自己的 Event Loop 与 Queue,Isolate 之间不共享任何资源,只能依靠消息机制通信,因此也就没有资源抢占问题。

  1. doSth(msg) => print(msg);
  2. main() {
  3. Isolate.spawn(doSth, "Hi");
  4. ...
  5. }

对于执行结果的告知,Isolate 通过发送管道(SendPort)实现消息通信机制。我们可以在启动并发 Isolate 时将主 Isolate 的发送管道作为参数传给它,这样并发 Isolate 就可以在任务执行完毕后利用这个发送管道给我们发消息了。

  1. Isolate isolate;
  2. start() async {
  3. ReceivePort receivePort= ReceivePort();//创建管道
  4. //创建并发Isolate,并传入发送管道
  5. isolate = await Isolate.spawn(getMsg, receivePort.sendPort);
  6. //监听管道消息
  7. receivePort.listen((data) {
  8. print('Data:$data');
  9. receivePort.close();//关闭管道
  10. isolate?.kill(priority: Isolate.immediate);//杀死并发Isolate
  11. isolate = null;
  12. });
  13. }
  14. //并发Isolate往管道发送一个字符串
  15. getMsg(sendPort) => sendPort.send("Hello");

双向通信的场景我们如何实现呢?答案也很简单,让并发 Isolate 也回传一个发送管道即可。

  1. //并发计算阶乘
  2. Future<dynamic> asyncFactoriali(n) async{
  3. final response = ReceivePort();//创建管道
  4. //创建并发Isolate,并传入管道
  5. await Isolate.spawn(_isolate,response.sendPort);
  6. //等待Isolate回传管道
  7. final sendPort = await response.first as SendPort;
  8. //创建了另一个管道answer
  9. final answer = ReceivePort();
  10. //往Isolate回传的管道中发送参数,同时传入answer管道
  11. sendPort.send([n,answer.sendPort]);
  12. return answer.first;//等待Isolate通过answer管道回传执行结果
  13. }
  14. //Isolate函数体,参数是主Isolate传入的管道
  15. _isolate(initialReplyTo) async {
  16. final port = ReceivePort();//创建管道
  17. initialReplyTo.send(port.sendPort);//往主Isolate回传管道
  18. final message = await port.first as List;//等待主Isolate发送消息(参数和回传结果的管道)
  19. final data = message[0] as int;//参数
  20. final send = message[1] as SendPort;//回传结果的管道
  21. send.send(syncFactorial(data));//调用同步计算阶乘的函数回传结果
  22. }
  23. //同步计算阶乘
  24. int syncFactorial(n) => n < 2 ? n : n * syncFactorial(n-1);
  25. main() async => print(await asyncFactoriali(4));//等待并发计算阶乘结果

Flutter 提供了支持并发计算的 compute 函数,其内部对 Isolate 的创建和双向通信进行了封装抽象,屏蔽了很多底层细节,我们在调用时只需要传入函数入口和函数参数,就能够实现并发计算和消息通知。

  1. //同步计算阶乘
  2. int syncFactorial(n) => n < 2 ? n : n * syncFactorial(n-1);
  3. //使用compute函数封装Isolate的创建和结果的返回
  4. main() async => print(await compute(syncFactorial, 4));