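CPS Transformation

The Kotlin compiler does a lot of work behind the scenes for coroutines, and much of it happens through a CPS (continuation-passing style) transformation at compile time. For example, every suspend function gets an extra Continuation parameter, and its return type becomes Any?. The return type widens because the function may now return either its real result or the special marker COROUTINE_SUSPENDED, which signals that the function has suspended and will deliver its result later through the Continuation.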
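To observe the Any? / COROUTINE_SUSPENDED contract without reading bytecode, the standard library's low-level intrinsics (startCoroutineUninterceptedOrReturn and suspendCoroutineUninterceptedOrReturn in kotlin.coroutines.intrinsics) can start a suspend lambda and hand back its raw CPS result. This is a minimal sketch that uses only the Kotlin standard library; immediate, later, startAndReport and saved are hypothetical names chosen for illustration.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume

// Hypothetical: holds the continuation of a suspended call so we can resume it by hand.
var saved: Continuation<Int>? = null

// Completes without suspending: the CPS call returns the value itself.
suspend fun immediate(): Int = 42

// Suspends: the CPS call returns the COROUTINE_SUSPENDED marker instead of a value.
suspend fun later(): Int = suspendCoroutineUninterceptedOrReturn { cont ->
    saved = cont
    COROUTINE_SUSPENDED
}

// Starts `block` and returns its raw CPS result (a value or COROUTINE_SUSPENDED).
// The completion continuation is only invoked if the block suspended first.
fun startAndReport(block: suspend () -> Int, log: MutableList<String>): Any? =
    block.startCoroutineUninterceptedOrReturn(
        Continuation<Int>(EmptyCoroutineContext) { result -> log.add("completed with $result") }
    )

fun main() {
    val log = mutableListOf<String>()
    println(startAndReport({ immediate() }, log))                    // 42
    println(startAndReport({ later() }, log) === COROUTINE_SUSPENDED) // true
    saved?.resume(7)                                                  // resume the suspended call
    println(log)                                                      // [completed with Success(7)]
}
```

The first call never touches the completion because the value comes back directly; only the suspended call reports through the Continuation.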

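Continuations and Continuation Interceptors

A continuation wraps the code that should run after a suspend function returns; during compilation, a complete coroutine is split into a sequence of such continuations. When a suspend function finishes, execution of the continuation code is resumed by calling Continuation.resumeWith.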
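A minimal sketch of that split, using only the standard library: the code after the suspension point in body runs only once something calls resumeWith on the captured continuation. LoggingCompletion, pending, awaitName and runDemo are hypothetical names; LoggingCompletion implements the Continuation interface by hand to show that a continuation is just a context plus a resumeWith callback.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// A hand-written continuation: a context plus a resumeWith callback.
class LoggingCompletion(private val log: MutableList<String>) : Continuation<String> {
    override val context: CoroutineContext = EmptyCoroutineContext
    override fun resumeWith(result: Result<String>) {
        log.add("done: ${result.getOrNull()}")
    }
}

// Hypothetical: the continuation captured at the suspension point.
var pending: Continuation<String>? = null

// Suspends and stores its continuation instead of producing a value right away.
suspend fun awaitName(): String = suspendCoroutine { cont -> pending = cont }

fun runDemo(log: MutableList<String>) {
    val body: suspend () -> String = {
        log.add("before suspension")
        val name = awaitName()             // suspension point
        log.add("after suspension: $name") // this line belongs to the continuation
        name.uppercase()
    }
    body.startCoroutine(LoggingCompletion(log))
}

fun main() {
    val log = mutableListOf<String>()
    runDemo(log)
    println(log)                                  // [before suspension]
    pending?.resumeWith(Result.success("kotlin")) // resume the continuation
    println(log) // [before suspension, after suspension: kotlin, done: KOTLIN]
}
```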

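Continuation interceptors: the dispatchers Dispatchers.Main, Dispatchers.IO and Dispatchers.Default all implement ContinuationInterceptor (through their common base class CoroutineDispatcher). A continuation interceptor intercepts the code a coroutine should run after it resumes (that is, the continuation) and resumes it on a specific thread or thread pool.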
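The interception idea can be sketched with nothing but the standard library: a ContinuationInterceptor wraps each continuation, and the wrapper's resumeWith hands the real resume over to an Executor. This is a simplified illustration, not how CoroutineDispatcher is actually implemented (the real dispatchers wrap continuations in an internal DispatchedContinuation and add considerably more machinery); ExecutorInterceptor, runOnInterceptor and the thread name "worker-thread" are hypothetical.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical interceptor: every resumption of an intercepted continuation runs on `executor`.
class ExecutorInterceptor(private val executor: Executor) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext = continuation.context
            override fun resumeWith(result: Result<T>) {
                // Instead of resuming on the caller's thread, hop to the executor.
                executor.execute { continuation.resumeWith(result) }
            }
        }
}

// Starts a coroutine whose context contains the interceptor and reports the thread it ran on.
fun runOnInterceptor(): String {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "worker-thread") }
    val latch = CountDownLatch(1)
    var threadName = ""
    val body: suspend () -> String = { Thread.currentThread().name }
    body.startCoroutine(Continuation<String>(ExecutorInterceptor(executor)) { result ->
        threadName = result.getOrThrow()
        latch.countDown()
    })
    latch.await() // wait for the coroutine to finish on the worker thread
    executor.shutdown()
    return threadName
}

fun main() {
    println(runOnInterceptor()) // worker-thread
}
```

startCoroutine looks up the interceptor in the completion's context, so even the very first step of body is dispatched to the executor rather than run on the calling thread.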

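State Machine

When compiling a suspend function, the compiler turns its body into a state machine. A single continuation object then tracks which suspension point the coroutine has reached, which avoids generating a separate class and allocating a separate object for every suspension point. For example, take the following code: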

  val a = a()
  val y = foo(a).await() // suspension point #1
  b()
  val z = bar(a, y).await() // suspension point #2
  c(z)

After compilation it corresponds roughly to the following pseudocode:

  // current state of the state machine
  int label = 0
  // local variables of the coroutine
  A a = null
  Y y = null
  void resumeWith(Object result) {
      if (label == 0) goto L0
      if (label == 1) goto L1
      if (label == 2) goto L2
      else throw IllegalStateException()
    L0:
      // on this first call, result is expected to be null
      a = a()
      label = 1
      result = foo(a).await(this) // 'this' is passed as the continuation
      if (result == COROUTINE_SUSPENDED) return // return if await suspended execution
    L1:
      // external code has resumed the coroutine, passing in the result of .await()
      y = (Y) result
      b()
      label = 2
      result = bar(a, y).await(this) // 'this' is passed as the continuation
      if (result == COROUTINE_SUSPENDED) return // return if await suspended execution
    L2:
      // external code has resumed the coroutine, passing in the result of .await()
      Z z = (Z) result
      c(z)
      label = -1 // no more steps
      return
  }
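The pseudocode can be turned into a runnable Kotlin sketch by implementing Continuation by hand, with a when on label standing in for the goto jumps. fooAwait and barAwait are hypothetical stand-ins for the CPS-compiled foo(a).await(this) and bar(a, y).await(this): the first one suspends and queues a callback, the second returns its value immediately, so both outcomes of the COROUTINE_SUSPENDED check are exercised. a() is replaced by the constant 1, and b() and c(z) just append to a log. This illustrates the technique; it is not the code the compiler actually generates.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED

// Hypothetical queue of "external code" waiting to resume a suspended state machine.
val pendingResumes = ArrayDeque<() -> Unit>()

// Hypothetical CPS stand-in for foo(a).await(this): suspends, resumes later with a * 10.
fun fooAwait(a: Int, cont: Continuation<Any?>): Any? {
    pendingResumes.addLast { cont.resumeWith(Result.success(a * 10)) }
    return COROUTINE_SUSPENDED
}

// Hypothetical CPS stand-in for bar(a, y).await(this): completes at once with a + y.
@Suppress("UNUSED_PARAMETER")
fun barAwait(a: Int, y: Int, cont: Continuation<Any?>): Any? = a + y

class StateMachine(private val log: MutableList<String>) : Continuation<Any?> {
    override val context: CoroutineContext = EmptyCoroutineContext
    var label = 0     // current state
    private var a = 0 // coroutine local variables kept as fields
    private var y = 0

    override fun resumeWith(result: Result<Any?>) {
        var value = result.getOrThrow()
        while (true) {
            when (label) {
                0 -> {
                    a = 1 // stands in for a = a()
                    label = 1
                    value = fooAwait(a, this)
                    if (value === COROUTINE_SUSPENDED) return
                }
                1 -> {
                    y = value as Int
                    log.add("b()")
                    label = 2
                    value = barAwait(a, y, this)
                    if (value === COROUTINE_SUSPENDED) return
                }
                2 -> {
                    val z = value as Int
                    log.add("c($z)")
                    label = -1 // no more steps
                    return
                }
                else -> throw IllegalStateException("state machine already finished")
            }
        }
    }
}

fun main() {
    val log = mutableListOf<String>()
    val machine = StateMachine(log)
    machine.resumeWith(Result.success(null)) // first call: runs L0 and suspends in fooAwait
    println(machine.label)                   // 1
    pendingResumes.removeFirst().invoke()    // "external code" resumes with foo's result
    println(log)                             // [b(), c(11)]
    println(machine.label)                   // -1
}
```

Because barAwait does not suspend, the loop falls straight through from state 1 to state 2 in the same resumeWith call, which is exactly the fall-through from L1 to L2 in the pseudocode.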

Dispatchers.Main

public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

internal object MainDispatcherLoader {
    private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)
    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    private fun loadMainDispatcher(): MainCoroutineDispatcher {
        return try {
            val factories = if (FAST_SERVICE_LOADER_ENABLED) {
                FastServiceLoader.loadMainDispatcherFactory()
            } else {
                // We are explicitly using the
                // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
                // form of the ServiceLoader call to enable R8 optimization when compiled on Android.
                ServiceLoader.load(
                        MainDispatcherFactory::class.java,
                        MainDispatcherFactory::class.java.classLoader
                ).iterator().asSequence().toList()
            }
            @Suppress("ConstantConditionIf")
            factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
                ?: createMissingDispatcher()
        } catch (e: Throwable) {
            // Service loader can throw an exception as well
            createMissingDispatcher(e)
        }
    }
}

MainCoroutineDispatcher is an abstract class. On Android its implementation is HandlerDispatcher, which is a sealed class whose only implementation is HandlerContext:

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    /**
     * Creates [CoroutineDispatcher] for the given Android [handler].
     *
     * @param handler a handler.
     * @param name an optional name for debugging.
     */
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)

    ....
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }
    ....
}

Next, find where the HandlerContext constructor is called:

internal class AndroidDispatcherFactory : MainDispatcherFactory {

    override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        // the HandlerContext object is created here
        HandlerContext(Looper.getMainLooper().asHandler(async = true))

    override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"

    override val loadPriority: Int
        get() = Int.MAX_VALUE / 2
}

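This code shows that dispatching a coroutine's continuation to the main thread still comes down to a Handler, and that Handler is built on the main Looper (Looper.getMainLooper()).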
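The same mechanism can be simulated off Android. MiniLooper and MiniHandler below are hypothetical stand-ins, not android.os.Looper or android.os.Handler: one thread drains a message queue, and post() enqueues a Runnable onto it. HandlerInterceptor mirrors what HandlerContext.dispatch does with handler.post(block). The point of the sketch is that even when a coroutine is resumed from a background thread, as a network callback would do, the code after the suspension point runs on the looper thread.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical stand-in for a Looper: one thread that runs queued messages in order.
class MiniLooper(name: String) {
    private val queue = LinkedBlockingQueue<Runnable>()
    private val quitSignal = Runnable {}
    private val thread = Thread({
        while (true) {
            val message = queue.take()
            if (message === quitSignal) break
            message.run()
        }
    }, name)

    init { thread.start() }

    fun enqueue(message: Runnable) = queue.put(message)
    fun quit() { queue.put(quitSignal); thread.join() }
}

// Hypothetical stand-in for a Handler bound to a looper.
class MiniHandler(private val looper: MiniLooper) {
    fun post(task: Runnable): Boolean {
        looper.enqueue(task)
        return true
    }
}

// Mirrors HandlerContext.dispatch(): every resumption is posted to the handler.
class HandlerInterceptor(private val handler: MiniHandler) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext = continuation.context
            override fun resumeWith(result: Result<T>) {
                handler.post(Runnable { continuation.resumeWith(result) })
            }
        }
}

// Suspends, gets resumed from a background thread, and reports where the continuation ran.
fun threadAfterResume(): String {
    val looper = MiniLooper("main-looper")
    val latch = CountDownLatch(1)
    var seen = ""
    val body: suspend () -> String = {
        suspendCoroutine<Unit> { cont -> Thread { cont.resume(Unit) }.start() }
        Thread.currentThread().name // runs after the suspension point
    }
    body.startCoroutine(Continuation<String>(HandlerInterceptor(MiniHandler(looper))) { result ->
        seen = result.getOrThrow()
        latch.countDown()
    })
    latch.await()
    looper.quit()
    return seen
}

fun main() {
    println(threadAfterResume()) // main-looper
}
```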

Dispatchers.Default and Dispatchers.IO

//Default
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()

internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
    if (useCoroutinesScheduler) DefaultScheduler else CommonPool

internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    // the IO dispatcher
    val IO: CoroutineDispatcher = LimitingDispatcher(
        this,
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING
    )

    override fun close() {
        throw UnsupportedOperationException("$DEFAULT_DISPATCHER_NAME cannot be closed")
    }

    override fun toString(): String = DEFAULT_DISPATCHER_NAME

    @InternalCoroutinesApi
    @Suppress("UNUSED")
    public fun toDebugString(): String = super.toString()
}

public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE, // number of core threads in the pool
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

    @Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)

    override val executor: Executor
        get() = coroutineScheduler

    // This is variable for test purposes, so that we can reinitialize from clean state
    private var coroutineScheduler = createScheduler()

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
            // dispatch
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            DefaultExecutor.dispatch(context, block)
        }

    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        try {
            // dispatch
            coroutineScheduler.dispatch(block, tailDispatch = true)
        } catch (e: RejectedExecutionException) {
            // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            DefaultExecutor.dispatchYield(context, block)
        }
    ...
    internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
        try {
            // dispatch
            coroutineScheduler.dispatch(block, context, tailDispatch)
        } catch (e: RejectedExecutionException) {
            // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            // TaskContext shouldn't be lost here to properly invoke before/after task
            DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
        }
    }
    ...
    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    ...
}

@JvmField
internal val CORE_POOL_SIZE = systemProp(
    "kotlinx.coroutines.scheduler.core.pool.size",
    AVAILABLE_PROCESSORS.coerceAtLeast(2), // !!! at least two here
    minValue = CoroutineScheduler.MIN_SUPPORTED_POOL_SIZE
)
@JvmField
internal val MAX_POOL_SIZE = systemProp(
    "kotlinx.coroutines.scheduler.max.pool.size",
    (AVAILABLE_PROCESSORS * 128).coerceIn(
        CORE_POOL_SIZE,
        CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE  //MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2
    ),
    maxValue = CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE
)
// number of processors available to the JVM
internal val AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors()

As the code shows, the actual scheduling is done by coroutineScheduler.dispatch():

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
    ...
    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        trackTask() // this is needed for virtual time support
        val task = createTask(block, taskContext)
        // try to submit the task to the local queue and act depending on the result
        val currentWorker = currentWorker()
        // try to add the task to the current worker's local queue
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) {
            if (!addToGlobalQueue(notAdded)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        // Checking 'task' instead of 'notAdded' is completely okay
        if (task.mode == TASK_NON_BLOCKING) {
            if (skipUnpark) return
            signalCpuWork()
        } else {
            // Increment blocking tasks anyway
            signalBlockingWork(skipUnpark = skipUnpark)
        }
    }
    ...
}

So CoroutineScheduler implements the Executor interface: it is essentially a thread pool of its own.
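A drastically simplified sketch of the routing in dispatch(), assuming nothing from kotlinx.coroutines: a task submitted from one of the scheduler's own worker threads goes to that worker's local queue, and a task from any other thread goes to a shared global queue. MiniScheduler and its members are hypothetical; the real CoroutineScheduler additionally handles work stealing, CPU versus blocking permits, and parking and unparking idle workers, none of which is modeled here (idle workers simply back off briefly).

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.LockSupport

// Hypothetical toy scheduler modeled loosely on CoroutineScheduler.dispatch().
class MiniScheduler(workerCount: Int) : Executor {
    private val globalQueue = ConcurrentLinkedQueue<Runnable>()
    @Volatile private var running = true

    inner class Worker(id: Int) : Thread("mini-worker-$id") {
        val localQueue = ConcurrentLinkedQueue<Runnable>()
        val owner: MiniScheduler get() = this@MiniScheduler

        init { isDaemon = true }

        override fun run() {
            while (running) {
                // Prefer this worker's own tasks, then fall back to the shared queue.
                val task = localQueue.poll() ?: globalQueue.poll()
                if (task != null) task.run() else LockSupport.parkNanos(100_000L) // brief back-off
            }
        }
    }

    private val workers = List(workerCount) { Worker(it) }.onEach { it.start() }

    override fun execute(command: Runnable) {
        val current = Thread.currentThread() as? Worker
        if (current != null && current.owner === this) {
            current.localQueue.add(command) // submitted from our own worker: keep it local
        } else {
            globalQueue.add(command)        // submitted from outside: shared queue
        }
    }

    fun shutdown() {
        running = false
        workers.forEach { it.join() }
    }
}

// Submits a task from outside, which then submits a nested task from its worker thread.
fun parentAndChildThreads(): Pair<String, String> {
    val scheduler = MiniScheduler(4)
    val latch = CountDownLatch(1)
    var parentThread = ""
    var childThread = ""
    scheduler.execute(Runnable {
        parentThread = Thread.currentThread().name
        scheduler.execute(Runnable {
            childThread = Thread.currentThread().name
            latch.countDown()
        })
    })
    latch.await(5, TimeUnit.SECONDS)
    scheduler.shutdown()
    return parentThread to childThread
}

fun main() {
    println(parentAndChildThreads()) // both names are the same mini-worker-N
}
```

Keeping a task that a worker spawned on that same worker is the locality idea behind submitToLocalQueue: the nested task is likely to touch data that is still hot in that thread's caches.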

Here corePoolSize comes from the JVM's Runtime.getRuntime().availableProcessors(), which returns the number of processors available to the JVM, with a minimum of 2.

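maxPoolSize has a lower bound of corePoolSize and an upper bound of (1 shl BLOCKING_SHIFT) - 2, that is, 1 shifted left by 21 bits minus 2 (2,097,150). A candidate value s = Runtime.getRuntime().availableProcessors() * 128 is then clamped into that range: if s lies between the two bounds, maxPoolSize is s; if it is below the lower bound, maxPoolSize is the lower bound; if it is above the upper bound, maxPoolSize is the upper bound.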
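The bounds can be checked with a little arithmetic. This sketch restates the default formulas from the quoted source as plain functions (corePoolSize and maxPoolSize here are our own function names, and the system-property overrides read through systemProp are ignored), taking the processor count as a parameter so the results are predictable. BLOCKING_SHIFT = 21 as stated above.

```kotlin
// BLOCKING_SHIFT is 21, so the hard upper limit is (1 shl 21) - 2.
const val BLOCKING_SHIFT = 21
val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2 // 2_097_150

// CORE_POOL_SIZE default: the processor count, but at least 2.
fun corePoolSize(processors: Int): Int = processors.coerceAtLeast(2)

// MAX_POOL_SIZE default: processors * 128, clamped into [corePoolSize, MAX_SUPPORTED_POOL_SIZE].
fun maxPoolSize(processors: Int): Int =
    (processors * 128).coerceIn(corePoolSize(processors), MAX_SUPPORTED_POOL_SIZE)

fun main() {
    for (n in listOf(1, 4, 8)) {
        println("processors=$n core=${corePoolSize(n)} max=${maxPoolSize(n)}")
    }
    // processors=1 core=2 max=128
    // processors=4 core=4 max=512
    // processors=8 core=8 max=1024
    val actual = Runtime.getRuntime().availableProcessors()
    println("this machine: core=${corePoolSize(actual)} max=${maxPoolSize(actual)}")
}
```

In practice the upper clamp only matters for processor counts above 16,383, so on ordinary hardware maxPoolSize is simply 128 times the processor count.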

So DefaultScheduler is mainly meant for CPU-intensive work: its core pool size equals the number of available processors, an idea similar to RxJava's computation thread pool.

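The Default dispatcher has one more variant: CommonPool, used when useCoroutinesScheduler is false. CommonPool follows the same idea as DefaultScheduler, but its thread pool is much simpler: instead of a custom scheduler it relies on the JDK's ready-made executors, such as Executors.newFixedThreadPool. A pool of that kind has only core threads, which never time out and are never reclaimed, and its task queue is unbounded. We won't go through that code in detail; the underlying idea is the same.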
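The properties attributed to Executors.newFixedThreadPool can be checked directly: on the JDK it returns a ThreadPoolExecutor whose core and maximum sizes are equal, whose keep-alive time is zero, and whose queue is an unbounded LinkedBlockingQueue. This sketch only inspects such a pool; PoolShape and fixedPoolShape are hypothetical names, and the cast to ThreadPoolExecutor relies on that JDK implementation detail.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

data class PoolShape(
    val core: Int,
    val max: Int,
    val keepAliveMs: Long,
    val coreThreadsTimeOut: Boolean,
    val queueCapacity: Int,
)

fun fixedPoolShape(threads: Int): PoolShape {
    // On the JDK, newFixedThreadPool is backed directly by a ThreadPoolExecutor.
    val pool = Executors.newFixedThreadPool(threads) as ThreadPoolExecutor
    try {
        return PoolShape(
            core = pool.corePoolSize,
            max = pool.maximumPoolSize,
            keepAliveMs = pool.getKeepAliveTime(TimeUnit.MILLISECONDS),
            coreThreadsTimeOut = pool.allowsCoreThreadTimeOut(),
            queueCapacity = pool.queue.remainingCapacity(), // Int.MAX_VALUE: effectively unbounded
        )
    } finally {
        pool.shutdown()
    }
}

fun main() {
    println(fixedPoolShape(4))
    // PoolShape(core=4, max=4, keepAliveMs=0, coreThreadsTimeOut=false, queueCapacity=2147483647)
}
```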

The IO dispatcher is simply a member property of DefaultScheduler:

val IO: CoroutineDispatcher = LimitingDispatcher(
    this,
    systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
    "Dispatchers.IO",
    TASK_PROBABLY_BLOCKING
)
private class LimitingDispatcher(
    private val dispatcher: ExperimentalCoroutineDispatcher,
    private val parallelism: Int,
    private val name: String?,
    override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {

    private val queue = ConcurrentLinkedQueue<Runnable>()
    private val inFlightTasks = atomic(0)

    override val executor: Executor
        get() = this

    override fun execute(command: Runnable) = dispatch(command, false)

    override fun close(): Unit = error("Close cannot be invoked on LimitingBlockingDispatcher")

    override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)

    private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        var taskToSchedule = block
        while (true) {
            // Commit in-flight tasks slot
            val inFlight = inFlightTasks.incrementAndGet()

            // Fast path, if parallelism limit is not reached, dispatch task and return
            if (inFlight <= parallelism) {
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return
            }
            queue.add(taskToSchedule)
            if (inFlightTasks.decrementAndGet() >= parallelism) {
                return
            }

            taskToSchedule = queue.poll() ?: return
        }
    }

    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        dispatch(block, tailDispatch = true)
    }

    ...
}

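The dispatch method shows that tasks are scheduled through the member property dispatcher, via dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch). That dispatcher is DefaultScheduler itself (passed in as this), which is an ExperimentalCoroutineDispatcher. So the IO dispatcher shares the same thread pool as the Default dispatcher; LimitingDispatcher only caps how many IO tasks may be in flight at once (parallelism, by default the larger of 64 and the number of processors).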
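The in-flight counting in LimitingDispatcher.dispatch can be reproduced outside kotlinx.coroutines. In this sketch, LimitingExecutor (a hypothetical name) caps how many of its tasks a shared Executor runs at once. The execute loop follows the dispatch loop quoted above; afterTask, which the excerpt elides, is our own reconstruction of the slot hand-off: a finished task passes its slot to a queued task if there is one, otherwise it releases the slot and re-checks the queue so that a task queued concurrently is not lost.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical: caps how many of our tasks the shared executor runs concurrently.
class LimitingExecutor(private val shared: Executor, private val parallelism: Int) : Executor {
    private val queue = ConcurrentLinkedQueue<Runnable>()
    private val inFlight = AtomicInteger(0)

    override fun execute(command: Runnable) {
        var task = command
        while (true) {
            // Fast path: a slot is free, hand the task to the shared pool.
            if (inFlight.incrementAndGet() <= parallelism) {
                val toRun = task
                shared.execute { runAndRelease(toRun) }
                return
            }
            // Over the limit: park the task and give the slot back.
            queue.add(task)
            if (inFlight.decrementAndGet() >= parallelism) return
            // A slot was freed concurrently: retry with whatever is queued.
            task = queue.poll() ?: return
        }
    }

    private fun runAndRelease(task: Runnable) {
        try {
            task.run()
        } finally {
            afterTask()
        }
    }

    private fun afterTask() {
        // Pass our slot directly to a queued task if there is one.
        val next = queue.poll()
        if (next != null) {
            shared.execute { runAndRelease(next) }
            return
        }
        inFlight.decrementAndGet()
        // Re-check: a task may have been queued between poll() and the decrement.
        val late = queue.poll() ?: return
        execute(late)
    }
}

// Runs `tasks` short tasks under the limit and returns (peak concurrency, completed count).
fun peakAndDone(tasks: Int, parallelism: Int): Pair<Int, Int> {
    val pool = Executors.newFixedThreadPool(8)
    val limited = LimitingExecutor(pool, parallelism)
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    val done = AtomicInteger(0)
    val latch = CountDownLatch(tasks)
    repeat(tasks) {
        limited.execute(Runnable {
            peak.accumulateAndGet(running.incrementAndGet()) { a, b -> maxOf(a, b) }
            Thread.sleep(5)
            running.decrementAndGet()
            done.incrementAndGet()
            latch.countDown()
        })
    }
    latch.await(10, TimeUnit.SECONDS)
    pool.shutdown()
    return peak.get() to done.get()
}

fun main() {
    val (peak, done) = peakAndDone(tasks = 20, parallelism = 3)
    println("peak=$peak done=$done") // peak never exceeds 3; done is 20
}
```

The shared pool has 8 threads, yet at most 3 of these tasks run at the same time; the other threads stay available for other work, which is how IO can share Default's pool without monopolizing it.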
