对于suspend修饰的函数,编译器会帮我们在该函数中传入一个Continuation参数,使用Continuation参数代替了suspend修饰符。
public interface Continuation<in T> {
// 与该续体对应的协程的上下文
public val context: CoroutineContext
// 恢复对应协程的执行,并且传递一个表示成功或失败的result作为最后一个挂起点的返回值
public fun resumeWith(result: Result<T>)
}
在协程中使用Continuation接口表示一个续体,它代表一个挂起点之后的延续,即 挂起点之后的剩余应执行的代码。
(比如:进行网络请求的时候,使用Callback接收网络请求的结果,这时候可以将Callback看作一个续体,即网络请求的续体,用于接收网络请求的结果)
通过传递Continuation来控制异步调用流程被称作CPS变换(Continuation-Passing-Style Transformation)。
//正常的代码
class Temp {
suspend fun fetchData(argument: String): Boolean {
val result = netRequest(argument)
return result == 0
}
// 模拟网络请求
suspend fun netRequest(argument: String): Int {
delay(1000)
return argument.length
}
}
fetchData函数编译时生成了一个静态内部类
//反编译后的简略版代码
static final class FetchDataStateMachine extends ContinuationImpl {
Object result;
int label;
FetchDataStateMachine(Continuation $completion) {
super($completion); //接收一个名称为$completion的Continuation参数,$completion被保存在父类BaseContinuationImpl中:
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return Temp.this.fetchData(null, (Continuation<? super Boolean>) this);
}
}
//继承关系
FetchDataStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation
//
所以,也可以说:fetchData函数编译时生成了续体。
/*
BaseContinuationImpl主要做了下面的几件事情:
保存completion:它保存了fetchData方法的FetchDataStateMachine实例,使得可以一级一级地向上回调续体。
(重写resumeWith方法:BaseContinuationImpl重写了Continuation接口的resumeWith方法,该方法用于恢复协程,也是协程恢复的核心逻辑)。
*/
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted()
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
completion.resumeWith(outcome)
return
}
}
}
}
状态机FetchDataStateMachine声明了result和label两个变量
result:表示上一个Continuation的结果,比如有函数A和B,函数内部分别声明了ContinuationA和ContinuationB,A调用B并且将ContinuationA传入B中保存。在后续回调的过程中,ContinuationA可以从result变量中拿到ContinuationB::invokeSuspend的执行结果。
label:Kotlin Compiler可以识别函数内部哪个地方会挂起,每一个挂起点(suspension point)被表示为状态机的一个状态(state),这些状态通过switch case语句表示出来。label表示当前应该执行状态机的哪一个状态,具体来说就是要进入哪一个case,通过label变量就记录下了状态机当前的状态。
//反编译后的代码
public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
Object $continuation;
label25:
{
/*
判断传入的completion是否为FetchDataStateMachine类型,
若是则对它的label变量做些操作,
若不是则直接创建一个FetchDataStateMachine并且传入completion(completion会被保存下来)。
*/
if (completion instanceof FetchDataStateMachine) {
$continuation = (FetchDataStateMachine) completion;
if (($continuation.label & Integer.MIN_VALUE) != 0) {
$continuation.label -= Integer.MIN_VALUE;
break label25;
}
}
$continuation = new FetchDataStateMachine(completion);
}
/*
fetchData方法原先的代码语句会被划分为switch下的多个case语句
label变量控制当前要执行哪个case分支。
*/
Object $result = $continuation.result;
Object resultTemp;
switch ($continuation.label) {
case 0:
ResultKt.throwOnFailure($result);
$continuation.label = 1;
resultTemp = this.netRequest(argument, (Continuation) $continuation);
if (resultTemp == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 1:
ResultKt.throwOnFailure($result);
resultTemp = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
int result = ((Number) resultTemp).intValue();
return Boxing.boxBoolean(result == 0);
}
static final class FetchDataStateMachine extends ContinuationImpl {
Object result;
int label;
FetchDataStateMachine(Continuation $completion) {
super($completion);
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return Temp.this.fetchData(null, (Continuation<? super Boolean>) this);
}
}
有限状态机(FSM,即 Finite-State Machine)——来控制协程代码的执行。
// 模拟网络请求
suspend fun netRequest(argument: String): Int {
delay(1000)
return argument.length
}
//反编译后
public static final Object delay(long timeMillis, @NotNull Continuation $completion) {
if (timeMillis <= 0L) {
return Unit.INSTANCE;
} else {
// 实现类
CancellableContinuationImpl cancellableContinuationImpl = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);
cancellableContinuationImpl.initCancellability();
// 向上转型
CancellableContinuation cont = (CancellableContinuation)cancellableContinuationImpl;
if (timeMillis < Long.MAX_VALUE) {
// 延时操作
getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);
}
// 获取执行结果
Object result = cancellableContinuationImpl.getResult();
if (result == COROUTINE_SUSPENDED) {
DebugProbesKt.probeCoroutineSuspended($completion);
}
// 返回结果
return result;
}
}
当delay方法需要挂起的时候,它返回COROUTINE_SUSPENDED,接着netRequest方法返回COROUTINE_SUSPENDED,接着fetchData方法返回COROUTINE_SUSPENDED,重复这个过程直到调用栈的最上层。
通过这种「结束方法调用」的方式,让协程暂时不在这个线程上面执行,让线程可以去处理其它的任务(包括执行其它的协程),这也就是为什么协程的挂起不会阻塞当前的线程,也是「非阻塞式挂起」。
恢复
// 模拟网络请求
suspend fun netRequest(argument: String): Int {
delay(1000)
return argument.length
}
// 反编译后
public final Object netRequest(@NotNull String argument,@NotNull Continuation completion) {
Object $continuation;
label20:
{
if (completion instanceof NetRequestStateMachine) {
$continuation = (NetRequestStateMachine) completion;
if (($continuation.label & Integer.MIN_VALUE) != 0) {
$continuation.label -= Integer.MIN_VALUE;
break label20;
}
}
$continuation = new NetRequestStateMachine(completion);
}
Object $result = $continuation.result;
switch ($continuation.label) {
case 0:
ResultKt.throwOnFailure($result);
$continuation.functionParameter = argument;
$continuation.label = 1;
if (DelayKt.delay(1000L, (Continuation) $continuation) == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 1:
argument = (String) ($continuation.functionParameter);
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return Boxing.boxInt(argument.length());
}
static final class NetRequestStateMachine extends ContinuationImpl {
Object result;
int label;
Object functionParameter;
NetRequestStateMachine(Continuation $completion) {
super($completion);
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) { // invokeSuspend
this.result = $result;
this.label |= Integer.MIN_VALUE;
return Temp.this.netRequest(null, (Continuation<? super Integer>) this);
}
}
resumeWith方法 和 恢复 相关
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// 在每个恢复的continuation进行调试探测,使得调试库可以精确跟踪挂起的调用栈中哪些部分
// 已经恢复了。
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) { // 符合
//把current更新为FetchDataStateMachine实例
current = completion
// 把param更新为outcome(包装了netRequest-Return的Result)
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
protected open fun releaseIntercepted() {
// does nothing here, overridden in ContinuationImpl
}
...
}
static final class NetRequestStateMachine extends ContinuationImpl {
.......
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
/*
调用了netRequest方法,并且将续体自身作为参数传入。
*/
return Temp.this.netRequest(null, (Continuation<? super Integer>) this);
/*
该返回值会传递到BaseContinuationImpl的resumeWith方法中
*/
}
}
NetRequestStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation
BaseContinuationImpl(
resumeWith(result: Result<Any?>)
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
在resumeWith方法中,将netRequest-Return包装为Result保存到outcome变量中。
【续体的调用过程,其实就是层层往上地调用续体的invokeSuspend方法】
继续进行下一轮while循环,在with块中会执行FetchDataStateMachine::invokeSuspend
要知道怎么恢复——就要知道调用fetchData方法的时候会做些什么。
// 挂起函数
suspend fun requestToken(): Token { ... }
//launch方法可以启动一个协程
fun postItem(item: Item) {
GlobalScope.launch {
val token = requestToken() //
val post = createPost(token, item)
processPost(post)
}
}
//源码---------------------------------
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
...
coroutine.start(start, coroutine, block)
return coroutine
}
coroutine.start(start, coroutine, block)
|
走到|
|
startCoroutineCancellable
internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) { // 判断
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result) // 传递
}
/*
首先会判断当前续体类型是否是DispatchedContinuation(有没有被调度器拦截包装),
如果没有被拦截直接调用resumeWith()恢复协程代码块的执行,
否则调用拦截包装后的DispatchedContinuation类型的resumeCancellableWith():
*/
//包装后的续体类型,维护了调度器dispatcher和原续体对象continuation
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
//DispatchedContinuation类的resumeCancellableWith()
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
//通过调度器判断是否需要切换线程
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
//A 如果需要,则切换线程,将当前续体this作为Runnable传入
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
//B 该函数最终会调用原continuation.resumeWith(result)直接在当前线程恢复协程执行
resumeUndispatchedWith(result)
}
}
}
}
//B 没有调度器拦截的情况直接在当前线程执行
inline fun resumeUndispatchedWith(result: Result<T>) {
withCoroutineContext(context, countOrElement) {
continuation.resumeWith(result)
}
}
}
协程恢复的实质是对续体进行回调。