CPS 变换
在Kotlin协程中编译器做了很多操作。编译的时候发生 CPS 变换。比如为挂起函数添加 Continuation
参数,返回值类型变成了 Any?类型。这是因为除了要返回本身的返回值,还要返回一个标记 COROUTINE_SUSPENDED,这个标记表示挂起函数正在挂起的状态。
续体和续体拦截器
续体就是包装了挂起函数之后应该执行的代码;在编译过程中一个完整的协程被分割成为一个又一个续体。在挂起函数结束后,会通过调用Continuation
的resumeWith
函数来恢复执行续体代码。
续体拦截器:调度器Dispatchers.Main
Dispatchers.IO
Dispatchers.Default
都实现了 ContinuationInterceptor
。续体拦截器负责拦截恢复协程在恢复后应该执行的代码(即续体)并将其在指定线程或线程池恢复。
状态机
协程在编译挂起函数的时候会将挂起函数编译成状态机,避免创建过多的类和对象。
val a = a()
val y = foo(a).await() // 挂起点 #1
b()
val z = bar(a, y).await() // 挂起点 #2
c(z)
下面是编译过后的伪代码
// 状态机当前状态
int label = 0
// 协程的局部变量
A a = null
Y y = null
void resumeWith(Object result) {
if (label == 0) goto L0
if (label == 1) goto L1
if (label == 2) goto L2
else throw IllegalStateException()
L0:
// 这次调用,result 应该为空
a = a()
label = 1
result = foo(a).await(this) // 'this' 作为续体传递
if (result == COROUTINE_SUSPENDED) return // 如果 await 挂起了执行则返回
L1:
// 外部代码传入 .await() 的结果恢复协程
y = (Y) result
b()
label = 2
result = bar(a, y).await(this) // 'this' 作为续体传递
if (result == COROUTINE_SUSPENDED) return // 如果 await 挂起了执行则返回
L2:
// 外部代码传入 .await() 的结果恢复协程
Z z = (Z) result
c(z)
label = -1 // 没有其他步骤了
return
}
Dispatchers.Main
Dispatchers.Main
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
internal object MainDispatcherLoader {
private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
return try {
val factories = if (FAST_SERVICE_LOADER_ENABLED) {
FastServiceLoader.loadMainDispatcherFactory()
} else {
// We are explicitly using the
// `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
// form of the ServiceLoader call to enable R8 optimization when compiled on Android.
ServiceLoader.load(
MainDispatcherFactory::class.java,
MainDispatcherFactory::class.java.classLoader
).iterator().asSequence().toList()
}
@Suppress("ConstantConditionIf")
factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
?: createMissingDispatcher()
} catch (e: Throwable) {
// Service loader can throw an exception as well
createMissingDispatcher(e)
}
}
}
MainCoroutineDispatcher
是一个抽象类。找到它的实现类 HandlerDispatcher
,HandlerDispatcher
是一个密封类,知道它的唯一实现类 HandlerContext
。
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
/**
* Creates [CoroutineDispatcher] for the given Android [handler].
*
* @param handler a handler.
* @param name an optional name for debugging.
*/
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
....
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
....
}
找到 HandlerContext 构造函数调用的地方
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
//这里创建了 HandlerContext 对象
HandlerContext(Looper.getMainLooper().asHandler(async = true))
override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
override val loadPriority: Int
get() = Int.MAX_VALUE / 2
}
通过以上代码发现,将续体代码所在协程调度到主线程其实还是使用了 Handler,这个Handler 使用的是 mainLooper。
Dispatchers.Defalut、Dispatchers.IO
//Default
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
//IO 调度器
val IO: CoroutineDispatcher = LimitingDispatcher(
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING
)
override fun close() {
throw UnsupportedOperationException("$DEFAULT_DISPATCHER_NAME cannot be closed")
}
override fun toString(): String = DEFAULT_DISPATCHER_NAME
@InternalCoroutinesApi
@Suppress("UNUSED")
public fun toDebugString(): String = super.toString()
}
public open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
public constructor(
corePoolSize: Int = CORE_POOL_SIZE,//线程池核心数量。
maxPoolSize: Int = MAX_POOL_SIZE,
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
@Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
public constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)
override val executor: Executor
get() = coroutineScheduler
// This is variable for test purposes, so that we can reinitialize from clean state
private var coroutineScheduler = createScheduler()
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
//分发
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.dispatch(context, block)
}
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
try {
//分发
coroutineScheduler.dispatch(block, tailDispatch = true)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.dispatchYield(context, block)
}
。。。
internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
try {
//分发
coroutineScheduler.dispatch(block, context, tailDispatch)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
// TaskContext shouldn't be lost here to properly invoke before/after task
DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
}
}
。。。
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
。。。
}
@JvmField
internal val CORE_POOL_SIZE = systemProp(
"kotlinx.coroutines.scheduler.core.pool.size",
AVAILABLE_PROCESSORS.coerceAtLeast(2), // !!! at least two here
minValue = CoroutineScheduler.MIN_SUPPORTED_POOL_SIZE
)
@JvmField
internal val MAX_POOL_SIZE = systemProp(
"kotlinx.coroutines.scheduler.max.pool.size",
(AVAILABLE_PROCESSORS * 128).coerceIn(
CORE_POOL_SIZE,
CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE //MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2
),
maxValue = CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE
)
//通过 JVM 取得当前处理器可运行的线程数
internal val AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors()
可以看出是通过coroutineScheduler.dispatch()
进行调度。
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
。。。
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
trackTask() // this is needed for virtual time support
val task = createTask(block, taskContext)
// try to submit the task to the local queue and act depending on the result
val currentWorker = currentWorker()
//将传入的任务压栈
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
if (!addToGlobalQueue(notAdded)) {
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
// Checking 'task' instead of 'notAdded' is completely okay
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
signalCpuWork()
} else {
// Increment blocking tasks anyway
signalBlockingWork(skipUnpark = skipUnpark)
}
}
。。。
}
以上代码可以看出其实就是实现了 Executor
接口,自己实现了一个线程池。
其中 corePoolSize 是 通过 JVM 的Runtime.getRuntime().availableProcessors()
函数取得当前处理器可运行的线程数,最小是 2。
maxPoolSize 的最小值设定为 corePoolSize。最大值则设定为 (1 shl BLOCKING_SHIFT) - 2
,即 1 向左位移 21 位再减 2,此外还要计算一个值 s:Runtime.getRuntime().availableProcessors()
乘以 128。如果 s 介于最小值与最大值之间,则 maxPoolSize 的值为 s,小于最小值等于最小值,大于最大值则等于最大值。
由此可见 DefaultScheduler
这种情况主要用来处理密集型运算,其核心线程数与处理器的线程数相等,这与 RxJava 的计算线程池的思想是类似的。
Default 调度器还有一种情况是 CommonPool
;CommonPool
与 DefaultScheduler
的设计是类似的,不过它的线程池没有那么麻烦,它直接使用了 Java 的 Executors.newFixedThreadPool
API,这种线程池只有核心线程,核心线程没有超时机制也不会被回收,任务队列没有大小限制,代码就不仔细去看了,总之思想是类似的。
IO 调度器其实就是 DefaultScheduler
中的一个成员属性。
val IO: CoroutineDispatcher = LimitingDispatcher(
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING
)
private class LimitingDispatcher(
private val dispatcher: ExperimentalCoroutineDispatcher,
private val parallelism: Int,
private val name: String?,
override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
private val queue = ConcurrentLinkedQueue<Runnable>()
private val inFlightTasks = atomic(0)
override val executor: Executor
get() = this
override fun execute(command: Runnable) = dispatch(command, false)
override fun close(): Unit = error("Close cannot be invoked on LimitingBlockingDispatcher")
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
private fun dispatch(block: Runnable, tailDispatch: Boolean) {
var taskToSchedule = block
while (true) {
// Commit in-flight tasks slot
val inFlight = inFlightTasks.incrementAndGet()
// Fast path, if parallelism limit is not reached, dispatch task and return
if (inFlight <= parallelism) {
dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
return
}
queue.add(taskToSchedule)
if (inFlightTasks.decrementAndGet() >= parallelism) {
return
}
taskToSchedule = queue.poll() ?: return
}
}
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
dispatch(block, tailDispatch = true)
}
。。。
}
通过dispatch
方法可以看出是通过 成员属性 dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
进行调度的。这个 dispatcher
就是 ExperimentalCoroutineDispatcher。所以 IO 调度器适合 Default 调度器共用的同一个线程池。