协程入门指南

协程基础

新建一个协程

新建协程的步骤很简单,只需要在 Coroutine Scope (协程作用域) 中调用 launch 就行了,如下所示:

  1. import kotlinx.coroutines.*
  2. fun main() = runBlocking { // this: CoroutineScope
  3. launch { // launch a new coroutine and continue
  4. delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
  5. println("World!") // print after delay
  6. }
  7. println("Hello") // main coroutine continues while a previous one is delayed
  8. }

launch 是协程构建器,协程构建器中的代码与其余代码并行运行。

delay 是一个特殊的 挂起函数 ,只能在协程域中使用,它可以让协程暂停一定时间同时不会造成底层线程阻塞,其他协程依然可以运行。

runBlockingCoroutine Scope 的一种,runBlocking 在实际开发中很少用到,因为 runBlocking 运行时会阻塞调用它的线程,直到 runblocking 中所有的协程执行结束为止。

协程遵循结构化并发的设计,新协程需要在特定的协程域 (CoroutineScope )中启动。这种设计的优点:

  1. 将所有协程集中放到协程域中方便管理,等到所有协程完成后,协程域才会结束。
  2. 保证所有代码中的错误都能正确报告且不会丢失。

声明一个挂起函数

fun main() = runBlocking { // this: CoroutineScope
    launch { doWorld() }
    println("Hello")
}

// this is your first suspending function
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}

在函数前加上 suspend 修饰就可以声明一个挂起函数了,挂起函数和常规函数都能在协程域中调用,主要区别在于,挂起函数能够调用其他挂起函数(比如上面提到的 delay 函数)暂停协程的执行。

协程作用域

除了 runBlocking 协程作用域外,还可以使用 coroutineScope ,这个协程域和 runBlocking 一样会等到它所有的协程执行完成后才结束,但是它不会阻塞调用线程。

因此 runBlocking 是一个一般函数,而 coroutineScope 是挂起函数,因此 coroutineScope 可以被其他挂起函数调用。如下所示:

fun main() = runBlocking {
    doWorld()
}

suspend fun doWorld() = coroutineScope {  // this: CoroutineScope
    launch {
        delay(1000L)
        println("World!")
    }
    println("Hello")
}

在协程作用域中可以执行多个并发操作,如下所示,我们在挂起函数 doWorld 中加载两个并发协程:

// Sequentially executes doWorld followed by "Done"
fun main() = runBlocking {
    doWorld()
    println("Done")
}

// Concurrently executes both sections
suspend fun doWorld() = coroutineScope { // this: CoroutineScope
    launch {
        delay(2000L)
        println("World 2")
    }
    launch {
        delay(1000L)
        println("World 1")
    }
    println("Hello")
}

最终打印结果如下:

Hello
World 1
World 2
Done

world1world2 打印顺序可以看到协程的确是并发运行的,而在 runBlocking 中, doWorld 是一个挂起函数,因此需要等该函数执行完成后才打印 Done ,而在 doWorld 中的 coroutineScope 在其所有协程执行完成后才结束,因此是先打印 world1world2 ,然后才打印 Done

显示声明 Job

launch 协程构建器会返回一个 Job 对象,这个返回的对象是已启动协程的句柄,有了这个句柄,我们可以在代码中主动等待协程结束。比如在下面的代码中,我们等待协程结束才打印 Done

