对于suspend修饰的函数,编译器会帮我们在该函数中传入一个Continuation参数,使用Continuation参数代替了suspend修饰符。

  1. public interface Continuation<in T> {
  2. // 与该续体对应的协程的上下文
  3. public val context: CoroutineContext
  4. // 恢复对应协程的执行,并且传递一个表示成功或失败的result作为最后一个挂起点的返回值
  5. public fun resumeWith(result: Result<T>)
  6. }

在协程中使用Continuation接口表示一个续体,它代表一个挂起点之后的延续,即 挂起点之后的剩余应执行的代码。
(比如:进行网络请求的时候,使用Callback接收网络请求的结果,这时候可以将Callback看作一个续体,即网络请求的续体,用于接收网络请求的结果)

通过传递Continuation来控制异步调用流程被称作CPS变换(Continuation-Passing-Style Transformation)。

  1. //正常的代码
  2. class Temp {
  3. suspend fun fetchData(argument: String): Boolean {
  4. val result = netRequest(argument)
  5. return result == 0
  6. }
  7. // 模拟网络请求
  8. suspend fun netRequest(argument: String): Int {
  9. delay(1000)
  10. return argument.length
  11. }
  12. }

fetchData函数编译时生成了一个静态内部类

  1. //反编译后的简略版代码
  2. static final class FetchDataStateMachine extends ContinuationImpl {
  3. Object result;
  4. int label;
  5. FetchDataStateMachine(Continuation $completion) {
  6. super($completion); //接收一个名称为$completion的Continuation参数,$completion被保存在父类BaseContinuationImpl中:
  7. }
  8. @Nullable
  9. public final Object invokeSuspend(@NotNull Object $result) {
  10. this.result = $result;
  11. this.label |= Integer.MIN_VALUE;
  12. return Temp.this.fetchData(null, (Continuation<? super Boolean>) this);
  13. }
  14. }
  1. //继承关系
  2. FetchDataStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation
  3. //
  4. 所以,也可以说:fetchData函数编译时生成了续体。
  1. /*
  2. BaseContinuationImpl主要做了下面的几件事情:
  3. 保存completion:它保存了fetchData方法的FetchDataStateMachine实例,使得可以一级一级地向上回调续体。
  4. (重写resumeWith方法:BaseContinuationImpl重写了Continuation接口的resumeWith方法,该方法用于恢复协程,也是协程恢复的核心逻辑)。
  5. */
  6. internal abstract class BaseContinuationImpl(
  7. public val completion: Continuation<Any?>?
  8. ) : Continuation<Any?>, CoroutineStackFrame, Serializable {
  9. public final override fun resumeWith(result: Result<Any?>) {
  10. var current = this
  11. var param = result
  12. while (true) {
  13. probeCoroutineResumed(current)
  14. with(current) {
  15. val completion = completion!!
  16. val outcome: Result<Any?> =
  17. try {
  18. val outcome = invokeSuspend(param)
  19. if (outcome === COROUTINE_SUSPENDED) return
  20. Result.success(outcome)
  21. } catch (exception: Throwable) {
  22. Result.failure(exception)
  23. }
  24. releaseIntercepted()
  25. if (completion is BaseContinuationImpl) {
  26. current = completion
  27. param = outcome
  28. } else {
  29. completion.resumeWith(outcome)
  30. return
  31. }
  32. }
  33. }
  34. }

状态机FetchDataStateMachine声明了result和label两个变量

  • result:表示上一个Continuation的结果,比如有函数A和B,函数内部分别声明了ContinuationA和ContinuationB,A调用B并且将ContinuationA传入B中保存。在后续回调的过程中,ContinuationA可以从result变量中拿到ContinuationB::invokeSuspend的执行结果

  • label:Kotlin Compiler可以识别函数内部哪个地方会挂起,每一个挂起点(suspension point)被表示为状态机的一个状态(state),这些状态通过switch case语句表示出来。label表示当前应该执行状态机的哪一个状态,具体来说就是要进入哪一个case,通过label变量就记录下了状态机当前的状态

  1. //反编译后的代码
  2. public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
  3. Object $continuation;
  4. label25:
  5. {
  6. /*
  7. 判断传入的completion是否为FetchDataStateMachine类型,
  8. 若是则对它的label变量做些操作,
  9. 若不是则直接创建一个FetchDataStateMachine并且传入completion(completion会被保存下来)。
  10. */
  11. if (completion instanceof FetchDataStateMachine) {
  12. $continuation = (FetchDataStateMachine) completion;
  13. if (($continuation.label & Integer.MIN_VALUE) != 0) {
  14. $continuation.label -= Integer.MIN_VALUE;
  15. break label25;
  16. }
  17. }
  18. $continuation = new FetchDataStateMachine(completion);
  19. }
  20. /*
  21. fetchData方法原先的代码语句会被划分为switch下的多个case语句
  22. label变量控制当前要执行哪个case分支。
  23. */
  24. Object $result = $continuation.result;
  25. Object resultTemp;
  26. switch ($continuation.label) {
  27. case 0:
  28. ResultKt.throwOnFailure($result);
  29. $continuation.label = 1;
  30. resultTemp = this.netRequest(argument, (Continuation) $continuation);
  31. if (resultTemp == COROUTINE_SUSPENDED) {
  32. return COROUTINE_SUSPENDED;
  33. }
  34. break;
  35. case 1:
  36. ResultKt.throwOnFailure($result);
  37. resultTemp = $result;
  38. break;
  39. default:
  40. throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
  41. }
  42. int result = ((Number) resultTemp).intValue();
  43. return Boxing.boxBoolean(result == 0);
  44. }
  45. static final class FetchDataStateMachine extends ContinuationImpl {
  46. Object result;
  47. int label;
  48. FetchDataStateMachine(Continuation $completion) {
  49. super($completion);
  50. }
  51. @Nullable
  52. public final Object invokeSuspend(@NotNull Object $result) {
  53. this.result = $result;
  54. this.label |= Integer.MIN_VALUE;
  55. return Temp.this.fetchData(null, (Continuation<? super Boolean>) this);
  56. }
  57. }

