https://www.jianshu.com/p/d23c688feae7

前两天看了一些讲解协程原理的文章,收获大概就是kotlin的协程相比其他语言有不小的差别,kotlin的编译器做的事情相对较少,语言层面只提供了尽可能少的必要原语,由标准库扩展库(kotlinx)利用原语进行封装,对外提供开箱即用的协程API。
那么,语言层面提供了哪些支持,标准库又做了什么呢?在了解了协程原理后,我们能够如何调整协程,使其更适合我们的业务场景呢?

语言层面的支持

  • suspend与CPS
    • 由suspend关键字修饰的函数,会被kotlin识别为“挂起函数”,也就是说这个函数执行过程中可能会挂起。为了支持函数挂起和恢复,这个函数会被编译成一个类,这个类中定义了一个关键函数和一个关键属性:
      • 一个和原挂起函数对应的成员函数,这个成员函数体被多个goto标签标记,一个标签属性用于标记函数挂起前执行到了哪个位置,每个goto标签标记一个挂起点,后续函数挂起并恢复时,可以根据标签属性直接利用goto跳转到挂起点处继续执行;
      • 此外,该函数实现中还需要负责检查挂起函数的调用结果,如果挂起函数返回了表示挂起的特殊标记,那么该函数就直接return,等待调用的挂起函数完成,调用自己的resumeWith方法,获得挂起函数结果,跳转到挂起函数后一句语句继续执行;而如果发现挂起函数并没有返回表示挂起的特殊标记,而是直接返回了结果,就说明这是一个被suspend修饰,但其实并不真的发生挂起的函数,这种情况下直接向后继续执行;
    • CPS指Continuation-Passing-Style,续体传递风格,指的是编译器如何将上述suspend函数进行变换以支持协程;
      • 对函数签名的变换:
        • 每一个suspend函数接受一个Continuation类型参数,这个参数对象被称作续体,里面定义了恢复suspend函数执行的resumeWith方法和协程上下文CoroutineContext;
        • 函数返回值则变成了Any,因为一个挂起函数除了最终需要返回原返回类型外,在中间被挂起时也需要返回一个表示被挂起的标记,因此只能定义为Any;
      • 对函数体的变换,同上面suspend介绍;
  • kotlin标准库支持
    • 标准库提供了一些和VM实现细节直接相关的接口,这些接口位于:kotlin.coroutines.intrinsics,在IDEA中可以搜intrinsicsJvm.kt找到JVM的源码,任何上层扩展库提供的协程方法最终都会调用这里定义的方法,从源码里可以看到,一个 suspend () -> T 类型的block,和 Function1, Any?> 的类型是一样的,这就是上面编译器做的工作了,这些类型转换是和VM紧密相关的,因此都放在了这里;
    • 标准库定义了CoroutineContext以及一系列Element
      • CoroutineContext,顾名思义就是协程上下文
      • Element,是协程上下文定义的一系列协程的扩展点,外部可以通过设置特定的Key来注册interceptor,对协程的执行过程进行定制,目前看到的扩展点就是协程Dispatcher扩展点,可以指定协程如何运行:是单开一个线程来运行协程,还是把协程放到某个消息队列中运行,还是把协程交给一个线程池运行?
      • Element如果不是扩展点,而是其他的类型,那么就可以用来保存和协程绑定的信息。

扩展库的工作

CoroutineScope

CoroutineContext和Element

一个CoroutineScope中定义了一个CoroutineContext。而CoroutineContext中,包含了所有coroutine运行过程中所需要的Element(下面管Element叫组件吧),CoroutineContext其实是一个indexed map,其index或者说key是组件接口,对应的value则是这个组件接口的具体实现。
CoroutineContext中的组件包括:

  • ContinuationInterceptor和CoroutineDispatcher
    • 这两个Context组件负责协程的恢复执行,其中Interceptor负责触发Dispatcher的逻辑,当需要让协程在特定的线程内调度时,就需要用到CoroutineDispatcher;
  • Job
    • Job这个组件可以用来cancel协程

父子关系

在协程支持库中,CoroutineContext会通过AbstractCouroutine建立父子关系,此时子context的组件会覆盖父的。当在CoroutineScope中启动新的协程时,它就会获取当前CoroutineScope的coroutineContext,并和它建立父子关系。
父协程总是需要等待所有子协程结束,不需要使用Job.join显式的等待协程,只有在CoroutineScope外需要等待协程时才需要使用Job.join阻塞自身等待协程执行完毕。
此外,子协程如果在执行过程中抛出了异常,异常会被传播到父协程,如果父协程没有处理该异常,异常还会继续向上传播。特别的,如果一个async的子协程在执行过程中抛出了异常,除了会导致该异常被传播到父协程外,还会导致父协程内其他正在执行的async协程被cancel。

