对于suspend修饰的函数,编译器会帮我们在该函数中传入一个Continuation参数,使用Continuation参数代替了suspend修饰符。
public interface Continuation<in T> {// 与该续体对应的协程的上下文public val context: CoroutineContext// 恢复对应协程的执行,并且传递一个表示成功或失败的result作为最后一个挂起点的返回值public fun resumeWith(result: Result<T>)}
在协程中使用Continuation接口表示一个续体,它代表一个挂起点之后的延续,即 挂起点之后的剩余应执行的代码。
(比如:进行网络请求的时候,使用Callback接收网络请求的结果,这时候可以将Callback看作一个续体,即网络请求的续体,用于接收网络请求的结果)
通过传递Continuation来控制异步调用流程被称作CPS变换(Continuation-Passing-Style Transformation)。
//正常的代码class Temp {suspend fun fetchData(argument: String): Boolean {val result = netRequest(argument)return result == 0}// 模拟网络请求suspend fun netRequest(argument: String): Int {delay(1000)return argument.length}}
fetchData函数编译时生成了一个静态内部类
//反编译后的简略版代码static final class FetchDataStateMachine extends ContinuationImpl {Object result;int label;FetchDataStateMachine(Continuation $completion) {super($completion); //接收一个名称为$completion的Continuation参数,$completion被保存在父类BaseContinuationImpl中:}@Nullablepublic final Object invokeSuspend(@NotNull Object $result) {this.result = $result;this.label |= Integer.MIN_VALUE;return Temp.this.fetchData(null, (Continuation<? super Boolean>) this);}}
//继承关系FetchDataStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation//所以,也可以说:fetchData函数编译时生成了续体。
/*BaseContinuationImpl主要做了下面的几件事情:保存completion:它保存了fetchData方法的FetchDataStateMachine实例,使得可以一级一级地向上回调续体。(重写resumeWith方法:BaseContinuationImpl重写了Continuation接口的resumeWith方法,该方法用于恢复协程,也是协程恢复的核心逻辑)。*/internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) : Continuation<Any?>, CoroutineStackFrame, Serializable {public final override fun resumeWith(result: Result<Any?>) {var current = thisvar param = resultwhile (true) {probeCoroutineResumed(current)with(current) {val completion = completion!!val outcome: Result<Any?> =try {val outcome = invokeSuspend(param)if (outcome === COROUTINE_SUSPENDED) returnResult.success(outcome)} catch (exception: Throwable) {Result.failure(exception)}releaseIntercepted()if (completion is BaseContinuationImpl) {current = completionparam = outcome} else {completion.resumeWith(outcome)return}}}}
状态机FetchDataStateMachine声明了result和label两个变量
result:表示上一个Continuation的结果,比如有函数A和B,函数内部分别声明了ContinuationA和ContinuationB,A调用B并且将ContinuationA传入B中保存。在后续回调的过程中,ContinuationA可以从result变量中拿到ContinuationB::invokeSuspend的执行结果。
label:Kotlin Compiler可以识别函数内部哪个地方会挂起,每一个挂起点(suspension point)被表示为状态机的一个状态(state),这些状态通过switch case语句表示出来。label表示当前应该执行状态机的哪一个状态,具体来说就是要进入哪一个case,通过label变量就记录下了状态机当前的状态。
//反编译后的代码public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {Object $continuation;label25:{/*判断传入的completion是否为FetchDataStateMachine类型,若是则对它的label变量做些操作,若不是则直接创建一个FetchDataStateMachine并且传入completion(completion会被保存下来)。*/if (completion instanceof FetchDataStateMachine) {$continuation = (FetchDataStateMachine) completion;if (($continuation.label & Integer.MIN_VALUE) != 0) {$continuation.label -= Integer.MIN_VALUE;break label25;}}$continuation = new FetchDataStateMachine(completion);}/*fetchData方法原先的代码语句会被划分为switch下的多个case语句label变量控制当前要执行哪个case分支。*/Object $result = $continuation.result;Object resultTemp;switch ($continuation.label) {case 0:ResultKt.throwOnFailure($result);$continuation.label = 1;resultTemp = this.netRequest(argument, (Continuation) $continuation);if (resultTemp == COROUTINE_SUSPENDED) {return COROUTINE_SUSPENDED;}break;case 1:ResultKt.throwOnFailure($result);resultTemp = $result;break;default:throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}int result = ((Number) resultTemp).intValue();return Boxing.boxBoolean(result == 0);}static final class FetchDataStateMachine extends ContinuationImpl {Object result;int label;FetchDataStateMachine(Continuation $completion) {super($completion);}@Nullablepublic final Object invokeSuspend(@NotNull Object $result) {this.result = $result;this.label |= Integer.MIN_VALUE;return Temp.this.fetchData(null, (Continuation<? super Boolean>) this);}}
有限状态机(FSM,即 Finite-State Machine)——来控制协程代码的执行。
// 模拟网络请求suspend fun netRequest(argument: String): Int {delay(1000)return argument.length}//反编译后public static final Object delay(long timeMillis, @NotNull Continuation $completion) {if (timeMillis <= 0L) {return Unit.INSTANCE;} else {// 实现类CancellableContinuationImpl cancellableContinuationImpl = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);cancellableContinuationImpl.initCancellability();// 向上转型CancellableContinuation cont = (CancellableContinuation)cancellableContinuationImpl;if (timeMillis < Long.MAX_VALUE) {// 延时操作getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);}// 获取执行结果Object result = cancellableContinuationImpl.getResult();if (result == COROUTINE_SUSPENDED) {DebugProbesKt.probeCoroutineSuspended($completion);}// 返回结果return result;}}
当delay方法需要挂起的时候,它返回COROUTINE_SUSPENDED,接着netRequest方法返回COROUTINE_SUSPENDED,接着fetchData方法返回COROUTINE_SUSPENDED,重复这个过程直到调用栈的最上层。
通过这种「结束方法调用」的方式,让协程暂时不在这个线程上面执行,让线程可以去处理其它的任务(包括执行其它的协程),这也就是为什么协程的挂起不会阻塞当前的线程,也是「非阻塞式挂起」。
恢复
// 模拟网络请求suspend fun netRequest(argument: String): Int {delay(1000)return argument.length}// 反编译后public final Object netRequest(@NotNull String argument,@NotNull Continuation completion) {Object $continuation;label20:{if (completion instanceof NetRequestStateMachine) {$continuation = (NetRequestStateMachine) completion;if (($continuation.label & Integer.MIN_VALUE) != 0) {$continuation.label -= Integer.MIN_VALUE;break label20;}}$continuation = new NetRequestStateMachine(completion);}Object $result = $continuation.result;switch ($continuation.label) {case 0:ResultKt.throwOnFailure($result);$continuation.functionParameter = argument;$continuation.label = 1;if (DelayKt.delay(1000L, (Continuation) $continuation) == COROUTINE_SUSPENDED) {return COROUTINE_SUSPENDED;}break;case 1:argument = (String) ($continuation.functionParameter);ResultKt.throwOnFailure($result);break;default:throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}return Boxing.boxInt(argument.length());}static final class NetRequestStateMachine extends ContinuationImpl {Object result;int label;Object functionParameter;NetRequestStateMachine(Continuation $completion) {super($completion);}@Nullablepublic final Object invokeSuspend(@NotNull Object $result) { // invokeSuspendthis.result = $result;this.label |= Integer.MIN_VALUE;return Temp.this.netRequest(null, (Continuation<? super Integer>) this);}}
resumeWith方法 和 恢复 相关
internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) : Continuation<Any?>, CoroutineStackFrame, Serializable {// This implementation is final. This fact is used to unroll resumeWith recursion.public final override fun resumeWith(result: Result<Any?>) {// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resumevar current = thisvar param = resultwhile (true) {// 在每个恢复的continuation进行调试探测,使得调试库可以精确跟踪挂起的调用栈中哪些部分// 已经恢复了。probeCoroutineResumed(current)with(current) {val completion = completion!! // fail fast when trying to resume continuation without completionval outcome: Result<Any?> =try {val outcome = invokeSuspend(param)if (outcome === COROUTINE_SUSPENDED) returnResult.success(outcome)} catch (exception: Throwable) {Result.failure(exception)}releaseIntercepted() // this state machine instance is terminatingif (completion is BaseContinuationImpl) { // 符合//把current更新为FetchDataStateMachine实例current = completion// 把param更新为outcome(包装了netRequest-Return的Result)param = outcome} else {// top-level completion reached -- invoke and returncompletion.resumeWith(outcome)return}}}}protected abstract fun invokeSuspend(result: Result<Any?>): Any?protected open fun releaseIntercepted() {// does nothing here, overridden in ContinuationImpl}...}
static final class NetRequestStateMachine extends ContinuationImpl {.......@Nullablepublic final Object invokeSuspend(@NotNull Object $result) {this.result = $result;this.label |= Integer.MIN_VALUE;/*调用了netRequest方法,并且将续体自身作为参数传入。*/return Temp.this.netRequest(null, (Continuation<? super Integer>) this);/*该返回值会传递到BaseContinuationImpl的resumeWith方法中*/}}
NetRequestStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation
BaseContinuationImpl(resumeWith(result: Result<Any?>)val outcome: Result<Any?> =try {val outcome = invokeSuspend(param)if (outcome === COROUTINE_SUSPENDED) returnResult.success(outcome)} catch (exception: Throwable) {Result.failure(exception)}在resumeWith方法中,将netRequest-Return包装为Result保存到outcome变量中。
【续体的调用过程,其实就是层层往上地调用续体的invokeSuspend方法】
继续进行下一轮while循环,在with块中会执行FetchDataStateMachine::invokeSuspend
要知道怎么恢复——就要知道调用fetchData方法的时候会做些什么。
// 挂起函数suspend fun requestToken(): Token { ... }//launch方法可以启动一个协程fun postItem(item: Item) {GlobalScope.launch {val token = requestToken() //val post = createPost(token, item)processPost(post)}}//源码---------------------------------public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> Unit): Job {...coroutine.start(start, coroutine, block)return coroutine}
coroutine.start(start, coroutine, block)
|
走到|
|
startCoroutineCancellable
internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>,onCancellation: ((cause: Throwable) -> Unit)? = null): Unit = when (this) { // 判断is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)else -> resumeWith(result) // 传递}/*首先会判断当前续体类型是否是DispatchedContinuation(有没有被调度器拦截包装),如果没有被拦截直接调用resumeWith()恢复协程代码块的执行,否则调用拦截包装后的DispatchedContinuation类型的resumeCancellableWith():*/
//包装后的续体类型,维护了调度器dispatcher和原续体对象continuationinternal class DispatchedContinuation<in T>(@JvmField val dispatcher: CoroutineDispatcher,@JvmField val continuation: Continuation<T>) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {//DispatchedContinuation类的resumeCancellableWith()inline fun resumeCancellableWith(result: Result<T>,noinline onCancellation: ((cause: Throwable) -> Unit)?) {val state = result.toState(onCancellation)//通过调度器判断是否需要切换线程if (dispatcher.isDispatchNeeded(context)) {_state = stateresumeMode = MODE_CANCELLABLE//A 如果需要,则切换线程,将当前续体this作为Runnable传入dispatcher.dispatch(context, this)} else {executeUnconfined(state, MODE_CANCELLABLE) {if (!resumeCancelled(state)) {//B 该函数最终会调用原continuation.resumeWith(result)直接在当前线程恢复协程执行resumeUndispatchedWith(result)}}}}//B 没有调度器拦截的情况直接在当前线程执行inline fun resumeUndispatchedWith(result: Result<T>) {withCoroutineContext(context, countOrElement) {continuation.resumeWith(result)}}}
协程恢复的实质是对续体进行回调。