有限状态机(FSM,即 Finite-State Machine)——来控制协程代码的执行

  1. // 模拟网络请求
  2. suspend fun netRequest(argument: String): Int {
  3. delay(1000)
  4. return argument.length
  5. }
  6. //反编译后
  7. public static final Object delay(long timeMillis, @NotNull Continuation $completion) {
  8. if (timeMillis <= 0L) {
  9. return Unit.INSTANCE;
  10. } else {
  11. // 实现类
  12. CancellableContinuationImpl cancellableContinuationImpl = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);
  13. cancellableContinuationImpl.initCancellability();
  14. // 向上转型
  15. CancellableContinuation cont = (CancellableContinuation)cancellableContinuationImpl;
  16. if (timeMillis < Long.MAX_VALUE) {
  17. // 延时操作
  18. getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);
  19. }
  20. // 获取执行结果
  21. Object result = cancellableContinuationImpl.getResult();
  22. if (result == COROUTINE_SUSPENDED) {
  23. DebugProbesKt.probeCoroutineSuspended($completion);
  24. }
  25. // 返回结果
  26. return result;
  27. }
  28. }

当delay方法需要挂起的时候,它返回COROUTINE_SUSPENDED,接着netRequest方法返回COROUTINE_SUSPENDED,接着fetchData方法返回COROUTINE_SUSPENDED,重复这个过程直到调用栈的最上层。
通过这种「结束方法调用」的方式,让协程暂时不在这个线程上面执行,让线程可以去处理其它的任务(包括执行其它的协程),这也就是为什么协程的挂起不会阻塞当前的线程,也是「非阻塞式挂起」