AbstractCoroutine

kotlin的协程库中的具体Coroutine,包括BlockingCoroutine(对应runBlocking)、StandaloneCoroutine(对应GlobalScope.launch)、DeferredCoroutine(对应GlobalScope.async)等,它们都继承自AbstractCoroutine,因此要想理解协程,就必须理解这个抽象类提供了什么能力,继承它的子类又负责提供什么能力。

AbstractCoroutine是协程库的入口

协程库提供的所有启动协程的方法,其内部逻辑都是先构造一个AbstractCoroutine的子类,然后调用其start方法启动协程。

协程的运行

  1. 创建Continuation对象,kotlin中由语言编译器层面提供了将suspend函数转换为Continuation的支持,这些转换函数位于协程支持库的kotlinx.coroutines.intrinsics包下,转换后生成的对象部分对于kotlin而言是可见的,但只暴露了必要的部分,供协程支持库去完成协程逻辑,可以在kotlin.coroutines.jvm.internal包下找到这些对上层可见的类型定义。
  2. 使用CoroutineInterceptor包装Continuation对象,这一步是为了将一个裸Continuation对象包装成DispatchedContinuation,一个DispatchedContinuation会在resume时将Continuation按照特定的规则进行调度,也就是说,协程支持库在调用它的resume时,会将调用委托给dispatcher,由dispatcher决定后续如何调度协程,而裸Continuation会直接在调用resume的线程调度协程。
  3. 调用Continuation对象的resumeCancellableWith方法,这里面就会判断Continuation是否是DispatchedContinuation,确定是在当前线程直接调度还是交给dispatcher调度;
  4. Continuation被调度执行,这里调度执行的入口是resumeWith(result)方法,这里的逻辑比较复杂,需要多解释一下:
    • 我们编写的协程代码是在invokeSuspend方法被调用时执行的,这个方法体是由编译器生成的,用伪代码描述如下: ```kotlin // 一段这样的协程代码: scope.launch { callSync1() var a = callSuspend1() var b = callSync2() callSuspend2() }

// 会被kotlin编译成下面这样: class MyContinuation { var a; var b;

  1. int label;
  2. invokeSuspend(result) {
  3. if (label == -1) {
  4. throw "illegal state"
  5. }
  6. when (label) {
  7. 0 ->
  8. label = 1
  9. callSync1()
  10. result = callSuspend1()
  11. if (result == COROUTINE_SUSPENDED) {
  12. return COROUTINE_SUSPENDED
  13. }
  14. 1 ->
  15. label = 2
  16. a = result
  17. b = callSync2()
  18. result = callSuspend2()
  19. if (result == COROUTINE_SUSPENDED) {
  20. return COROUTINE_SUSPENDED
  21. }
  22. 2 ->
  23. label = -1
  24. return result
  25. }
  26. }

}

  1. - kotlin生成的Continuation中维护着一个label变量,这个变量用于确定当前Continuation执行到了哪一个suspend处,确定下一次invokeSuspend时应该从哪开始执行,调用invokeSuspend时会将上次suspend函数的返回作为参数传递,这样在invokeSuspend函数体内就可以接收到一个suspend函数的结果并将它赋值给对应的变量。
  2. - resumeWith方法调用完invokeSuspend后,会检查其返回值,若返回的是COROUTINE_SUSPENDED,就说明这个Continuation已经调用了一个suspend函数并需要异步地等待结果,当前执行流程可以暂停,resumeWith就直接return了,否则如果返回了正常结果,就说明协程已经执行完毕,需要将结果提供给外层协程包装对象(也就是AbstractCoroutine),由这个包装对象最终将结果返回给调起协程的地方。
  3. <a name="wzlqM"></a>
  4. ### 在suspend函数中调用suspend函数
  5. 上面描述了一般的suspend函数是如何被kotlin转换成协程执行的,但有个很重要的场景没有涉及到,就是在suspend函数中调用suspend函数时,发生了什么?<br />这个问题的答案非常令人沮丧,因为最重要的函数实现是看不到的:
  6. ```kotlin
  7. /**
  8. * Obtains the current continuation instance inside suspend functions and either
  9. * suspendscurrently running coroutine or returns result immediately without
  10. * suspension.
  11. * ......
  12. */
  13. public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn
  14. (crossinline block: (Continuation<T>) -> Any?): T =
  15. throw NotImplementedError(
  16. "Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic"
  17. )