fun main() = runBlocking {
    val job = launch { // launch a new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    println("Hello")
    job.join() // wait until child coroutine completes
    println("Done")     
}

协程的取消与超时

取消运行中的协程

通过调用 launch 返回的 Job 对象的 cancel() 方法,我们可以取消执行中的协程:

val job = launch {
    repeat(1000) { i ->
        println("job: I'm sleeping $i ...")
        delay(500L)
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion 
println("main: Now I can quit.")

输出结果如下:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

canceljoin 方法也可以被替换成 Job 的 cancelAndJoin 方法。

在协程中,所有在 kotlinx.coroutines 中的函数都是可以取消的,它们会检查协程是否取消并且在取消时会抛出 CancelationException 异常。

但是如果协程正在进行重复操作并且不检查协程是否取消,则协程不会被取消:

val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    var i = 0
    while (i < 5) { // computation loop, just wastes CPU
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

打印如下:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...
main: Now I can quit.

可以看到在调用 cancelAndJoin() 取消协程后,协程依然在打印 I'm sleeping ,直到 i 不少于5。

有两种方法可以让这种协程在需要时取消:

  1. 定期调用挂起函数检查异常是否取消(比如说 yield 函数)
  2. 通过 isActive 检查协程取消状态

后一种方法示例:

val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    var i = 0
    while (isActive) { // cancellable computation loop
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

isActiveCoroutineScope 的拓展函数,在协程中可用。

在取消协程后回收资源

刚才我们说过,挂起函数被取消后会抛出 CancellationException 异常,该异常可以通过正常方式处理。如下所示:

val job = launch {
    try {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        println("job: I'm running finally")
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

通过捕获异常,我们可以在 finally 语句块中回收资源。

需要注意的是在 finally 语句块中执行挂起函数,会抛出 CancellationException 异常,如果我们确实需要在该语句块中执行挂起函数,那么我们需要使用 withContext 函数 和 context 参数:NoCancellable 。如下所示:

val job = launch {
    try {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        withContext(NonCancellable) {
            println("job: I'm running finally")
            delay(1000L)
            println("job: And I've just delayed for 1 sec because I'm non-cancellable")
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

超时操作

如果想要让协程在超出一定时间后自动取消,我们可以使用 withTiemout 函数:

withTimeout(1300L) {
    repeat(1000) { i ->
        println("I'm sleeping $i ...")
        delay(500L)
    }
}

打印如下:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms

TimeoutCancellationException 异常由 withTimeout 函数抛出,该异常是 CancellationException 的子类,我们之前没有在打印窗口看到它在堆栈中出现是因为协程把CancellationException 看成是协程完成的正常原因。

对于TimeoutCancellationException 异常我们可以使用传统的 trycatch 语句块处理,或者我们可以使用 withTimeoutOrNull 函数,这个函数超时后不返回异常,而是返回null。如下所示:

val result = withTimeoutOrNull(1300L) {
    repeat(1000) { i ->
        println("I'm sleeping $i ...")
        delay(500L)
    }
    "Done" // will get cancelled before it produces this result
}
println("Result is $result")

结果打印如下:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null

编写挂起函数

顺序执行挂起函数

假设我们有两个挂起函数:

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

如果我们想要它们按顺序执行,首先执行 doSomethingUsefulOne,然后再执行 doSomethingUsefulTwo,然后计算它们的返回结果的和。那么事实上我们什么都不需要做,只需要在协程域中按顺序调用这两个函数就好了,因为在协程域中,挂起函数就像普通函数一样按照默认顺序执行。

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")    
}

结果打印如下:

The answer is 42
Completed in 2017 ms

使用 Async 并发执行挂起函数

如果 doSomethingUsefulOnedoSomethingUsefulTwo 没有依赖关系并且我们想要快点得出答案,我们可以使用 async 并发执行这两个函数。

调用async 时,它会像 launch 一样会开启一个协程,但是 launch 会返回 Job 对象,并且该对象不会携带任何结果值,而 async 会返回 Deferred 对象,该对象表示稍后提供结果,我们可以调用该对象的 .await() 函数得到它的最终结果,同时 Deferred 也是一个 Job ,所以我们也可以通过它取消协程。

val time = measureTimeMillis {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")

结果打印:

The answer is 42
Completed in 1017 ms

延迟启动 async 函数

通过将async 函数的start 参数设置为 CoroutineStart.LAZY ,可以让函数延迟启动。然后我们可以先后调用 startawait 方法来启动协程。

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
        // some computation
        one.start() // start the first one
        two.start() // start the second one
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")    
}

结果打印:

The answer is 42
Completed in 1017 ms

需要注意的是如果我们直接调用 await 方法来启动协程也是可行的,但是这样会导致协程顺序执行,从而丧失并发执行的优点。

如果一个协程抛出了异常,那么所有其他在同一协程作用域中启动的协程都将被取消,并且异常通过层次传递。如下所示:

fun main() = runBlocking<Unit> {
    try {
        failedConcurrentSum()
    } catch(e: ArithmeticException) {
        println("Computation failed with ArithmeticException")
    }
}

suspend fun failedConcurrentSum(): Int = coroutineScope {
    val one = async<Int> { 
        try {
            delay(Long.MAX_VALUE) // Emulates very long computation
            42
        } finally {
            println("First child was cancelled")
        }
    }
    val two = async<Int> { 
        println("Second child throws an exception")
        throw ArithmeticException()
    }
    one.await() + two.await()
}

结果打印如下:

Second child throws an exception
First child was cancelled
Computation failed with ArithmeticException

异常处理

异常如何传递

在协程中我们可以使用try-catch语句块主动捕获并处理异常,而没有捕获到的异常会在协程中传播。
传播形式分为两种

  • 异常自动在协程层级中传播,当子协程发生异常后,异常会通过层级关系传递给根协程,在异常到达根协程后,其他子协程的执行也会被终止。在子协程使用trycatch语句块对异常进行捕获后,异常依然会被传递给父协程的coroutineExceptionHandler 处理。
  • 可以被中途拦截并处理的异常。

协程采用哪种传播形式取决于它的协程构建器。

  • launchactor 自动传播异常
  • asyncproduce 可拦截异常

下面的例子使用GlobalScope和两种类型的协程构建器创建根协程

@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
    val job = GlobalScope.launch { // root coroutine with launch
        println("Throwing exception from launch")
        throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler
    }
    job.join()
    println("Joined failed job")
    val deferred = GlobalScope.async { // root coroutine with async
        println("Throwing exception from async")
        throw ArithmeticException() // Nothing is printed, relying on user to call await
    }
    try {
        deferred.await()
        println("Unreached")
    } catch (e: ArithmeticException) {
        println("Caught ArithmeticException")
    }
}

结果打印如下:

Throwing exception from launch
Exception in thread "DefaultDispatcher-worker-2 @coroutine#2" java.lang.IndexOutOfBoundsException
Joined failed job
Throwing exception from async
Caught ArithmeticException

CoroutineExceptionHandler

对于自动传播的异常,我们可以通过在根协程中传入一个自定义 coroutineExceptionHandler ,即可在根协程中处理异常。

因为子协程的异常被传递到根协程 coroutineExceptionHandler 时,传递异常的子协程已经结束运行了,所以我们不能在 coroutineExceptionHandler 恢复产生异常协程的运行状态。

  • 自动传递异常的协程发生异常后,它的父协程会在终止其他子协程后继续传递异常(如果父协程是根协程,则它会在终止其他子协程后调用CoroutineExceptionHandler)。一个协程中多个子协程发生异常时,只有第一个异常会被传递。
  • 所有的子协程都会将异常处理委托给它的父程序,父程序也委托给父程序,依此类推,直到根程序,因此除了根协程的CoroutineExceptionHandler,其他handler都不会被调用。
  • 对于可拦截协程来说,无论是根协程还是子协程,它们的handler都不会被调用。(比如async,它会捕捉协程的异常,并交给它生成的Deferred对象,所以它的CoroutineExceptionHandler也不会被调用。

例外:协程的取消会抛出 CancellationException 异常,但是这个异常会被所有 coroutineExceptionHandler 忽略,并且当一个协程被取消时,它的父协程不会跟着取消,并且这个行为不能被重写。

避免异常传递

在一些特殊情况下,在协程发生异常后,我们可能不想影响到同一父协程下的其他子协程,比如说一个UI的子任务失败了,但我们不想整个控件都被终结。

SupervisorJob

要避免上面这种情况,我们可以使用 SupervisorJob ,SupervisorJob 的效果与 Job 相似,但它可以仅向下传递取消异常,如下所示:

import kotlinx.coroutines.*

fun main() = runBlocking {
    val supervisor = SupervisorJob()
    with(CoroutineScope(coroutineContext + supervisor)) {
        // launch the first child -- its exception is ignored for this example (don't do this in practice!)
        val firstChild = launch(CoroutineExceptionHandler { _, _ ->  }) {
            println("The first child is failing")
            throw AssertionError("The first child is cancelled")
        }
        // launch the second child
        val secondChild = launch {
            firstChild.join()
            // Cancellation of the first child is not propagated to the second child
            println("The first child is cancelled: ${firstChild.isCancelled}, but the second one is still active")
            try {
                delay(Long.MAX_VALUE)
            } finally {
                // But cancellation of the supervisor is propagated
                println("The second child is cancelled because the supervisor was cancelled")
            }
        }
        // wait until the first child fails & completes
        firstChild.join()
        println("Cancelling the supervisor")
        supervisor.cancel()
        secondChild.join()
    }
}

代码输出结果如下:

The first child is failing
The first child is cancelled: true, but the second one is still active
Cancelling the supervisor
The second child is cancelled because the supervisor was cancelled

SupervisorJob 可以避免协程间发生异常导致并发失败的问题,而在协程域并发中我们可以使用 SupervisorScope代替 CoroutineScope 避免异常导致的协程域并发失败。

SupervisorScope

SupervisorScope使用如下所示:

import kotlin.coroutines.*
import kotlinx.coroutines.*

fun main() = runBlocking {
    try {
        supervisorScope {
            val child = launch {
                try {
                    println("The child is sleeping")
                    delay(Long.MAX_VALUE)
                } finally {
                    println("The child is cancelled")
                }
            }
            // Give our child a chance to execute and print using yield
            yield()
            println("Throwing an exception from the scope")
            throw AssertionError()
        }
    } catch(e: AssertionError) {
        println("Caught an assertion error")
    }
}

结果打印如下:

The child is sleeping
Throwing an exception from the scope
The child is cancelled
Caught an assertion error

对于 supervisorScope 中的协程来说(父协程和子协程),因为它们的异常不会向上传播,因此子协程的CoroutineExceptionHandler会被调用。