恢复

  1. // 模拟网络请求
  2. suspend fun netRequest(argument: String): Int {
  3. delay(1000)
  4. return argument.length
  5. }
  6. // 反编译后
  7. public final Object netRequest(@NotNull String argument,@NotNull Continuation completion) {
  8. Object $continuation;
  9. label20:
  10. {
  11. if (completion instanceof NetRequestStateMachine) {
  12. $continuation = (NetRequestStateMachine) completion;
  13. if (($continuation.label & Integer.MIN_VALUE) != 0) {
  14. $continuation.label -= Integer.MIN_VALUE;
  15. break label20;
  16. }
  17. }
  18. $continuation = new NetRequestStateMachine(completion);
  19. }
  20. Object $result = $continuation.result;
  21. switch ($continuation.label) {
  22. case 0:
  23. ResultKt.throwOnFailure($result);
  24. $continuation.functionParameter = argument;
  25. $continuation.label = 1;
  26. if (DelayKt.delay(1000L, (Continuation) $continuation) == COROUTINE_SUSPENDED) {
  27. return COROUTINE_SUSPENDED;
  28. }
  29. break;
  30. case 1:
  31. argument = (String) ($continuation.functionParameter);
  32. ResultKt.throwOnFailure($result);
  33. break;
  34. default:
  35. throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
  36. }
  37. return Boxing.boxInt(argument.length());
  38. }
  39. static final class NetRequestStateMachine extends ContinuationImpl {
  40. Object result;
  41. int label;
  42. Object functionParameter;
  43. NetRequestStateMachine(Continuation $completion) {
  44. super($completion);
  45. }
  46. @Nullable
  47. public final Object invokeSuspend(@NotNull Object $result) { // invokeSuspend
  48. this.result = $result;
  49. this.label |= Integer.MIN_VALUE;
  50. return Temp.this.netRequest(null, (Continuation<? super Integer>) this);
  51. }
  52. }

