协程入门指南
协程基础
新建一个协程
新建协程的步骤很简单,只需要在 Coroutine Scope (协程作用域) 中调用 launch 就行了,如下所示:
import kotlinx.coroutines.*fun main() = runBlocking { // this: CoroutineScopelaunch { // launch a new coroutine and continuedelay(1000L) // non-blocking delay for 1 second (default time unit is ms)println("World!") // print after delay}println("Hello") // main coroutine continues while a previous one is delayed}
launch 是协程构建器,协程构建器中的代码与其余代码并行运行。
delay 是一个特殊的 挂起函数 ,只能在协程域中使用,它可以让协程暂停一定时间同时不会造成底层线程阻塞,其他协程依然可以运行。
runBlocking 是 Coroutine Scope 的一种,runBlocking 在实际开发中很少用到,因为 runBlocking 运行时会阻塞调用它的线程,直到 runblocking 中所有的协程执行结束为止。
协程遵循结构化并发的设计,新协程需要在特定的协程域 (CoroutineScope )中启动。这种设计的优点:
- 将所有协程集中放到协程域中方便管理,等到所有协程完成后,协程域才会结束。
- 保证所有代码中的错误都能正确报告且不会丢失。
声明一个挂起函数
fun main() = runBlocking { // this: CoroutineScope
launch { doWorld() }
println("Hello")
}
// this is your first suspending function
suspend fun doWorld() {
delay(1000L)
println("World!")
}
在函数前加上 suspend 修饰就可以声明一个挂起函数了,挂起函数和常规函数都能在协程域中调用,主要区别在于,挂起函数能够调用其他挂起函数(比如上面提到的 delay 函数)暂停协程的执行。
协程作用域
除了 runBlocking 协程作用域外,还可以使用 coroutineScope ,这个协程域和 runBlocking 一样会等到它所有的协程执行完成后才结束,但是它不会阻塞调用线程。
因此 runBlocking 是一个一般函数,而 coroutineScope 是挂起函数,因此 coroutineScope 可以被其他挂起函数调用。如下所示:
fun main() = runBlocking {
doWorld()
}
suspend fun doWorld() = coroutineScope { // this: CoroutineScope
launch {
delay(1000L)
println("World!")
}
println("Hello")
}
在协程作用域中可以执行多个并发操作,如下所示,我们在挂起函数 doWorld 中加载两个并发协程:
// Sequentially executes doWorld followed by "Done"
fun main() = runBlocking {
doWorld()
println("Done")
}
// Concurrently executes both sections
suspend fun doWorld() = coroutineScope { // this: CoroutineScope
launch {
delay(2000L)
println("World 2")
}
launch {
delay(1000L)
println("World 1")
}
println("Hello")
}
最终打印结果如下:
Hello
World 1
World 2
Done
从 world1 和 world2 打印顺序可以看到协程的确是并发运行的,而在 runBlocking 中, doWorld 是一个挂起函数,因此需要等该函数执行完成后才打印 Done ,而在 doWorld 中的 coroutineScope 在其所有协程执行完成后才结束,因此是先打印 world1 和 world2 ,然后才打印 Done 。
显示声明 Job
launch 协程构建器会返回一个 Job 对象,这个返回的对象是已启动协程的句柄,有了这个句柄,我们可以在代码中主动等待协程结束。比如在下面的代码中,我们等待协程结束才打印 Done :
fun main() = runBlocking {
val job = launch { // launch a new coroutine and keep a reference to its Job
delay(1000L)
println("World!")
}
println("Hello")
job.join() // wait until child coroutine completes
println("Done")
}
协程的取消与超时
取消运行中的协程
通过调用 launch 返回的 Job 对象的 cancel() 方法,我们可以取消执行中的协程:
val job = launch {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion
println("main: Now I can quit.")
输出结果如下:
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
cancel 和 join 方法也可以被替换成 Job 的 cancelAndJoin 方法。
在协程中,所有在 kotlinx.coroutines 中的函数都是可以取消的,它们会检查协程是否取消并且在取消时会抛出 CancelationException 异常。
但是如果协程正在进行重复操作并且不检查协程是否取消,则协程不会被取消:
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
打印如下:
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...
main: Now I can quit.
可以看到在调用 cancelAndJoin() 取消协程后,协程依然在打印 I'm sleeping ,直到 i 不少于5。
有两种方法可以让这种协程在需要时取消:
- 定期调用挂起函数检查异常是否取消(比如说
yield函数) - 通过
isActive检查协程取消状态
后一种方法示例:
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (isActive) { // cancellable computation loop
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
isActive 是 CoroutineScope 的拓展函数,在协程中可用。
在取消协程后回收资源
刚才我们说过,挂起函数被取消后会抛出 CancellationException 异常,该异常可以通过正常方式处理。如下所示:
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
println("job: I'm running finally")
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
通过捕获异常,我们可以在 finally 语句块中回收资源。
需要注意的是在 finally 语句块中执行挂起函数,会抛出 CancellationException 异常,如果我们确实需要在该语句块中执行挂起函数,那么我们需要使用 withContext 函数 和 context 参数:NoCancellable 。如下所示:
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("job: I'm running finally")
delay(1000L)
println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
超时操作
如果想要让协程在超出一定时间后自动取消,我们可以使用 withTiemout 函数:
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
打印如下:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
TimeoutCancellationException 异常由 withTimeout 函数抛出,该异常是 CancellationException 的子类,我们之前没有在打印窗口看到它在堆栈中出现是因为协程把CancellationException 看成是协程完成的正常原因。
对于TimeoutCancellationException 异常我们可以使用传统的 trycatch 语句块处理,或者我们可以使用 withTimeoutOrNull 函数,这个函数超时后不返回异常,而是返回null。如下所示:
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // will get cancelled before it produces this result
}
println("Result is $result")
结果打印如下:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null
编写挂起函数
顺序执行挂起函数
假设我们有两个挂起函数:
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // pretend we are doing something useful here
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // pretend we are doing something useful here, too
return 29
}
如果我们想要它们按顺序执行,首先执行 doSomethingUsefulOne,然后再执行 doSomethingUsefulTwo,然后计算它们的返回结果的和。那么事实上我们什么都不需要做,只需要在协程域中按顺序调用这两个函数就好了,因为在协程域中,挂起函数就像普通函数一样按照默认顺序执行。
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${one + two}")
}
println("Completed in $time ms")
}
结果打印如下:
The answer is 42
Completed in 2017 ms
使用 Async 并发执行挂起函数
如果 doSomethingUsefulOne 和 doSomethingUsefulTwo 没有依赖关系并且我们想要快点得出答案,我们可以使用 async 并发执行这两个函数。
调用async 时,它会像 launch 一样会开启一个协程,但是 launch 会返回 Job 对象,并且该对象不会携带任何结果值,而 async 会返回 Deferred 对象,该对象表示稍后提供结果,我们可以调用该对象的 .await() 函数得到它的最终结果,同时 Deferred 也是一个 Job ,所以我们也可以通过它取消协程。
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
结果打印:
The answer is 42
Completed in 1017 ms
延迟启动 async 函数
通过将async 函数的start 参数设置为 CoroutineStart.LAZY ,可以让函数延迟启动。然后我们可以先后调用 start 和 await 方法来启动协程。
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
// some computation
one.start() // start the first one
two.start() // start the second one
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
结果打印:
The answer is 42
Completed in 1017 ms
需要注意的是如果我们直接调用 await 方法来启动协程也是可行的,但是这样会导致协程顺序执行,从而丧失并发执行的优点。
如果一个协程抛出了异常,那么所有其他在同一协程作用域中启动的协程都将被取消,并且异常通过层次传递。如下所示:
fun main() = runBlocking<Unit> {
try {
failedConcurrentSum()
} catch(e: ArithmeticException) {
println("Computation failed with ArithmeticException")
}
}
suspend fun failedConcurrentSum(): Int = coroutineScope {
val one = async<Int> {
try {
delay(Long.MAX_VALUE) // Emulates very long computation
42
} finally {
println("First child was cancelled")
}
}
val two = async<Int> {
println("Second child throws an exception")
throw ArithmeticException()
}
one.await() + two.await()
}
结果打印如下:
Second child throws an exception
First child was cancelled
Computation failed with ArithmeticException
异常处理
异常如何传递
在协程中我们可以使用try-catch语句块主动捕获并处理异常,而没有捕获到的异常会在协程中传播。
传播形式分为两种
- 异常自动在协程层级中传播,当子协程发生异常后,异常会通过层级关系传递给根协程,在异常到达根协程后,其他子协程的执行也会被终止。在子协程使用trycatch语句块对异常进行捕获后,异常依然会被传递给父协程的coroutineExceptionHandler 处理。
- 可以被中途拦截并处理的异常。
协程采用哪种传播形式取决于它的协程构建器。
launch和actor自动传播异常async和produce可拦截异常
下面的例子使用GlobalScope和两种类型的协程构建器创建根协程
@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
val job = GlobalScope.launch { // root coroutine with launch
println("Throwing exception from launch")
throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler
}
job.join()
println("Joined failed job")
val deferred = GlobalScope.async { // root coroutine with async
println("Throwing exception from async")
throw ArithmeticException() // Nothing is printed, relying on user to call await
}
try {
deferred.await()
println("Unreached")
} catch (e: ArithmeticException) {
println("Caught ArithmeticException")
}
}
结果打印如下:
Throwing exception from launch
Exception in thread "DefaultDispatcher-worker-2 @coroutine#2" java.lang.IndexOutOfBoundsException
Joined failed job
Throwing exception from async
Caught ArithmeticException
CoroutineExceptionHandler
对于自动传播的异常,我们可以通过在根协程中传入一个自定义 coroutineExceptionHandler ,即可在根协程中处理异常。
因为子协程的异常被传递到根协程 coroutineExceptionHandler 时,传递异常的子协程已经结束运行了,所以我们不能在 coroutineExceptionHandler 恢复产生异常协程的运行状态。
- 自动传递异常的协程发生异常后,它的父协程会在终止其他子协程后继续传递异常(如果父协程是根协程,则它会在终止其他子协程后调用CoroutineExceptionHandler)。一个协程中多个子协程发生异常时,只有第一个异常会被传递。
- 所有的子协程都会将异常处理委托给它的父程序,父程序也委托给父程序,依此类推,直到根程序,因此除了根协程的CoroutineExceptionHandler,其他handler都不会被调用。
- 对于可拦截协程来说,无论是根协程还是子协程,它们的handler都不会被调用。(比如async,它会捕捉协程的异常,并交给它生成的Deferred对象,所以它的CoroutineExceptionHandler也不会被调用。
例外:协程的取消会抛出 CancellationException 异常,但是这个异常会被所有 coroutineExceptionHandler 忽略,并且当一个协程被取消时,它的父协程不会跟着取消,并且这个行为不能被重写。
避免异常传递
在一些特殊情况下,在协程发生异常后,我们可能不想影响到同一父协程下的其他子协程,比如说一个UI的子任务失败了,但我们不想整个控件都被终结。
SupervisorJob
要避免上面这种情况,我们可以使用 SupervisorJob ,SupervisorJob 的效果与 Job 相似,但它可以仅向下传递取消异常,如下所示:
import kotlinx.coroutines.*
fun main() = runBlocking {
val supervisor = SupervisorJob()
with(CoroutineScope(coroutineContext + supervisor)) {
// launch the first child -- its exception is ignored for this example (don't do this in practice!)
val firstChild = launch(CoroutineExceptionHandler { _, _ -> }) {
println("The first child is failing")
throw AssertionError("The first child is cancelled")
}
// launch the second child
val secondChild = launch {
firstChild.join()
// Cancellation of the first child is not propagated to the second child
println("The first child is cancelled: ${firstChild.isCancelled}, but the second one is still active")
try {
delay(Long.MAX_VALUE)
} finally {
// But cancellation of the supervisor is propagated
println("The second child is cancelled because the supervisor was cancelled")
}
}
// wait until the first child fails & completes
firstChild.join()
println("Cancelling the supervisor")
supervisor.cancel()
secondChild.join()
}
}
代码输出结果如下:
The first child is failing
The first child is cancelled: true, but the second one is still active
Cancelling the supervisor
The second child is cancelled because the supervisor was cancelled
SupervisorJob 可以避免协程间发生异常导致并发失败的问题,而在协程域并发中我们可以使用 SupervisorScope代替 CoroutineScope 避免异常导致的协程域并发失败。
SupervisorScope
SupervisorScope使用如下所示:
import kotlin.coroutines.*
import kotlinx.coroutines.*
fun main() = runBlocking {
try {
supervisorScope {
val child = launch {
try {
println("The child is sleeping")
delay(Long.MAX_VALUE)
} finally {
println("The child is cancelled")
}
}
// Give our child a chance to execute and print using yield
yield()
println("Throwing an exception from the scope")
throw AssertionError()
}
} catch(e: AssertionError) {
println("Caught an assertion error")
}
}
结果打印如下:
The child is sleeping
Throwing an exception from the scope
The child is cancelled
Caught an assertion error
对于 supervisorScope 中的协程来说(父协程和子协程),因为它们的异常不会向上传播,因此子协程的CoroutineExceptionHandler会被调用。