从注释看,这个函数的作用是捕获当前正在执行的continuation实例,并将这个实例作为block的参数,调用block,如果block返回了SUSPEND,表示当前的continuation的确需要异步等待,因此会直接return,block中的逻辑必须要确保continuation后续会被调度;如果block返回了正常的结果,就说明当前continuation同步完成并返回了,结果会直接返回。

通过kotlin提供的这个神奇的函数,可以将当前执行的continuation捕获,让它把控制权交给block,如果block直接返回了结果,控制权会直接返回给continuation,如果block中存在异步逻辑,那么block需要返回COROUTINE_SUSPENDED,并确保在异步回调中resume原来执行中的continuation。在之前的伪代码中,出现了对callSuspend1函数的调用,下面就来说说这个callSuspend1长什么样,才能和上面的代码配合:

  1. // 这其实就是delay方法的实现
  2. suspend fun callSuspend1() {
  3. suspendCoroutineUninterceptedOrReturn(uCont ->
  4. val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
  5. /*
  6. * For non-atomic cancellation we setup parent-child relationship immediately
  7. * in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
  8. * properly supports cancellation.
  9. */
  10. cancellable.initCancellability()
  11. cont.context.delay.scheduleResumeAfterDelay(timeMillis, cancellable)
  12. cancellable.getResult()
  13. )
  14. }

上面的代码首先将当前continuation捕获为uCont,作为参数调用block。
在block中,首先将当前continuation作为delegate构造了CancellableContiuationImpl对象,使用协程Context上定义的delay设置了一个定时任务,延后timeMills调度cancellable,这个新的Continuation对象会在被调度时,触发delegate的调度,从而恢复原suspend函数的执行。
如果一个suspend函数本身不执行这些逻辑,而是调用了另一个suspend函数,也可以使用相同的方式来分析,这里就不再展开了。

从上面的这个例子可以看出,要编写一个suspend函数是非常麻烦的,需要考虑的边界条件很多,要用到kotlin的非公开API,事实上也不应该通过上面的那种方式编写业务的suspend函数。
kotlin预先通过协程支持库定义好了所有我们可能会用到的能力,所有我们自己编写的suspend函数应该只需要调用kotlin事先提供的API,永远不需要自己像上面那样编写自己的suspend函数。如果现有API无法满足需求,应该向kotlin提交feature request。

协程的嵌套

上面描述了协程的正常运行过程,但实际上协程还可以嵌套,并且嵌套时协程间还会建立父子关系,cancel掉父协程会导致子协程也被cancel,子协程抛出的异常会传播到父协程,这些机制又是怎么运作的?

想法

疑问:在协程原语中,创建Continuation时需要一个可选的receiver和一个必须的completion,其中completion可以理解,就是当这个continuation完成后需要通知的,正在等待结果的continuation,但是receiver怎么理解?

可以看到现在的语言基本上都提供了协程、流的支持,相比线程而言这些API的实现、思维模型都更加复杂了,但是它们同时也提供了非常强大能力,一些过去使用线程需要业务自己考虑的逻辑都能够抽象成协程的通用场景,比如loading被用户取消后,关联的请求就应该被忽略、界面被关闭后,所有的异步逻辑都应该被取消,等等。这些场景都可以利用协程的cancel逻辑直截了当的处理。
为了跟上时代,还是先熟悉协程的使用,熟悉协程的思维模式,然后了解不同语言的实现原理吧。

相比JS,kotlin的协程感觉透露了过多的实现细节出来,并没有勾勒出一个足够清晰的概念模型。。。习惯了async await关键字的web开发们有的学了。。。kotlin选择这样复杂的设计应该是有自己的考虑的,因为kotlin是一个野心很大的语言,目标是同时target到JVM、JS、Native,因此语言和核心库的设计必须要非常巧妙,一方面要具备利用每个语言特性的能力,另一方面又不失通用性。只不过这个目标似乎实在是比较大,目前Kotlin应该仅在Android开发领域得到了比较广泛的应用,跨语言能力还要打个问号。