resumeWith方法 和 恢复 相关

  1. internal abstract class BaseContinuationImpl(
  2. public val completion: Continuation<Any?>?
  3. ) : Continuation<Any?>, CoroutineStackFrame, Serializable {
  4. // This implementation is final. This fact is used to unroll resumeWith recursion.
  5. public final override fun resumeWith(result: Result<Any?>) {
  6. // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
  7. var current = this
  8. var param = result
  9. while (true) {
  10. // 在每个恢复的continuation进行调试探测,使得调试库可以精确跟踪挂起的调用栈中哪些部分
  11. // 已经恢复了。
  12. probeCoroutineResumed(current)
  13. with(current) {
  14. val completion = completion!! // fail fast when trying to resume continuation without completion
  15. val outcome: Result<Any?> =
  16. try {
  17. val outcome = invokeSuspend(param)
  18. if (outcome === COROUTINE_SUSPENDED) return
  19. Result.success(outcome)
  20. } catch (exception: Throwable) {
  21. Result.failure(exception)
  22. }
  23. releaseIntercepted() // this state machine instance is terminating
  24. if (completion is BaseContinuationImpl) { // 符合
  25. //把current更新为FetchDataStateMachine实例
  26. current = completion
  27. // 把param更新为outcome(包装了netRequest-Return的Result)
  28. param = outcome
  29. } else {
  30. // top-level completion reached -- invoke and return
  31. completion.resumeWith(outcome)
  32. return
  33. }
  34. }
  35. }
  36. }
  37. protected abstract fun invokeSuspend(result: Result<Any?>): Any?
  38. protected open fun releaseIntercepted() {
  39. // does nothing here, overridden in ContinuationImpl
  40. }
  41. ...
  42. }
  1. static final class NetRequestStateMachine extends ContinuationImpl {
  2. .......
  3. @Nullable
  4. public final Object invokeSuspend(@NotNull Object $result) {
  5. this.result = $result;
  6. this.label |= Integer.MIN_VALUE;
  7. /*
  8. 调用了netRequest方法,并且将续体自身作为参数传入。
  9. */
  10. return Temp.this.netRequest(null, (Continuation<? super Integer>) this);
  11. /*
  12. 该返回值会传递到BaseContinuationImpl的resumeWith方法中
  13. */
  14. }
  15. }
  1. NetRequestStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation
  1. BaseContinuationImpl(
  2. resumeWith(result: Result<Any?>)
  3. val outcome: Result<Any?> =
  4. try {
  5. val outcome = invokeSuspend(param)
  6. if (outcome === COROUTINE_SUSPENDED) return
  7. Result.success(outcome)
  8. } catch (exception: Throwable) {
  9. Result.failure(exception)
  10. }
  11. resumeWith方法中,将netRequest-Return包装为Result保存到outcome变量中。

【续体的调用过程,其实就是层层往上地调用续体的invokeSuspend方法】
继续进行下一轮while循环,在with块中会执行FetchDataStateMachine::invokeSuspend

要知道怎么恢复——就要知道调用fetchData方法的时候会做些什么

  1. // 挂起函数
  2. suspend fun requestToken(): Token { ... }
  3. //launch方法可以启动一个协程
  4. fun postItem(item: Item) {
  5. GlobalScope.launch {
  6. val token = requestToken() //
  7. val post = createPost(token, item)
  8. processPost(post)
  9. }
  10. }
  11. //源码---------------------------------
  12. public fun CoroutineScope.launch(
  13. context: CoroutineContext = EmptyCoroutineContext,
  14. start: CoroutineStart = CoroutineStart.DEFAULT,
  15. block: suspend CoroutineScope.() -> Unit
  16. ): Job {
  17. ...
  18. coroutine.start(start, coroutine, block)
  19. return coroutine
  20. }

coroutine.start(start, coroutine, block)
|
走到|

startCoroutineCancellable

  1. internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
  2. createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
  3. public fun <T> Continuation<T>.resumeCancellableWith(
  4. result: Result<T>,
  5. onCancellation: ((cause: Throwable) -> Unit)? = null
  6. ): Unit = when (this) { // 判断
  7. is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
  8. else -> resumeWith(result) // 传递
  9. }
  10. /*
  11. 首先会判断当前续体类型是否是DispatchedContinuation(有没有被调度器拦截包装),
  12. 如果没有被拦截直接调用resumeWith()恢复协程代码块的执行,
  13. 否则调用拦截包装后的DispatchedContinuation类型的resumeCancellableWith():
  14. */
  1. //包装后的续体类型,维护了调度器dispatcher和原续体对象continuation
  2. internal class DispatchedContinuation<in T>(
  3. @JvmField val dispatcher: CoroutineDispatcher,
  4. @JvmField val continuation: Continuation<T>
  5. ) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
  6. //DispatchedContinuation类的resumeCancellableWith()
  7. inline fun resumeCancellableWith(
  8. result: Result<T>,
  9. noinline onCancellation: ((cause: Throwable) -> Unit)?
  10. ) {
  11. val state = result.toState(onCancellation)
  12. //通过调度器判断是否需要切换线程
  13. if (dispatcher.isDispatchNeeded(context)) {
  14. _state = state
  15. resumeMode = MODE_CANCELLABLE
  16. //A 如果需要,则切换线程,将当前续体this作为Runnable传入
  17. dispatcher.dispatch(context, this)
  18. } else {
  19. executeUnconfined(state, MODE_CANCELLABLE) {
  20. if (!resumeCancelled(state)) {
  21. //B 该函数最终会调用原continuation.resumeWith(result)直接在当前线程恢复协程执行
  22. resumeUndispatchedWith(result)
  23. }
  24. }
  25. }
  26. }
  27. //B 没有调度器拦截的情况直接在当前线程执行
  28. inline fun resumeUndispatchedWith(result: Result<T>) {
  29. withCoroutineContext(context, countOrElement) {
  30. continuation.resumeWith(result)
  31. }
  32. }
  33. }

协程恢复的实质是对续体进行回调

kotlin 协程原理 - 图1