Kotlin原理
// 普通函数fun requestToken(): Token {// makes request for a token & waitsreturn token // returns result when received}// 挂起函数suspend fun requestToken(): Token { ... }//fun postItem(item: Item) {GlobalScope.launch {val token = requestToken()val post = createPost(token, item)processPost(post)}}
实际上在 JVM 中:
Object requestToken(Continuation<Token> cont) { ... }
Continuation 是什么?
// 像一个回调接口public interface Continuation<in T> {public val context: CoroutineContextpublic fun resumeWith(result: Result<T>)}
协程内部实现不是使用普通回调的形式,而是使用状态机来处理不同的挂起点
协程的创建与启动
从新建一个协程开始分析协程的创建,
最常见的协程创建方式为CoroutineScope.launch {},
关键源码如下:
public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> Unit): Job {...coroutine.start(start, coroutine, block) //return coroutine}
coroutine.start(start, coroutine, block)
默认情况下会走到startCoroutineCancellable,
最终会调用到createCoroutineUnintercepted。
internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
public actual fun
completion: Continuation
- suspend 函数本身执行需要一个Continuation 实例在恢复时调用,即参数completion
- 返回值Continuation
是创建出来的协程的载体,receiver suspend 函数会被传给该实例作为协程的实际执行体
在start后,执行到的是 CoroutineStart #invoke()
# CoroutineStart.kt@InternalCoroutinesApipublic operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =when (this) {DEFAULT -> block.startCoroutineCancellable(completion)ATOMIC -> block.startCoroutine(completion)UNDISPATCHED -> block.startCoroutineUndispatched(completion)LAZY -> Unit // will start lazily}
这里以DEFAULT为例,startCoroutineCancellable()
internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>,onCancellation: ((cause: Throwable) -> Unit)? = null): Unit = when (this) {is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)else -> resumeWith(result) // 传递}
而在startCoroutineCancellable中,创建了Continuation,且调用resumeWith来传递请求结果。
Continuation是一个接口,接口内有一个resumeWith方法。
在startCoroutineCancellable调用了resumeWith,既然是接口且被调用,那必然是有地方实现了该接口,并且在resumeWith中做了一些事情。
Continuation的具体实现是在ContinuationImpl类中
internal abstract class ContinuationImpl(completion: Continuation<Any?>?,private val _context: CoroutineContext?) : BaseContinuationImpl(completion)
而ContinuationImpl继承自BaseContinuationImpl,
在BaseContinuationImpl中就可以看到resumeWith的具体实现。
internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) : Continuation<Any?>, CoroutineStackFrame, Serializable {public final override fun resumeWith(result: Result<Any?>) {var current = thisvar param = resultwhile (true) {probeCoroutineResumed(current)with(current) {val completion = completion!!val outcome: Result<Any?> =try {val outcome = invokeSuspend(param) // 判断if (outcome === COROUTINE_SUSPENDED) returnResult.success(outcome)} catch (exception: Throwable) {Result.failure(exception)}releaseIntercepted() // this state machine instance is terminatingif (completion is BaseContinuationImpl) {// unrolling recursion via loopcurrent = completionparam = outcome} else {completion.resumeWith(outcome)return}}}}
可以看到,调用invokeSuspend并且对返回的标志判断
// 举个例子,从编译后的代码中看suspend fun requestToken(): Token { ... } // 挂起函数suspend fun createPost(token: Token, item: Item): Post { ... } // 挂起函数fun processPost(post: Post) { ... }fun postItem(item: Item) {GlobalScope.launch {val token = requestToken()val post = createPost(token, item)processPost(post)}}// 编译后生成的内部类大致如下final class postItem$1 extends SuspendLambda ... {public final Object invokeSuspend(Object result) { // invokeSuspend...switch (this.label) {case 0:this.label = 1;token = requestToken(this)break;case 1:this.label = 2;Token token = result;post = createPost(token, this.item, this)break;case 2:Post post = result;processPost(post)break;}}}
挂起函数会被编译为一个匿名类,这个匿名类中的一个函数实现了这个状态机。
成员变量label代表了当前状态机的状态。
每一个续体(即挂起点中间部分以及挂起点与函数头尾之间的部分)都各自对应了一个状态。
当函数运行到每个挂起点时,lable的值都受限会发生改变,并且当前的续体都会作为实体参数传递给发生了CPS变换的挂起函数。
如果这个挂起函数没有发生事实上的挂起,函数继续运行,如果发生了事实上的挂起,则函数直接return。
由于label记录了状态,所以在协程恢复的时候,可以根据状态使用goto语句直接跳转至上次的挂起点并向后执行,
通过状态机将多个续体融合到一个SuspendLambda对象中,通过维护一个Int类型的状态值,标记续体执行到哪个步骤了,这个状态值和对应的switch语句就是状态机。使用状态机使得无论挂起lambda表达式体内有多少挂起点,编译器也只创建一个SuspendLambda子类和对象。
package kotlin.coroutines/**续体接口:代表协程下一步应该执行的代码*/public interface Continuation<in T> {public val context: CoroutineContextpublic fun resumeWith(result: Result<T>)}package kotlin.coroutines.jvm.internal/**②. 续体的基本实现类,实现了resumeWith()*/internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) : Continuation<Any?>,... {//实现续体的resumeWith()函数,并且是final的子类不能覆盖public final override fun resumeWith(result: Result<Any?>) { ... }//★定义了一个invokeSuspend()抽象函数,这个函数的函数体实现就是协程代码块中的代码,由kotlin编译器自动生成实现protected abstract fun invokeSuspend(result: Result<Any?>): Any?}/**③. 续体实现类,继承自BaseContinuationImpl,增加了拦截器intercepted()功能,实现线程调度等*/internal abstract class ContinuationImpl(completion: Continuation<Any?>?,private val _context: CoroutineContext?) : BaseContinuationImpl(completion) {private var intercepted: Continuation<Any?>? = nullpublic fun intercepted(): Continuation<Any?> =intercepted?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted = it }}/**③. 协程构建器launch{}传入的挂起Lambda表达式的封装抽象类,同时它又是一个续体*/internal abstract class SuspendLambda(public override val arity: Int,completion: Continuation<Any?>?) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {}
切换线程
在前面的这段分析中:
internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>,onCancellation: ((cause: Throwable) -> Unit)? = null): Unit = when (this) { // 判断is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)else -> resumeWith(result) // 传递}
首先会判断当前续体类型是否是DispatchedContinuation(有没有被调度器拦截包装),如果没有被拦截直接调用resumeWith()恢复协程代码块的执行,否则调用拦截包装后的DispatchedContinuation类型的resumeCancellableWith():
//包装后的续体类型,维护了调度器dispatcher和原续体对象continuationinternal class DispatchedContinuation<in T>(@JvmField val dispatcher: CoroutineDispatcher,@JvmField val continuation: Continuation<T>) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {//DispatchedContinuation类的resumeCancellableWith()inline fun resumeCancellableWith(result: Result<T>,noinline onCancellation: ((cause: Throwable) -> Unit)?) {val state = result.toState(onCancellation)//通过调度器判断是否需要切换线程if (dispatcher.isDispatchNeeded(context)) {_state = stateresumeMode = MODE_CANCELLABLE//A 如果需要,则切换线程,将当前续体this作为Runnable传入dispatcher.dispatch(context, this)} else {executeUnconfined(state, MODE_CANCELLABLE) {if (!resumeCancelled(state)) {//★★★B 该函数最终会调用原continuation.resumeWith(result)直接在当前线程恢复协程执行resumeUndispatchedWith(result)}}}}//B 没有调度器拦截的情况直接在当前线程执行inline fun resumeUndispatchedWith(result: Result<T>) {withCoroutineContext(context, countOrElement) {continuation.resumeWith(result)}}}
首先通过调度器判断是否需要切换线程(当前线程是否是调度器使用的线程,如果不是就需要切换),如果需要则将当前续体对象this当作一个Runnable。
(被包装后的DispatchedContinuation续体实现了Runnable)扔给调度器。
//Dispatchers.Default默认调度器的实现类internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {}public open class ExperimentalCoroutineDispatcher(...){override val executor: Executorget() = coroutineSchedulerprivate var coroutineScheduler = createScheduler()//A.实现线程切换override fun dispatch(context: CoroutineContext, block: Runnable): Unit =try {//就是将当前续体当作Runnable扔到线程池,coroutineScheduler的类型是ExecutorcoroutineScheduler.dispatch(block)} catch (e: RejectedExecutionException) {DefaultExecutor.dispatch(context, block)}}//被包装后的DispatchedContinuation续体通过继承DispatchedTask间接实现了Runnableinternal class DispatchedContinuation<in T>(...) : DispatchedTask...{override val delegate: Continuation<T>get() = this}internal abstract class DispatchedTask<in T>(@JvmField public var resumeMode: Int) : SchedulerTask() {internal abstract val delegate: Continuation<T> //DispatchedContinuation对象...//Runnable实现public final override fun run() {...val delegate = delegate as DispatchedContinuation<T>//从代理续体中获取被代理的续体对象,也就是第二次创建的SuspendLambda子类对象val continuation = delegate.continuationval context = continuation.context...withCoroutineContext(context, delegate.countOrElement) {...//★★★最终调用原始续体对象的resumeWith(),从而第一次触发invokeSuspend()启动协程continuation.resumeWithStackTrace(cause)...}...}}internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {resumeWith(Result.failure(recoverStackTrace(exception, this)))}
默认的调度器DefaultScheduler中维护了一个线程池,也就是将this扔到线程池中执行从而完成默认调度器的线程切换,DispatchedContinuation的run()方法则从代理续体对象中获取到了原始SuspendLambda子类续体对象,然后调用原始续体的resumeWith()第一次触发invokeSuspend
使用:
密封类
协程密封类解决条件处理
改进前:
interface Result {class Success(val msg:String):Resultclass Failure(val error:Exception):Resultfun getResultMsg(result:Result)=when(result){is Success -> result.msgis Failure -> result.error.messageelse -> throw IllegalAccessException()}}
存在问题:
(1)else 分支是多余的
(2)else导致潜在分险:当一个类实现Result这个接口后,忘记在getResultMsg()方法中添加相应条件分支,编译器不会提醒而是当成else 处理
改进后:
sealed class Resultclass Success(val msg: String) : Result()class Failure(val error: Exception) : Result()fun getResultMsg(result: Result) = when (result) {is Success -> result.msgis Failure -> "Error is ${result.error.message}"}
好处: 当when语句中传入一个密封类变量作为条件时,kotlin 编译器会自动检查该密封类有那几个子类,并强制要求将每一个子类所对应的条件全部处理
未改造前
import retrofit2.http.Fieldimport retrofit2.http.FormUrlEncodedimport retrofit2.http.POSTinterface WanAndroidAPI {@POST("/user/login")@FormUrlEncodedfun loginAction(@Field("username")username:String,@Field("password")password:String):retrofit2.Call<LoginRegisterResponseWrapper<LoginRegisterResponse>>@POST("/user/register")@FormUrlEncodedfun registerAction(@Field("usename")username: String,@Field("password")password: String,@Field("repassword")repassword:String):retrofit2.Call<LoginRegisterResponseWrapper<LoginRegisterResponse>>//------------------}
//LoginRegisterResponse.ktdata class LoginRegisterResponse(val admin:Boolean,val chapterTops:List<*>,val collectIds:List<*>,val email:String?,val icon:String?,val id:String?,val nickname:String?,val password:String?,val publicName:String?,val token:String?,val type:Int,val username:String?)
//APIClient.ktimport okhttp3.OkHttpClientimport retrofit2.Retrofitimport java.util.concurrent.TimeUnitclass APIClient {private object Holder{val INSTANCE = APIClient()}companion object{val instance=Holder.INSTANCE}fun<T> instanceRetrofit(apiInterface:Class<T>):T{val mOkHttpClient = OkHttpClient().newBuilder().myApply {readTimeout(10000,TimeUnit.SECONDS)connectTimeout(1000,TimeUnit.SECONDS)writeTimeout(1000,TimeUnit.SECONDS)}.build()val retrofit:Retrofit = Retrofit.Builder().baseUrl("https://www.wanandroid.com").client(mOkHttpClient).addCallAdapterFactory(RxJava2CallAdapterFactory.create()).addConverterFactory(GsonConverterFactory.create()).build()return retrofit.create(apiInterface)}}fun <T> T.myApply(mm:T.()->Unit):T{mm()return this}
//LoginRegisterResponseWrapper.ktdata class LoginRegisterResponseWrapper<T> (val data:T,val errorCode:Int,val errorMsg:String)
//LoginRegisterResponse.ktdata class LoginRegisterResponse(val admin:Boolean,val chapterTops:List<*>,val collectIds:List<*>,val email:String?,val icon:String?,val id:String?,val nickname:String?,val password:String?,val publicName:String?,val token:String?,val type:Int,val username:String?)
//MainActivity.ktimport android.app.ProgressDialogimport androidx.appcompat.app.AppCompatActivityimport android.os.Bundleimport android.os.Handlerimport android.os.Looperimport android.util.Logimport android.view.Viewimport android.widget.TextViewclass MainActivity : AppCompatActivity() {private val TAG = "ckb"var mProgressDialog:ProgressDialog?=nullvar textView:TextView?=null//第二大步:主自线程,更新UIval mHandler = Handler(Looper.getMainLooper()){val result = it.obj as LoginRegisterResponseWrapper<LoginRegisterResponse>Log.d(TAG,"errorMsg:${result.data}")textView?.setText(result.data.toString())mProgressDialog?.dismiss()false}//override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)textView = findViewById<TextView>(R.id.textView)}fun startRequest(view:View){mProgressDialog = ProgressDialog(this)mProgressDialog ?.setTitle("请求服务器中")mProgressDialog?.show()//开启一步线程object:Thread(){override fun run() {super.run()Thread.sleep(2000)val loginResult =APIClient.instance.instanceRetrofit(WanAndroidAPI::class.java).loginAction("Derry-vip","123456")val result:LoginRegisterResponseWrapper<LoginRegisterResponse>?=loginResult.execute().body()//发送Handler 切换Android 主线程val msg =mHandler.obtainMessage()msg.obj =resultmHandler.sendMessage(msg)}}.start()}}
<?xml version="1.0" encoding="utf-8"?><LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"xmlns:app="http://schemas.android.com/apk/res-auto"xmlns:tools="http://schemas.android.com/tools"android:layout_width="match_parent"android:layout_height="match_parent"tools:context=".MainActivity"><TextViewandroid:id="@+id/textView"android:layout_width="wrap_content"android:layout_height="wrap_content"android:text="Hello World!"app:layout_constraintBottom_toBottomOf="parent"app:layout_constraintLeft_toLeftOf="parent"app:layout_constraintRight_toRightOf="parent"app:layout_constraintTop_toTopOf="parent" /><Buttonandroid:layout_marginTop="400dp"android:text="网络请求"android:layout_width="wrap_content"android:layout_height="wrap_content"android:onClick="startRequest"/></LinearLayout>
改用协程解决Hander
// WanAndroidAPI.ktimport retrofit2.http.Fieldimport retrofit2.http.FormUrlEncodedimport retrofit2.http.POSTinterface WanAndroidAPI {@POST("/user/login")@FormUrlEncodedfun loginAction(@Field("username")username:String,@Field("password")password:String):retrofit2.Call<LoginRegisterResponseWrapper<LoginRegisterResponse>>@POST("/user/register")@FormUrlEncodedfun registerAction(@Field("usename")username: String,@Field("password")password: String,@Field("repassword")repassword:String):retrofit2.Call<LoginRegisterResponseWrapper<LoginRegisterResponse>>//-------- ----------// 协程写法-----加上suspend, 同时不再需要Call<>@POST("/user/login")@FormUrlEncodedsuspend fun loginActionCoroutine(@Field("username")username:String,@Field("password")password:String):LoginRegisterResponseWrapper<LoginRegisterResponse>@POST("/user/register")@FormUrlEncodedfun registerActionCoroutine(@Field("usename")username: String,@Field("password")password: String,@Field("repassword")repassword:String):LoginRegisterResponseWrapper<LoginRegisterResponse>}
//class MainActivity : AppCompatActivity() {private val TAG = "ckb"var mProgressDialog:ProgressDialog?=nullvar textView:TextView?=null// //第二大步:主自线程,更新UI// val mHandler = Handler(Looper.getMainLooper()){// val result = it.obj as LoginRegisterResponseWrapper<LoginRegisterResponse>// Log.d(TAG,"errorMsg:${result.data}")// textView?.setText(result.data.toString())// mProgressDialog?.dismiss()// false// }//override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)textView = findViewById<TextView>(R.id.textView)}fun startRequest(view:View){mProgressDialog = ProgressDialog(this)mProgressDialog ?.setTitle("请求服务器中")mProgressDialog?.show()// //开启一步线程// object:Thread(){// override fun run() {// super.run()// Thread.sleep(2000)// //---------// val loginResult =// APIClient.instance.instanceRetrofit(WanAndroidAPI::class.java)// .loginAction("Derry-vip","123456")// //--------//// val result:LoginRegisterResponseWrapper<LoginRegisterResponse>?=loginResult.execute().body()//// //发送Handler 切换Android 主线程// val msg =mHandler.obtainMessage()// msg.obj =result// mHandler.sendMessage(msg)// }// }.start()// -----------------------协程改写-----------GlobalScope.launch(Dispatchers.Main) {//在Android 上,默认是异步处理// 主动会被挂起// 内部会切换成异步线程请求服务器val loginResult =APIClient.instance.instanceRetrofit(WanAndroidAPI::class.java).loginActionCoroutine("Derry-vip","123456")// 执行完成后,主动切换回安卓主线程//在主线程更新UILog.d(TAG,"errorMsg:${loginResult.data}")textView?.setText(loginResult.data.toString())mProgressDialog?.dismiss()}//---------------------------------}}
协程解决多层回调带来的问题
// MainActivity3.ktimport android.app.ProgressDialogimport android.graphics.Colorimport android.os.Bundleimport android.os.Handlerimport android.os.Looperimport android.os.Messageimport android.view.Viewimport android.widget.TextViewimport androidx.appcompat.app.AppCompatActivity// 接口interface ResponseCallback{fun responseSuccess(serverResponseInfo:String)// 登陆成功后的信息类fun responseError(serverResponseErrorMsg:String) // 登陆失败后的描述}//请求加载用户的数据private fun requestLoadUser(responseCallback:ResponseCallback){val isLoadSuccess = true;//开启异步线程object:Thread(){override fun run(){super.run()try{sleep(3000L)if (isLoadSuccess){responseCallback.responseSuccess("加载到【用户数据】信息集")}else {responseCallback.responseError("加载【用户数据】,加载失败,服务器宕机")}} catch(e:InterruptedException){e.printStackTrace()}}}.start()}////请求加载[用户资产数据]private fun requestLoadUserAssets(responseCallback:ResponseCallback){val isLoadSuccess = true;//开启异步线程object:Thread(){override fun run(){super.run()try{sleep(3000L)if (isLoadSuccess){responseCallback.responseSuccess("加载到【用户资产数据】信息集")}else {responseCallback.responseError("加载【用户资产数据】,加载失败,服务器宕机")}}catch(e:InterruptedException){e.printStackTrace()}}}.start()}////请求加载[用户资产详情数据】private fun requestLoadUserAssetsDetails(responseCallback:ResponseCallback){val isLoadSuccess = true;//开启异步线程object:Thread(){override fun run(){super.run()try{sleep(3000L)if (isLoadSuccess){responseCallback.responseSuccess("加载到【用户资产详情数据】信息集")}else {responseCallback.responseError("加载【用户资产详情数据】,加载失败,服务器宕机")}}catch(e:InterruptedException){e.printStackTrace()}}}.start()}class MainActivity3 :AppCompatActivity(){private val TAG = "ckb"var mProgressDialog: ProgressDialog?=nullvar textView: TextView?=nulloverride fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)textView = findViewById<TextView>(R.id.textView)}fun startRequest(view: View) {mProgressDialog = ProgressDialog(this)mProgressDialog?.setTitle("请求服务器中")mProgressDialog?.show()// 先进行第一步异步请求requestLoadUser(object:ResponseCallback{override fun responseSuccess(serverResponseInfo: String) {//从异步线程切回主线程val handler:Handler = object :Handler(Looper.getMainLooper()){override fun handleMessage(msg: Message) {super.handleMessage(msg)//第一次更新UItextView?.text = serverResponseInfotextView?.setTextColor(Color.RED)//---第二次请求--requestLoadUserAssets(object : ResponseCallback {override fun responseSuccess(serverResponseInfo: String) {val handler:Handler = object :Handler(Looper.getMainLooper()) {override fun handleMessage(msg: Message) {super.handleMessage(msg)//第二次更新UItextView?.text = serverResponseInfotextView?.setTextColor(Color.RED)//----第三次请求------requestLoadUserAssetsDetails(object:ResponseCallback{override fun responseSuccess(serverResponseInfo: String) {val handler:Handler = object :Handler(Looper.getMainLooper()) {override fun handleMessage(msg: Message) {super.handleMessage(msg)//第三次更新UItextView?.text = serverResponseInfomProgressDialog?.dismiss()textView?.setTextColor(Color.BLACK)}}handler.sendEmptyMessage(0)}override fun responseError(serverResponseErrorMsg: String) {TODO("Not yet implemented")}})}}handler.sendEmptyMessage(0)}override fun responseError(serverResponseErrorMsg: String) {TODO("Not yet implemented")}})//----}}handler.sendEmptyMessage(0)}override fun responseError(serverResponseErrorMsg: String) {TODO("Not yet implemented")}})}}
协程更改后
import android.app.ProgressDialogimport android.graphics.Colorimport android.os.Bundleimport android.view.Viewimport android.widget.TextViewimport androidx.appcompat.app.AppCompatActivityimport kotlinx.coroutines.*////请求加载用户的数据private suspend fun requestLoadUser():String{val isLoadSuccess = true;// 用withContext 手动切换到异步线程withContext(Dispatchers.IO){delay(3000L) // 模拟耗时}if(isLoadSuccess){return "加载到【用户数据】信息集"}else{return "加载【用户数据】加载失败,服务器宕机"}}//请求加载【用户资产】的数据private suspend fun requestLoadUserAssets():String{val isLoadSuccess = true;// 用withContext 手动切换到异步线程withContext(Dispatchers.IO){delay(3000L) // 模拟耗时}if(isLoadSuccess){return "加载到【用户资产】信息集"}else{return "加载【用户数据资产】加载失败,服务器宕机"}}//请求加载[用户资产详情数据】private suspend fun requestLoadUserAssetsDetails():String{val isLoadSuccess = true;// 用withContext 手动切换到异步线程withContext(Dispatchers.IO){delay(3000L) // 模拟耗时}if(isLoadSuccess){return "加载到【用户资产详情数据】信息集"}else{return "加载【用户资产详情数据】加载失败,服务器宕机"}}class MainActivity4 :AppCompatActivity() {private val TAG = "ckb"var mProgressDialog: ProgressDialog?=nullvar textView: TextView?=nulloverride fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)textView = findViewById<TextView>(R.id.textView)}fun startRequest(view: View){mProgressDialog= ProgressDialog(this)mProgressDialog?.setTitle("请求服务器中。。。")mProgressDialog?.show()//这里缺少了错误处理的逻辑GlobalScope.launch ( Dispatchers.Main ){// 异步请求1var serverResponseInfo = requestLoadUser() // 这一步自动完成了主线程和异步线程的切换//拿到数据后立刻更新UItextView?.text = serverResponseInfotextView?.setTextColor(Color.RED)//执行异步操作2serverResponseInfo = requestLoadUserAssets()//拿到数据后立刻更新UItextView?.text = serverResponseInfotextView?.setTextColor(Color.BLUE)//执行异步操作3serverResponseInfo = requestLoadUserAssets()//拿到数据后立刻更新UItextView?.text = serverResponseInfomProgressDialog?.dismiss()textView?.setTextColor(Color.RED)}}}
Kotlin 协程:使用async和await实现高效并发
//计算一private suspend fun intValue1(): Int {delay(1000)return 1}// 计算二private suspend fun intValue2(): Int {delay(2000)return 2}fun main() = runBlocking {val elapsedTime = measureTimeMillis {val value1 = intValue1()val value2 = intValue2()println("the result is ${value1 + value2}")}println("the elapsedTime is $elapsedTime")}// 综合public inline fun measureTimeMillis(block: () -> Unit): Long {val start = System.currentTimeMillis()block()return System.currentTimeMillis() - start}
intValue1()和intValue2()是完全独立的,但是从运行结果来看,耗费的时间是intValue1()的时间 + intValue2()的时间。
使用async和await改进:
fun main() = runBlocking {val elapsedTime = measureTimeMillis {val value1 = async { intValue1() }val value2 = async { intValue2() }println("the result is ${value1.await() + value2.await()}")}println("the elapsedTime is $elapsedTime")}private suspend fun intValue1(): Int {delay(1000)return 1}private suspend fun intValue2(): Int {delay(2000)return 2}
public fun <T> CoroutineScope.async(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> T): Deferred<T>
- 它和CoroutineScope.launch{}类似,可以用来创建一个协程,不同的是launch的返回结果是Job类型,而aysnc的返回结果是Deferred类型。从类的层次结构上看,Deferred是Job的子接口;从功能上来看,Deferred就是带返回结果的Job。
- CoroutineStart.LAZY的作用,可以使得async启动的两个协程并没有立即执行。而是直到调用await方法之后,才开始执行,而await又是会去等待结果,自然需要等待intValue1协程执行完毕后,遇到intValue2.await(),才会触发intValue2协程的执行,又要去等待。那么,async的并发也就失效了。
https://blog.csdn.net/xlh1191860939/article/details/104981066
Coroutines Flow
作用:异步返回多个计算好的值呢
Flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行
fun simpleFlow() = flow {println("Flow started")for(i in 1..3){delay(1000)emit(i)}}fun testFlowIsCode() = runBlocking {val flow = simpleFlow()println("Flow Collect")flow.collect { println(it) }println("Flow Collect again")flow.collect { println(it) }}
Flow CollectFlow started123Flow Collect againFlow started123Process finished with exit code 0
代码执行val flow = simpleFlow()的时候没有执行flow{…}构建块中的代码,只有调用collect的时候才执行,这就是冷流
使用 flowOn 切换线程
使用withTimeoutOrNull() 流的取消挂起
Flow 的 Terminal 运算符可以是 suspend 函数,如 collect、single、reduce、toList 等;也可以是 launchIn 运算符,用于在指定 CoroutineScope 内使用 flow。
Flow 在使用各个 suspend 函数时不会阻塞主线程的运行。
//使用 flow:fun main() = runBlocking {launch {for (j in 1..5) {delay(100)println("I'm not blocked $j")}}flow {for (i in 1..5) {delay(100)emit(i)}}.collect { println(it) }println("Done")}//-------------结果-----------1I'm not blocked 12I'm not blocked 23I'm not blocked 34I'm not blocked 45DoneI'm not blocked 5
//使用 sequence:fun main() = runBlocking {launch {for (k in 1..5) {delay(100)println("I'm blocked $k")}}sequence {for (i in 1..5) {Thread.sleep(100)yield(i)}}.forEach { println(it) }println("Done")}