大体上描述一下kotlin的协程的概念模型:
协程底层利用的是编译器提供的支持,将一个suspend函数转换成一个匿名类对象,这个类中保存了当前协程的挂起点,类中定义的函数是由一个大的when语句组成的,每个匹配都是一个挂起点,当协程调用了另一个协程,导致当前协程被挂起时,该函数返回。
每个协程都要被指定一个上下文,这个上下文中比较关键的一个对象是协程Dispatcher,该对象用于决定新创建的那个协程将如何被执行,这是通过返回一个被包装过的Continuation对象实现的,在外层包装中会override resumeWith方法,将下层resumeWith提交给线程池、开一个新线程执行等。
扩展库提供了对上述描述的简便封装方法。

实际使用

任何问题以文档为准:https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/index.html

在协程中,可以调用suspend函数,suspend函数必须在协程中执行,因为只有协程才能够正确处理suspend要求的挂起语义。
在协程中,调用suspend函数时,不需要像await之类的关键字,kotlin中调用suspend函数的默认行为就是完成函数执行并获取返回值,相当于JS中使用await调用async函数;如果希望像JS中不带await调用async函数那样,不挂起等待结果,而是先获取一个Promise后面用到时再去等待结果,则可以将suspend函数通过async函数来调用,async返回一个Defered对象,并且保证不会挂起当前协程。可以看到,和JS或者Dart、C#不同,kotlin的协程默认挂起,如果不希望挂起需要显式写出,和这些语言正好相反。

启动协程

kotlin中,协程就是轻量级的线程,意思是一个协程是一个可挂起的异步执行流程。协程核心库中定义有四个builder function,它们都用于启动一个协程:

  1. // launch,返回Job对象,Job对象可以被join或者cancel,launch中的异常会被立即抛出
  2. fun CoroutineScope.launch(
  3. context: CoroutineContext = EmptyCoroutineContext,
  4. start: CoroutineStart = coroutineStart.DEFAULT,
  5. block: suspend CoroutineScope.() -> Unit
  6. ): Job
  7. // async,返回Deferred对象,Deferred继承自Job,可以调用Deferred对象的await方法获取
  8. // suspend函数返回的结果,如果async中产生了异常,异常会被暂存,并在await被调用时重新抛出
  9. fun <T> CoroutineScope.async(
  10. context: CoroutineContext = EmptyCoroutineContext,
  11. start: CoroutineStart = CoroutineStart.DEFAULT,
  12. block: suspend CoroutineScope.() -> T
  13. ): Deferred<T>
  14. // produce,还是一个实验Api,block参数是一个suspend函数,并且它的delegate是ProducerScope,
  15. // ProducerScope同时实现了CoroutineScope接口和SendChannel接口,因此可以在协程中直接使用
  16. // send函数将item发送到produce函数返回的Channel中,capacity参数控制Channel的类型,进而控制
  17. // 调用send时是否会挂起
  18. @ExperimentalCoroutinesApi fun <E> CoroutineScope.produce(
  19. context: CoroutineContext = EmptyCoroutineContext,
  20. capacity: Int = 0,
  21. block: suspend ProducerScope<E>.() -> Unit
  22. ): ReceiveChannel<E>
  23. // JVM或Native可用,阻塞当前线程直到协程里的所有异步流程执行结束
  24. fun <T> runBlocking(
  25. context: CoroutineContext = EmptyCoroutineContext,
  26. block: suspend CoroutineScope.() -> T
  27. ): T

标准库提供的CoroutineContext

协程标准库提供了几个继承自CoroutineDispatcher的单例,而CoroutineDispatcher的继承链的最顶端就是CoroutineContext,因此可以直接在需要CoroutineContext的地方使用。
这几个单例分别是Dispatchers.Default, Dispatchers.IO, Dispatchers.Unconfined,前两个都是基于线程池的,Default和IO线程池不是一个,应该将IO类操作放到IO池子里,将计算类操作放到Default池子里,避免大量IO阻塞计算,或大量计算阻塞IO;Unconfined一般应该不会用到,它不会将协程限定在某个线程或线程池,而是总会在当前线程执行Continuation。

suspend函数

  • delay, 非阻塞sleep
  • yield,主动挂起当前执行流程,在其他执行流程挂起或结束后再继续
  • withContext,利用指定的CoroutineContext执行另一个suspend函数,在协程中需要进行线程切换时,使用这个方法据Google文档说会比直接利用传统方式切换线程更加高效(参考资料
  • withTimeout, withTimeoutOrNull,执行suspend函数,但在超时后cancel掉
  • awaitAll
  • joinAll
  • select,等待一组suspending function中任意一个的结果