探秘携程
以一个新手的视觉探秘携程,慢慢透析携程的使用、深入
第一个携程
最简单的例子
fun main() {
GlobalScope.launch {
printMessage("第一个携程")
}
printMessage("主线程执行,代码在GlobalScope.launch之后")
}
(当前运行线程:main) —> 主线程执行,代码在GlobalScope.launch之后
结论:
GlobalScope.launch
不会阻塞主线程的执行GlobalScope.launch
内部的代码并没有被执行,是因为主线程结束了,所以GlobalScope
是类似主线程的守护线程
改造代码,让主线程不销毁
GlobalScope.launch {
printMessage("第一个携程")
}
printMessage("主线程执行,代码在GlobalScope.launch之后")
Thread.sleep(10)
�(当前运行线程:main) —> 主线程执行,代码在GlobalScope.launch之后 (当前运行线程:DefaultDispatcher-worker-1) —> 第一个携程
结论:
- 主线程等待10mm后,携程可以执行完毕,并打印
- 携程的默认执行线程不是主线程,而是
DefaultDispatcher-worker-1
携程运行准备时间
改为主线程只sleep 1mm
GlobalScope.launch {
printMessage("第一个携程")
}
printMessage("主线程执行,代码在GlobalScope.launch之后")
Thread.sleep(1)
(当前运行线程:main; 1625545254752) —> 主线程执行,代码在GlobalScope.launch之后
结论:
- 携程开始执行是需要一段时间的,1mm不足以携程完成初始化
- 携程有资源开销
Dispatchers 调度器
Dispatcher决定了当前携程执行所在的线程,也就是携程的上下文CoroutineContext
,调度器层级实现了CoroutineContext
接口
调度器系统提供的有4种
- Default: 默认实现,就是上面代码
GlobalScope.launch
等的默认参数调度器,最大为核心数 - Main: 在有界面概念的空间才有的,如Android的main线程,不引入Android包报错
- Unconfined:不限制线程,在那个线程调用代码,就在哪个线程执行携程(循环嵌套不保证执行顺序,只能保证最外层执行顺序)
- IO: 阻塞线程,可以创建64或者更大的线程数,与Default共享线程池
第一个Dispatcher
fun m4(){
GlobalScope.launch(Dispatchers.Unconfined) {
printMessage("Dispatchers.Unconfined 携程")
}
printMessage("主线程执行,代码在GlobalScope.launch之后")
}
�(当前运行线程:main; 1625554888325) —> Dispatchers.Unconfined 携程 距离上一次打印间隔为 3 (当前运行线程:main; 1625554888328) —> 主线程执行,代码在GlobalScope.launch之后
结论:
Dispatchers.Unconfined
是在当前线程调用了携程空间,会阻塞当前线程。(思考:coroutine实质上就是线程的一部分,只是增加了调度器的概念,不让线程空闲下来;后续的挂起coroutine的线程,恢复coroutine的线程可能不是同一个,由Dispatcher来决定)- CoroutineContext如果和非携程空间是同一个线程,那么代码是顺序执行的(就是说携程空间依赖线程)
Dispatcher内部使用Dispatcher
fun m5(){
GlobalScope.launch(Dispatchers.Default) {
printMessage("Dispatchers.Default 携程")
delay(100)
GlobalScope.launch (Dispatchers.Unconfined){
printMessage("Dispatchers.Unconfined 携程,内部")
}
}
Thread.sleep(20)
printMessage("主线程执行,代码在GlobalScope.launch之后")
Thread.sleep(2000)
}
(当前运行线程:DefaultDispatcher-worker-1; 1625556460046) —> Dispatchers.Default 携程 距离上一次打印间隔为 16 (当前运行线程:main; 1625556460062) —> 主线程执行,代码在GlobalScope.launch之后 距离上一次打印间隔为 100 (当前运行线程:DefaultDispatcher-worker-1; 1625556460162) —> Dispatchers.Unconfined 携程,内部
结论:
Dispatchers.Unconfined
运行的CoroutineContext是由开启这个空间的线程决定的,两次打印的线程名称相同- 调度器Dispatcher决定执行的线程,这也是携程的核心,调度器(携程空间、上下文)控制一切
IO、Default共享线程池
fun m6(){
GlobalScope.launch(Dispatchers.IO) {
printMessage("携程第1部分被触发")
}
printMessage("主线程执行,代码在携程第一部分之后")
Thread.sleep(10)
GlobalScope.launch {
printMessage("携程第2部分被触发")
}
Thread.sleep(1000)
}
�(当前运行线程:main; 1625557585475) —> 主线程执行,代码在携程第一部分之后距离上一次打印间隔为 3 (当前运行线程:DefaultDispatcher-worker-1; 1625557585478) —> 携程第1部分被触发 距离上一次打印间隔为 11 (当前运行线程:DefaultDispatcher-worker-1; 1625557585489) —> 携程第2部分被触发
结论:
- 2次打印线程名称相同,确实共享线程池
Coroutine启动方式
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job
第二个参数 start即为启动方式,为什么会有启动方式呢,Java中线程只有手动调用start的方式启动
DEFAULT | 立即执行协程体 |
---|---|
ATOMIC | 立即执行协程体,但在开始运行之前无法取消 |
UNDISPATCHED | 立即在当前线程执行协程体,直到第一个 suspend 调用 |
LAZY | 只有在需要的情况下运行 |
一个非常详细的博客:https://www.jianshu.com/p/6cf528f423f6
UNDISPATCHED 到底做了什么
这里涉及到一个自定义的拦截器,这样才能看得清拦截器是否被调用
拦截器内部:
class MyContinuation<T>(private val c : Continuation<T>) : Continuation<T> {
override val context: CoroutineContext
get() = c.context
override fun resumeWith(result: Result<T>) {
log("MyContinuation拦截器被调用:$result")
c.resumeWith(result)
}
}
拦截器:
class MyInterceptor : ContinuationInterceptor {
override val key: CoroutineContext.Key<*>
get() = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
return MyContinuation(continuation)
}
}
不同启动方式:
fun m11(){
GlobalScope.launch(context = MyInterceptor(), start = CoroutineStart.DEFAULT) {
log(1)
}
log(2)
Thread.sleep(100)
}
fun m12(){
GlobalScope.launch(context = MyInterceptor(), start = CoroutineStart.UNDISPATCHED) {
log(1)
}
log(2)
Thread.sleep(100)
}
m11打印结果:
(当前运行线程:main; 1625764915423) —> MyContinuation拦截器被调用:Success(kotlin.Unit) (当前运行线程:main; 1625764915424) —> 1 (当前运行线程:main; 1625764915427) —> 2
m12打印结果:
(当前运行线程:main; 1625764878878) —> 1 (当前运行线程:main; 1625764878880) —> 2
结论:
- 可以很清楚的看出,
CoroutineStart.UNDISPATCHED
会取消拦截器的调用,那么拦截器的子类,调度器切换线程也不会生效 CoroutineStart.UNDISPATCHED
只会取消当前第一个拦截,后续的拦截不会取消,也符合start的命名,就是开始状态不拦截
上下文 CoroutineContext
class MyContext : CoroutineContext{
// 提供了一个初始类initial R,一个函数operation,需要你返回一个初始类型一样的R
// 猜测:这里需要提供就是 CoroutineContext.Element,应该是由自己的环境决定是返回原始R,还是经过operation进行变换后的R
override fun <R> fold(initial: R, operation: (R, CoroutineContext.Element) -> R): R {
TODO("Not yet implemented")
}
// 猜测:key类型是null接口,应该是用来标记Element的类型,key可能是不同泛型E的单例
// 一种key代表一种Element,这里决定当前上下文是否包含某种key的对应的Element
override fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
TODO("Not yet implemented")
}
// 猜测:当前上下文中减去某种key对应的Element,移除后返回自己
override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext {
TODO("Not yet implemented")
}
}
写一个自己的上下文实现携程上下文接口,必须要实现的方法有3个,先猜测3个方法的意义,再去看接口描述
/*
Persistent context for the coroutine. It is an indexed set of [Element] instances.
An indexed set is a mix between a set and a map.
Every element in this set has a unique [Key].
*/
CoroutineContext
既是一个Element的集合,每个Element既是元素,又是CoroutineContext
本身,类似组合模式,自己包含自己。每个Element对应唯一的key,这个猜测正确。
Element
是单例
内部的Key<E : Element>
同样是单例,一一对应的关系
EmptyCoroutineContext
空携程上下文
内部实现方法很简单,因为是空实现
CombinedContext
左偏列表
所有携程的组合方式,每个左偏是一个CoroutineContext
,右边是Element
,遍历的时候直到左边也是Element
为止
挂起函数 suspend
挂起函数不会阻塞线程的执行
delay(1000)
job.await()
上面这些都是 suspend函数
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
launch、async中的block
也是挂起函数
挂起和恢复线程可能不是同一个
携程的挂起、恢复可能不是同一个,具体取决恢复时的状态,
如果恢复时job已经执行完毕,那么将不会切换线程;
如果恢复时job没有执行完毕,由恢复携程的线程调用。
先上一个挂起函数的例子
fun m21(){
GlobalScope.launch(MyInterceptor()) {
log(1)
val job = async {
delay(500)
log(4)
"Hello"
}
log(3)
val result = job.await()
log(result)
}
log(2)
Thread.sleep(1000)
}
打印结果:
(当前运行线程:main; 1625767051406) —> MyContinuation拦截器被调用:Success(kotlin.Unit) (当前运行线程:main; 1625767051407) —> 1 (当前运行线程:main; 1625767051410) —> MyContinuation拦截器被调用:Success(kotlin.Unit) (当前运行线程:main; 1625767051419) —> 3 (当前运行线程:main; 1625767051423) —> 2 (当前运行线程:kotlinx.coroutines.DefaultExecutor; 1625767051933) —> MyContinuation拦截器被调用:Success(kotlin.Unit) (当前运行线程:kotlinx.coroutines.DefaultExecutor; 1625767051933) —> 4 (当前运行线程:kotlinx.coroutines.DefaultExecutor; 1625767051934) —> MyContinuation拦截器被调用:Success(Hello) (当前运行线程:kotlinx.coroutines.DefaultExecutor; 1625767051934) —> Hello
结论:
delay
会挂起函数await
也会挂起函数job.await()
是由delay
完成后恢复的,所在线程与delay
恢复后的线程一致
一个与上面差不多的例子,只是多睡了100mm
fun m22(){
GlobalScope.launch(MyInterceptor()) {
log(1)
val job = async {
delay(500)
log(4)
"Hello"
}
log(3)
Thread.sleep(600) // 比上面的挂起delay挂起函数多休息100mm
val result = job.await()
log(result)
}
log(2)
Thread.sleep(1000)
}
除了多了一个Thread.sleep(600)
,没有任何区别
打印结果:
(当前运行线程:main; 1625767550618) —> MyContinuation拦截器被调用:Success(kotlin.Unit) (当前运行线程:main; 1625767550619) —> 1 (当前运行线程:main; 1625767550623) —> MyContinuation拦截器被调用:Success(kotlin.Unit) (当前运行线程:main; 1625767550630) —> 3 (当前运行线程:kotlinx.coroutines.DefaultExecutor; 1625767551132) —> MyContinuation拦截器被调用:Success(kotlin.Unit) (当前运行线程:kotlinx.coroutines.DefaultExecutor; 1625767551132) —> 4 (当前运行线程:main; 1625767551237) —> Hello (当前运行线程:main; 1625767551237) —> 2
结论:
- 拦截器少运行了一次!
job.await()
没有被拦截到 job.await()
直接在main线程中执行了- 因为
Thread.sleep(600)
,上面的挂起函数delay(500)
已经执行完毕,所以job.await()
直接拿到了结果,不需要等待函数恢复(函数恢复的时候会进行一次拦截)
挂起的时机
lauch 、 async 方法虽然不是suspend方法,但是block: suspend CoroutineScope.() -> Unit
是挂起函数
suspend函数结束后是否进行调度是根据当前执行结果确定的
查看suspendCancellableCoroutine **{ }**
方法
@SinceKotlin("1.3")
@PublishedApi // This class is Published API via serialized representation of SafeContinuation, don't rename/move
internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }
�只有COROUTINE_SUSPENDED
状态下的函数才会真正挂起函
携程取消
Coroutine开启后,只有它的所有子Job完成,该Job才算完成,属性并发结构。
Job的方法,看着与Thread + Future 很像,都是可以取消的。
其实,Job的取消方法与Thread的interrupt
类似,都是发出中断指令,只要挂起的方法才能收到该指令,自动取消
例如:Job中写一个while(true)
循环是没法取消的。
父Job取消,其中的所有子Job都会取消
private var index = 0
private var job1: Job? = null
private var job2: Job? = null
fun m1() {
job1 = GlobalScope.launch (){
job2 = launch {
try {
while (true) {
delay(500)
log(index++)
}
} catch (e: Exception) {
log("循环携程被取消 $e")
}
}
try {
job1!!.join()
} catch (e: Exception) {
log("调用join的携程被取消 $e")
}
}
}
// 查看job1是否完成
fun m2() {
Thread().interrupt()
log("Job1是否执行完毕:${job1?.isCompleted}; Job1是否取消:${job1?.isCancelled}")
}
// 查看job2是否完成
fun m3() {
log("Job2是否执行完毕:${job2?.isCompleted}; Job1是否取消:${job2?.isCancelled}")
}
// 取消job1(外部携程)
fun m4(){
job1?.cancel()
}
// 取消job2(内部携程)
fun m5(){
job2?.cancel()
}
取消父携程
调用m3()
�2021-07-15 15:23:45.843 32335-32483/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-1;携程:null; 距离上次间隔:1700132毫秒) —> 232021-07-15 15:23:46.350 32335-32483/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-1;携程:null; 距离上次间隔:506毫秒) —> 24 2021-07-15 15:23:46.863 32335-32483/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-1;携程:null; 距离上次间隔:514毫秒) —> 25 2021-07-15 15:23:47.374 32335-32483/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-1;携程:null; 距离上次间隔:511毫秒) —> 26 2021-07-15 15:23:47.879 32335-32483/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-1;携程:null; 距离上次间隔:505毫秒) —> 27 2021-07-15 15:23:48.303 32335-32483/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-1;携程:null; 距离上次间隔:424毫秒) —> 循环携程被取消 kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@ec6c707 2021-07-15 15:23:48.304 32335-32484/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-2;携程:null; 距离上次间隔:0毫秒) —> 调用join的携程被取消 kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@ec6c707
结论:
- 取消父携程,所有的子携程都会取消(是否取消成功取决于是否有挂起状态,suspend)
- 挂起状态的携程被取消,会报一个
JobCancellationException
,但是该异常不会再往父类传递,也不会引起崩溃 - join不关心内部执行状态,正常结束,异常取消都可以,只关心是否结束,即不再阻塞
- join调用的携程被取消(父携程被取消),会抛出一个
JobCancellationException
奇怪的表现
一个奇怪的地方:如果使用自己的拦截器,join调度线程不会抛出JobCancellationException
异常
ob1 = GlobalScope.launch(MyInterceptor()) // 修改代码使用自己的拦截器
2021-07-15 15:44:12.993 1498-1498/com.xxd.coroutine I/System.out: (线程:main;携程:null; 距离上次间隔:381毫秒) —> MyContinuation拦截器被调用:Failure(kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@fa69946)2021-07-15 15:44:12.994 1498-1498/com.xxd.coroutine I/System.out: (线程:main;携程:null; 距离上次间隔:1毫秒) —> 循环携程被取消 kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@fa69946 2021-07-15 15:44:12.996 1498-1498/com.xxd.coroutine I/System.out: (线程:main;携程:null; 距离上次间隔:2毫秒) —> MyContinuation拦截器被调用:Success(kotlin.Unit)
缺少了一个调用join的携程被取消,取而代之的是Success(kotlin.Unit)
取消子携程
调用m4()
(线程:DefaultDispatcher-worker-2;携程:null; 距离上次间隔:47毫秒) —> 循环携程被取消 kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@fa69946
结论:
- 子携程取消不会影响父携程
- 父携程如果挂起了该子携程,被取消收不到
JobCancellationException
join、async的区别
join不关心执行的结果,只关心是否执行完毕,异常、正常都返回,异常中断也不会往上抛(join调用的携程中断会抛出一个JobCancellationException
)
async关心执行的结果正常,异常状态,异常会抛出该异常,await
的时候会接收到该异常
异常传播
携程中的异常有一套传播机制,必须弄明白这套机制,才能准确使用携程
处理流程
launch 会在内部出现未捕获的异常时尝试触发对父协程的取消,能否取消要看作用域的定义,如果取消成功,那么异常传递给父协程,否则传递给启动时上下文中配置的 CoroutineExceptionHandler 中,如果没有配置,会查找全局(JVM上)的 CoroutineExceptionHandler 进行处理,如果仍然没有,那么就将异常交给当前线程的 UncaughtExceptionHandler 处理;而 async 则在未捕获的异常出现时同样会尝试取消父协程,但不管是否能够取消成功都不会后其他后续的异常处理,直到用户主动调用 await 时将异常抛出。
作用域
- 通过 GlobeScope 启动的协程单独启动一个协程作用域,内部的子协程遵从默认的作用域规则。通过 GlobeScope 启动的协程“自成一派”。
- coroutineScope 是继承外部 Job 的上下文创建作用域,在其内部的取消操作是双向传播的,子协程未捕获的异常也会向上传递给父协程。它更适合一系列对等的协程并发的完成一项工作,任何一个子协程异常退出,那么整体都将退出,简单来说就是”一损俱损“。这也是协程内部再启动子协程的默认作用域。
- supervisorScope 同样继承外部作用域的上下文,但其内部的取消操作是单向传播的,父协程向子协程传播,反过来则不然,这意味着子协程出了异常并不会影响父协程以及其他兄弟协程。它更适合一些独立不相干的任务,任何一个任务出问题,并不会影响其他任务的工作,简单来说就是”自作自受“,例如 UI,我点击一个按钮出了异常,其实并不会影响手机状态栏的刷新。需要注意的是,supervisorScope 内部启动的子协程内部再启动子协程,如无明确指出,则遵守默认作用域规则,也即 supervisorScope 只作用域其直接子协程。
验证GlobalScope
fun m3() {
GlobalScope.launch(Dispatchers.Main + CoroutineName("3") + coroutineExceptionHandler) {
log(1, this)
try {
GlobalScope.launch {
delay(10)
throw RuntimeException("异常3")
}
} catch (e: Exception) {
log(e, this)
}
}
}
fun m4() {
GlobalScope.launch(Dispatchers.Main + CoroutineName("4") + coroutineExceptionHandler) {
log(1, this)
GlobalScope.launch {
delay(10)
throw RuntimeException("异常4")
}
}
}
以上2个方法均获取不到异常,页面崩溃,可见GlobalScope是独立的作用域,既不继承外面的作用域,也不抛出异常给外部
推广:是否所有Scope空间都是独立作用域
验证所有CoroutineScope
来一个自己的Scope
object MyScope : CoroutineScope {
override val coroutineContext: CoroutineContext
get() = Dispatchers.IO + CoroutineName("MyScope")
}
重复上面的2个方法
fun m5() {
MyScope.launch(coroutineExceptionHandler) {
log(1, this)
try {
MyScope.launch {
delay(10)
throw RuntimeException("异常3")
}
} catch (e: Exception) {
log(e, this)
}
}
}
fun m6() {
MyScope.launch(coroutineExceptionHandler) {
log(1, this)
MyScope.launch {
delay(10)
throw RuntimeException("异常4")
}
}
}
以上2个方法均获取不到异常,页面崩溃,可见MyScope是独立的作用域,既不继承外面的作用域,也不抛出异常给外部
结论:
- 所有的
CoroutineScope
都是独立作用域,context独立创建,不继承
try catch能抓到异常的位置
try catch 能在携程中能像同步一样捕获异常,但是只在代码块中有效,不能作用于子携程
fun m7() {
job1 = MyScope.launch() {
log(1)
try { // 可以抓到 RuntimeException("async 抛出来的")
coroutineScope {
launch {
try { // 可以抓到一个 JobCancellationException 异常
delay(10000)
log(2)
} catch (e: Exception) {
log("3 $e")
}
}
try { // 这里抓捕不到异常
async {
delay(10)
throw RuntimeException("async 抛出来的")
}
} catch (e: Exception) {
log("5 $e")
}
log(4)
}
} catch (e: Exception) {
log("6 $e")
}
}
}
2021-07-15 19:53:05.576 8702-8744/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-3;携程:null; 距离上次间隔:241047毫秒) —> 1 2021-07-15 19:53:05.579 8702-8744/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-3;携程:null; 距离上次间隔:4毫秒) —> 4 2021-07-15 19:53:05.594 8702-8745/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-4;携程:null; 距离上次间隔:15毫秒) —> 3 kotlinx.coroutines.JobCancellationException: Parent job is Cancelling; job=ScopeCoroutine{Cancelling}@ec6c707 2021-07-15 19:53:05.595 8702-8745/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-4;携程:null; 距离上次间隔:1毫秒) —> 6 java.lang.RuntimeException: async 抛出来的
抓不到5,可以抓到6
结论:
- try catch 可以抓到携程的异步异常
- try catch 只能抓到同一个携程(代码块)下的异常
- async内发生异常的时候虽然不会抛出,但是也会尝试取消父携程
- coroutineScope 不产生新的环境,不做调度,可以管理之下所有异常
�未理解的问题
lifecycleScope.launchWhenStarted
后面的代码块中不能放入2个Flow的collect,否则第二个收不到消息
lifecycleScope.launchWhenStarted {
activityViewModel.uiEvent.collect{
when (it) {
is TopicDetailUiEvent.ReloadEvent -> {
reload()
}
else -> {}
}
}
}
lifecycleScope.launchWhenStarted {
activityViewModel.uiEvent.collect{
when (it) {
is TopicDetailUiEvent.ReloadEvent -> {
reload()
}
else -> {}
}
}
}