kotlin语言一个非常有意思的特性是协程,在KEEP(Kotlin Evolution and Enhancement Process)文档中详细描述了kotlin协程的设计思路,这里是对协程KEEP文档的中文翻译:https://github.com/Kotlin-zh/KEEP/blob/master/proposals/coroutines.md
这篇文档不可谓不详细,对于概念的解释非常到位,下面是我对于kotlin的协程的个人感性理解。
原理部分
协程的使用
import kotlinx.coroutines.*
fun main() {
// runBlocking,是一个协程启动器,在协程执行完毕前会阻塞当前线程
//
// 我们这里传入了一个lambda,由于runBlocking函数签名为lambda声明为suspend,因此这是一个
// suspend lambda
runBlocking() {
print("enter coroutine")
// delay是一个suspend函数,当协程执行到这里会挂起,1s之后再恢复
delay(1000)
print("done")
}
}
续体
续体,即Continuation,是这样一个接口,其中定义了续体上下文和续体执行方法。
interface Continuation<in T> {
val context: CoroutineContext
fun resumeWith(result: Result<T>)
}
编译器将suspend函数转换成续体
在编译时,一个suspend函数(或lambda)会被编译器转换为一个续体,并且kotlin语言提供了一系列内建函数createCoroutine,传入一个suspend函数对象,就可以取得kotlin编译器为这个函数生成的续体。
对于上面那个非常简单的suspend lambda,由于在这个函数里调用了一次suspend函数,kotlin会把它编译成一个可以调用两次resumeWith的续体:
- 第一次调用resumeWith时,会开始执行协程,并在执行到调用delay这个suspend函数时可以被挂起(直接return)
- 如果被挂起了,第二次调用resumeWith时,则会从上次挂起的那一行之后继续执行
- 如果没有被挂起,则会直接继续往下执行到函数结尾处,此时不能再调用第二次resumeWith方法,调用时会产生IllegalStateException异常
在suspend函数执行到头之后,需要将返回值返回给续体调用方,在kotlin中,每个suspend函数都必须在另一个suspend函数内被调用,因此可以直接取得父级续体对象,用反回值作为参数调用其resumeWith方法即可将返回值传递给调用方,同时让被挂起的调用方恢复执行。kotlin语言本身提供了一些内置函数,可以用来在非suspend上下文中调用suspend函数,调用这些内置函数时必须用一个自行实现了Continuation接口的对象作为参数传递,定义好其resumeWith方法,当suspend函数执行完毕时,最终会调用这个接口对象的resumeWith方法。
具体如何将suspend函数转换为续体,并没有在KEEP中约定,只要对外的语义符合要求即可,一种简单的实现是将suspend函数转换为一个函数对象,函数对象的核心是一个大的when表达式,将每个suspend之后的逻辑放到一个条件分支后,并在对象内记录一个表示当前执行到哪一步的标记变量,在每个条件分支之后更新该变量的值,在每次resumeWith被调用时都去根据该变量继续执行后面的代码。
备注:CPS变换
在搜索Continuation时,常常会看到一个缩写:CPS,它的全称是Continuation Passing Style,翻译过来没有特别合适的名字,就叫“续体传递风格”吧。wiki上有对CPS的准确定义:https://en.wikipedia.org/wiki/Continuation-passing_style。
续体,就是一段逻辑执行完毕后,接下去要继续执行的逻辑,回调函数是最常见的续体,在引入了协程后,续体更多的指由编译器生成的续体对象。CPS,就是一种编程风格:将希望某些工作完成后,继续执行的代码逻辑封装到续体中,并将续体作为参数传入要调用的函数,函数有责任调用该续体,并可以拿到续体返回值继续执行。kotlin中协程的实现就是一种CPS,将suspend函数编译为Continuation的过程,就是CPS变换。
一般来说,只有在编译器领域的中间代码生成阶段,才会考虑如何将正常的代码编译为CPS的,但是在异步编程场景,将每个可能挂起的挂起点做CPS变换后,即可以将挂起后希望恢复执行的逻辑封装为续体对象,在调用一个可能挂起的函数时,只要传递当前函数续体对象作为参数,并期待当挂起函数恢复执行时调用续体,当前函数即可恢复执行。
编译器提供获取当前续体的方法
现在我们知道了一个suspend函数被编译器转换成了一个对象,并且在这个suspend函数被挂起后,只要调用续体对象的resumeWith方法,就能够恢复suspend函数的执行,下面可以考虑一下,一个suspend函数具体是如何被挂起和恢复的。
以上面的delay函数为例,delay函数是一个suspend函数,因此编译器能够确信,在delay函数被调用时,当前的代码正执行在一个续体的resumeWith方法中,并且由于编译器会为每一处suspend函数调用生成一个挂起点,因此当前执行流程是可以被挂起并被后续恢复的。在这里要实现挂起并恢复,只要能够获取当前这个续体对象,将这个续体对象暂存下来,利用平台提供的定时回调能力在1s之后调用它的resumeWith方法,这样,控制权就回到了调用delay的那一行代码处。
kotlin提供的suspendCoroutineUninterceptedOrReturn(block: (Continuation) -> Any)函数,就是用于获取当前续体对象的,但这个函数初见不太好理解,因为它并不会直接返回续体对象,而是要求我们传入一个lambda,并将当前上下文中的续体对象作为参数调用这个lambda。这里要利用这个函数实现delay,只要在这个lambda中返回COROUTINE_SUSPEND,并确保在1s之后调用作为参数传入的续体对象的resumeWith方法即可,通过COROUTINE_SUSPEND这个特殊的返回值,kotlin知道当前执行流程被挂起了。此外,如果这里返回的是suspend函数的返回值类型,kotlin就会认为suspend函数不须挂起就已经执行完毕,会直接调用resumeWith恢复续体的执行,这也意味着如果在lambda中没有返回COROUTINE_SUSPEND,一定要确保自己不会在lambda中调用续体的resumeWith方法。
续体拦截器决定协程在哪个线程执行
前面对续体做了比较多的讨论,我们知道了只要执行续体的resumeWith方法,就可以恢复续体的执行,可是如果我们想控制续体具体在哪个线程执行应该怎么做?比如我们自己要实现一个delay函数,创建了一个新的线程sleep了1s,然后调用了续体的resumeWith方法,这是否意味着协程会在这个新创建的线程中执行?
要回答这个问题,必须要先了解协程上下文的概念,回头看看之前Continuation接口的定义,可以看到每个续体都必须能过获取一个CoroutineContext对象,这就是协程上下文。协程上下文中保存了一组用户定义的对象,这里面一个非常重要的对象是续体拦截器(ContinuationInterceptor),这个接口定义了interceptContinuation方法,接受一个续体对象,返回包装后的续体对象。返回的包装对象可以重写resumeWith方法,自由决定如何恢复执行被包装的续体:在特定的线程内恢复协程的执行,或者将协程的恢复放到主线程消息队列中去,让协程后续在主线程中恢复执行。
在协程上下文中指定了续体拦截器的情况下,使用协程支持库提供的方法启动协程时,就会在拿到suspend函数对应的续体对象后使用续体拦截器对续体进行包装,后续调用resumeWith方法时调用的是包装对象的方法。在没有指定续体拦截器,或者特意不去使用续体拦截器对续体进行包装时,则不存在续体包装对象,续体会在resumeWith方法被调用的线程上直接恢复执行。
生成器和限定挂起
kotlin利用协程支持了生成器,generator的概念。
使用生成器,可以编写同步执行的,在不同上下文间来回转移执行流程的代码。
// 推断出类型为 Sequence<Int>
val fibonacci = sequence {
yield(1) // 斐波那契数列的首项
var cur = 1
var next = 1
while (true) {
yield(next) // 斐波那契数列的下一项
val tmp = cur + next
cur = next
next = tmp
}
}
for (num in fibonacci) {
println(num)
if (num > 1000) {
break
}
}
上面这个例子中,fibonacci其实包装了一个由suspend lambda续体。
每次使用hasNext检查下一个元素时,会调用续体的resume方法,在生成器场景下使用的是一个空的协程上下文对象,没有续体拦截器,因此这里会直接在当前线程执行续体,如果在续体中调用了yield,则说明存在后续的元素,hasNext因此返回true,同时记录下这次yield传入的值,下次调用next时返回,如果没有调用yield,则说明没有更多元素了,hasNext会返回false。
yield方法是一个suspend方法,并且是在sequence中唯一能够合法调用的suspend方法(这也是限定挂起中限定这个词的含义),这个方法的实现利用suspendCoroutineUninterceptedOrReturn捕获当前续体,记录下来,并返回COROUTINE_SUSPEND,导致这次resume方法返回,hasNext将在resume返回后检查由yield设置的值并返回true。下次hasNext调用时,会继续执行上次yield记录下来的续体。
异步流:flow
上面的sequence例子中,所有代码必须在当前线程内同步地执行,因此要确保所调用的suspend方法是一个不会导致挂起的suspend方法,上面其实只用到了协程续体保存状态和恢复执行的能力,并没有引入任何异步操作。
flow是比sequence更有意思的流:它是异步的。flow在很多地方都和sequence类似,和sequence不同的点在于,flow内使用emit生产数据,并且在多次emit中间可以挂起,恢复之后再emit下一个数据。sequence内由于被限制了可以使用的suspend函数,因此一定不会发生挂起,是一个同步流;flow则没有这样的限制,因此可以是同步的,也可以是异步的。
flow的emit方法是suspend的,suspend意味着会生成续体,执行流程会被转移到suspend函数中,一般必须要等待消费者处理完emit产生的数据,续体的resumeWith才会被调用,flow才会继续执行,如果消费端同步阻塞了,则续体也会被同步阻塞。
协程支持库对协程的封装
通过上面的讨论,已经大体明白了协程是如何执行的。当我们使用协程时,其实不需要像上面讨论的那样自行把suspend函数转换成续体,也不需要编写续体拦截器,kotlin的协程支持库已经考虑了几乎所有协程的使用场景,提供了简单直接的方法。
协程支持库其实提供了相当多的功能,基于上面讨论的基础原理,将每个协程封装成Job,为每个协程绑定了CoroutineScope,允许协程间建立父子关系并定义父子协程如何协作,等等。
使用启动器启动协程
可以使用GlobalScope.launch方法,或者全局函数runBlocking来启动协程。前者在一个线程池中调度协程,后者在当前线程启动一个消息队列来调度协程,并在协程执行完成前阻塞当前线程。
在使用launch方式启动协程时,可以指定启动模式,目前已经稳定可用的启动模式是Default和Lazy,二者都会给协程上下文设置默认的续体拦截器,在默认线程池中调度线程,以Lazy模式启动的协程不会立即开始执行,而是直到必要时(被join或者被await时)才会开始执行。
协程的Job管理
kotlin的协程支持库为协程添加了状态管理的能力,规定协程启动器在启动协程后必须返回一个代表协程的Job或者Defered对象,其中Defered继承Job接口,给Job增加了获得协程结果的能力,这个对象有如下这几种状态:
- New 可选初始状态,以LAZY模式启动的协程初始是这个状态,被启动后才会变成Active状态
- Active 正常运行
- Completing 当前协程已经结束,正在等待子协程
- Cancelling 当前协程已经取消,正在等待子协程
- Cancelled 已取消
- Completed 已完成
协程之间存在父子关系,利用协程支持库提供的API在一个协程上下文中启动的新的协程自动的成为当前协程的子协程,父协程必须在所有子协程结束后才能结束。取消父协程会导致子协程被同时取消,但取消子协程不会导致父协程被取消;子协程抛出异常会导致异常被传播到父协程,同时导致兄弟协程被取消。
确保业务逻辑兼容取消
在suspend函数中,可以主动使用isActive来检查当前的协程是否已经被取消来及时放弃不必要的工作。kotlin还规定,在suspend函数中调用的每一个其他suspend函数都隐式的支持取消,在当前协程上下文没有指定NonCancellable这个特殊的Context的情况下,续体在恢复执行之前会检查当前协程是否已经被取消,若被取消则直接return不恢复执行续体。因此,业务的协程代码一定要留心,不要写出如果suspend之后的逻辑不执行就会导致问题的代码。
协程的CoroutineScope
每个协程lambda在执行时,都可以执行特定的协程方法,这些方法都是由CoroutineScope定义的,之所以能够在协程中调用这些方法,是因为协程启动器是这样定义协程lambda的:block: suspend CoroutineScope.() -> T
这样的一个lambda必须由一个CoroutineScope调用,调用的语法也非常正常:scope.block(),这个CoroutineScope对象会成为lambda的上下文对象,可以直接调用这个对象上定义的方法。这种lambda的定义方式,使得在kotlin中能够使用简洁的dsl写法。
和其他语言的对比
JS的协程
JS中的generator实现了协程,其实现和Kotlin一样,也是基于CPS变换的,每一个yield都会被编译为一个挂起点,但在实际的JS业务开发中,很少会直接使用generator,而是会使用async/await,async/await应该被视作是的generator的语法糖。
- 每一个await函数调用都会被当作一个挂起点,生成的续体对象恢复执行时可以在这些挂起点处继续执行;
- async函数的返回值是Promise类型的对象,async函数体会被封装到Promise中,当async函数执行到了最后,需要返回时,会resolve返回值;
- await调用时,会持有当前函数对应的续体对象,启动由async函数返回的Promise对象,然后直接return等待Promise resolve,然后恢复续体对象的执行;
JS中不可以开启新的线程,也就不存在协程调度的概念。JS中的异步API调用时,具体的异步逻辑会在哪个线程执行,是由浏览器或者JS Engine决定的,因此JS是一个严格的单线程消息队列模型,不存在协程调度的概念。
JS中,虽然async函数返回的是Promise对象,但cancel掉一个async函数返回的Promise对象后,其内部的所有异步逻辑还是会继续执行 —— 它们压根不知道有外部Promise对象的存在,也不关心外部Promise是否已经cancel了。当外层Promise最终resolve时,由于它已经被cancel,结果会被直接丢弃。在使用JS的协程时,必须自行处理cancel逻辑。
Goroutine
Go语言提供的Goroutine概念常常被和协程一块提及,并且经常被弄混淆。
Goroutine的核心目标是提供高效的并发编程范式,而不是提供以同步写法编写异步逻辑的语法。因此,Goroutine没有返回值,并且启动Goroutine也不会导致当前函数挂起。
Go定义了GMP模型,使用go关键字能够将一个函数封装为一个Goroutine并发布到全局Goroutine队列中,Go的调度器负责执行所有的Goroutine,Go的调度算法确保Go最多只会使用和机器核心数量相等的线程执行Goroutine,如果一个线程由于调用了文件IO接口阻塞,调度器会开启一个新线程,后续被阻塞的线程恢复执行后,如果线程数量超出了CPU核心数量,Go又会停用超出数量的线程。Go的这种调度算法能够最大化并发编程的效率,同时给开发者提供简洁的同步编程接口。
由于Kotlin的调度算法完全是在语言之外实现的,因此理论上Kotlin也能够参考Go实现类似的调度算法。此时,Kotlin的协程就具备了Goroutine的优势,同时还能够使用协程语法和Job管理来简化业务逻辑。这么看,Kotlin的协程设计虽然比JS的协程和Go的Goroutine都更加复杂,但在这复杂性的背后,的确提供了更强大的能力。