Kotlin原理

  1. // 普通函数
  2. fun requestToken(): Token {
  3. // makes request for a token & waits
  4. return token // returns result when received
  5. }
  6. // 挂起函数
  7. suspend fun requestToken(): Token { ... }
  8. //
  9. fun postItem(item: Item) {
  10. GlobalScope.launch {
  11. val token = requestToken()
  12. val post = createPost(token, item)
  13. processPost(post)
  14. }
  15. }

实际上在 JVM 中:

  1. Object requestToken(Continuation<Token> cont) { ... }

Continuation 是什么?

  1. // 像一个回调接口
  2. public interface Continuation<in T> {
  3. public val context: CoroutineContext
  4. public fun resumeWith(result: Result<T>)
  5. }

协程内部实现不是使用普通回调的形式,而是使用状态机来处理不同的挂起点

协程的创建与启动

从新建一个协程开始分析协程的创建,
最常见的协程创建方式为CoroutineScope.launch {},
关键源码如下:

  1. public fun CoroutineScope.launch(
  2. context: CoroutineContext = EmptyCoroutineContext,
  3. start: CoroutineStart = CoroutineStart.DEFAULT,
  4. block: suspend CoroutineScope.() -> Unit
  5. ): Job {
  6. ...
  7. coroutine.start(start, coroutine, block) //
  8. return coroutine
  9. }

coroutine.start(start, coroutine, block)
默认情况下会走到startCoroutineCancellable,
最终会调用到createCoroutineUnintercepted。

  1. internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
  2. createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)

public actual fun (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation) : Continuation { … }

  • suspend 函数本身执行需要一个Continuation 实例在恢复时调用,即参数completion
  • 返回值Continuation 是创建出来的协程的载体,receiver suspend 函数会被传给该实例作为协程的实际执行体

在start后,执行到的是 CoroutineStart #invoke()

  1. # CoroutineStart.kt
  2. @InternalCoroutinesApi
  3. public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
  4. when (this) {
  5. DEFAULT -> block.startCoroutineCancellable(completion)
  6. ATOMIC -> block.startCoroutine(completion)
  7. UNDISPATCHED -> block.startCoroutineUndispatched(completion)
  8. LAZY -> Unit // will start lazily
  9. }

这里以DEFAULT为例,startCoroutineCancellable()

  1. internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
  2. createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
  3. public fun <T> Continuation<T>.resumeCancellableWith(
  4. result: Result<T>,
  5. onCancellation: ((cause: Throwable) -> Unit)? = null
  6. ): Unit = when (this) {
  7. is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
  8. else -> resumeWith(result) // 传递
  9. }

而在startCoroutineCancellable中,创建了Continuation,且调用resumeWith来传递请求结果。

Continuation是一个接口,接口内有一个resumeWith方法。
在startCoroutineCancellable调用了resumeWith,既然是接口且被调用,那必然是有地方实现了该接口,并且在resumeWith中做了一些事情。

Continuation的具体实现是在ContinuationImpl类中

  1. internal abstract class ContinuationImpl(
  2. completion: Continuation<Any?>?,
  3. private val _context: CoroutineContext?
  4. ) : BaseContinuationImpl(completion)


而ContinuationImpl继承自BaseContinuationImpl,
在BaseContinuationImpl中就可以看到resumeWith的具体实现。

  1. internal abstract class BaseContinuationImpl(
  2. public val completion: Continuation<Any?>?
  3. ) : Continuation<Any?>, CoroutineStackFrame, Serializable {
  4. public final override fun resumeWith(result: Result<Any?>) {
  5. var current = this
  6. var param = result
  7. while (true) {
  8. probeCoroutineResumed(current)
  9. with(current) {
  10. val completion = completion!!
  11. val outcome: Result<Any?> =
  12. try {
  13. val outcome = invokeSuspend(param) // 判断
  14. if (outcome === COROUTINE_SUSPENDED) return
  15. Result.success(outcome)
  16. } catch (exception: Throwable) {
  17. Result.failure(exception)
  18. }
  19. releaseIntercepted() // this state machine instance is terminating
  20. if (completion is BaseContinuationImpl) {
  21. // unrolling recursion via loop
  22. current = completion
  23. param = outcome
  24. } else {
  25. completion.resumeWith(outcome)
  26. return
  27. }
  28. }
  29. }
  30. }

可以看到,调用invokeSuspend并且对返回的标志判断

  1. // 举个例子,从编译后的代码中看
  2. suspend fun requestToken(): Token { ... } // 挂起函数
  3. suspend fun createPost(token: Token, item: Item): Post { ... } // 挂起函数
  4. fun processPost(post: Post) { ... }
  5. fun postItem(item: Item) {
  6. GlobalScope.launch {
  7. val token = requestToken()
  8. val post = createPost(token, item)
  9. processPost(post)
  10. }
  11. }
  12. // 编译后生成的内部类大致如下
  13. final class postItem$1 extends SuspendLambda ... {
  14. public final Object invokeSuspend(Object result) { // invokeSuspend
  15. ...
  16. switch (this.label) {
  17. case 0:
  18. this.label = 1;
  19. token = requestToken(this)
  20. break;
  21. case 1:
  22. this.label = 2;
  23. Token token = result;
  24. post = createPost(token, this.item, this)
  25. break;
  26. case 2:
  27. Post post = result;
  28. processPost(post)
  29. break;
  30. }
  31. }
  32. }

挂起函数会被编译为一个匿名类,这个匿名类中的一个函数实现了这个状态机。
成员变量label代表了当前状态机的状态。
每一个续体(即挂起点中间部分以及挂起点与函数头尾之间的部分)都各自对应了一个状态。
当函数运行到每个挂起点时,lable的值都受限会发生改变,并且当前的续体都会作为实体参数传递给发生了CPS变换的挂起函数。
如果这个挂起函数没有发生事实上的挂起,函数继续运行,如果发生了事实上的挂起,则函数直接return。
由于label记录了状态,所以在协程恢复的时候,可以根据状态使用goto语句直接跳转至上次的挂起点并向后执行,

通过状态机将多个续体融合到一个SuspendLambda对象中,通过维护一个Int类型的状态值,标记续体执行到哪个步骤了,这个状态值和对应的switch语句就是状态机。使用状态机使得无论挂起lambda表达式体内有多少挂起点,编译器也只创建一个SuspendLambda子类和对象。

  1. package kotlin.coroutines
  2. /**续体接口:代表协程下一步应该执行的代码*/
  3. public interface Continuation<in T> {
  4. public val context: CoroutineContext
  5. public fun resumeWith(result: Result<T>)
  6. }
  7. package kotlin.coroutines.jvm.internal
  8. /**②. 续体的基本实现类,实现了resumeWith()*/
  9. internal abstract class BaseContinuationImpl(
  10. public val completion: Continuation<Any?>?
  11. ) : Continuation<Any?>,... {
  12. //实现续体的resumeWith()函数,并且是final的子类不能覆盖
  13. public final override fun resumeWith(result: Result<Any?>) { ... }
  14. //★定义了一个invokeSuspend()抽象函数,这个函数的函数体实现就是协程代码块中的代码,由kotlin编译器自动生成实现
  15. protected abstract fun invokeSuspend(result: Result<Any?>): Any?
  16. }
  17. /**③. 续体实现类,继承自BaseContinuationImpl,增加了拦截器intercepted()功能,实现线程调度等*/
  18. internal abstract class ContinuationImpl(
  19. completion: Continuation<Any?>?,
  20. private val _context: CoroutineContext?
  21. ) : BaseContinuationImpl(completion) {
  22. private var intercepted: Continuation<Any?>? = null
  23. public fun intercepted(): Continuation<Any?> =
  24. intercepted?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
  25. .also { intercepted = it }
  26. }
  27. /**③. 协程构建器launch{}传入的挂起Lambda表达式的封装抽象类,同时它又是一个续体*/
  28. internal abstract class SuspendLambda(
  29. public override val arity: Int,
  30. completion: Continuation<Any?>?
  31. ) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {}

切换线程

在前面的这段分析中:

  1. internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
  2. createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
  3. public fun <T> Continuation<T>.resumeCancellableWith(
  4. result: Result<T>,
  5. onCancellation: ((cause: Throwable) -> Unit)? = null
  6. ): Unit = when (this) { // 判断
  7. is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
  8. else -> resumeWith(result) // 传递
  9. }

首先会判断当前续体类型是否是DispatchedContinuation(有没有被调度器拦截包装),如果没有被拦截直接调用resumeWith()恢复协程代码块的执行,否则调用拦截包装后的DispatchedContinuation类型的resumeCancellableWith():

  1. //包装后的续体类型,维护了调度器dispatcher和原续体对象continuation
  2. internal class DispatchedContinuation<in T>(
  3. @JvmField val dispatcher: CoroutineDispatcher,
  4. @JvmField val continuation: Continuation<T>
  5. ) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
  6. //DispatchedContinuation类的resumeCancellableWith()
  7. inline fun resumeCancellableWith(
  8. result: Result<T>,
  9. noinline onCancellation: ((cause: Throwable) -> Unit)?
  10. ) {
  11. val state = result.toState(onCancellation)
  12. //通过调度器判断是否需要切换线程
  13. if (dispatcher.isDispatchNeeded(context)) {
  14. _state = state
  15. resumeMode = MODE_CANCELLABLE
  16. //A 如果需要,则切换线程,将当前续体this作为Runnable传入
  17. dispatcher.dispatch(context, this)
  18. } else {
  19. executeUnconfined(state, MODE_CANCELLABLE) {
  20. if (!resumeCancelled(state)) {
  21. //★★★B 该函数最终会调用原continuation.resumeWith(result)直接在当前线程恢复协程执行
  22. resumeUndispatchedWith(result)
  23. }
  24. }
  25. }
  26. }
  27. //B 没有调度器拦截的情况直接在当前线程执行
  28. inline fun resumeUndispatchedWith(result: Result<T>) {
  29. withCoroutineContext(context, countOrElement) {
  30. continuation.resumeWith(result)
  31. }
  32. }
  33. }

首先通过调度器判断是否需要切换线程(当前线程是否是调度器使用的线程,如果不是就需要切换),如果需要则将当前续体对象this当作一个Runnable。
(被包装后的DispatchedContinuation续体实现了Runnable)扔给调度器。

  1. //Dispatchers.Default默认调度器的实现类
  2. internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
  3. }
  4. public open class ExperimentalCoroutineDispatcher(...){
  5. override val executor: Executor
  6. get() = coroutineScheduler
  7. private var coroutineScheduler = createScheduler()
  8. //A.实现线程切换
  9. override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
  10. try {
  11. //就是将当前续体当作Runnable扔到线程池,coroutineScheduler的类型是Executor
  12. coroutineScheduler.dispatch(block)
  13. } catch (e: RejectedExecutionException) {
  14. DefaultExecutor.dispatch(context, block)
  15. }
  16. }
  17. //被包装后的DispatchedContinuation续体通过继承DispatchedTask间接实现了Runnable
  18. internal class DispatchedContinuation<in T>(...) : DispatchedTask...{
  19. override val delegate: Continuation<T>
  20. get() = this
  21. }
  22. internal abstract class DispatchedTask<in T>(
  23. @JvmField public var resumeMode: Int
  24. ) : SchedulerTask() {
  25. internal abstract val delegate: Continuation<T> //DispatchedContinuation对象
  26. ...
  27. //Runnable实现
  28. public final override fun run() {
  29. ...
  30. val delegate = delegate as DispatchedContinuation<T>
  31. //从代理续体中获取被代理的续体对象,也就是第二次创建的SuspendLambda子类对象
  32. val continuation = delegate.continuation
  33. val context = continuation.context
  34. ...
  35. withCoroutineContext(context, delegate.countOrElement) {
  36. ...
  37. //★★★最终调用原始续体对象的resumeWith(),从而第一次触发invokeSuspend()启动协程
  38. continuation.resumeWithStackTrace(cause)
  39. ...
  40. }
  41. ...
  42. }
  43. }
  44. internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
  45. resumeWith(Result.failure(recoverStackTrace(exception, this)))
  46. }

默认的调度器DefaultScheduler中维护了一个线程池,也就是将this扔到线程池中执行从而完成默认调度器的线程切换,DispatchedContinuation的run()方法则从代理续体对象中获取到了原始SuspendLambda子类续体对象,然后调用原始续体的resumeWith()第一次触发invokeSuspend

使用:

密封类

协程密封类解决条件处理

改进前:

  1. interface Result {
  2. class Success(val msg:String):Result
  3. class Failure(val error:Exception):Result
  4. fun getResultMsg(result:Result)=when(result){
  5. is Success -> result.msg
  6. is Failure -> result.error.message
  7. else -> throw IllegalAccessException()
  8. }
  9. }

存在问题:
(1)else 分支是多余的
(2)else导致潜在分险:当一个类实现Result这个接口后,忘记在getResultMsg()方法中添加相应条件分支,编译器不会提醒而是当成else 处理

改进后:

  1. sealed class Result
  2. class Success(val msg: String) : Result()
  3. class Failure(val error: Exception) : Result()
  4. fun getResultMsg(result: Result) = when (result) {
  5. is Success -> result.msg
  6. is Failure -> "Error is ${result.error.message}"
  7. }

好处: 当when语句中传入一个密封类变量作为条件时,kotlin 编译器会自动检查该密封类有那几个子类,并强制要求将每一个子类所对应的条件全部处理


未改造前

  1. import retrofit2.http.Field
  2. import retrofit2.http.FormUrlEncoded
  3. import retrofit2.http.POST
  4. interface WanAndroidAPI {
  5. @POST("/user/login")
  6. @FormUrlEncoded
  7. fun loginAction(@Field("username")username:String,
  8. @Field("password")password:String)
  9. :retrofit2.Call<LoginRegisterResponseWrapper<LoginRegisterResponse>>
  10. @POST("/user/register")
  11. @FormUrlEncoded
  12. fun registerAction(@Field("usename")username: String,
  13. @Field("password")password: String,
  14. @Field("repassword")repassword:String)
  15. :retrofit2.Call<LoginRegisterResponseWrapper<LoginRegisterResponse>>
  16. //------------------
  17. }
  1. //LoginRegisterResponse.kt
  2. data class LoginRegisterResponse(val admin:Boolean,
  3. val chapterTops:List<*>,
  4. val collectIds:List<*>,
  5. val email:String?,
  6. val icon:String?,
  7. val id:String?,
  8. val nickname:String?,
  9. val password:String?,
  10. val publicName:String?,
  11. val token:String?,
  12. val type:Int,
  13. val username:String?
  14. )
  1. //APIClient.kt
  2. import okhttp3.OkHttpClient
  3. import retrofit2.Retrofit
  4. import java.util.concurrent.TimeUnit
  5. class APIClient {
  6. private object Holder{
  7. val INSTANCE = APIClient()
  8. }
  9. companion object{
  10. val instance=Holder.INSTANCE
  11. }
  12. fun<T> instanceRetrofit(apiInterface:Class<T>):T{
  13. val mOkHttpClient = OkHttpClient().newBuilder().myApply {
  14. readTimeout(10000,TimeUnit.SECONDS)
  15. connectTimeout(1000,TimeUnit.SECONDS)
  16. writeTimeout(1000,TimeUnit.SECONDS)
  17. }.build()
  18. val retrofit:Retrofit = Retrofit.Builder()
  19. .baseUrl("https://www.wanandroid.com")
  20. .client(mOkHttpClient)
  21. .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
  22. .addConverterFactory(GsonConverterFactory.create())
  23. .build()
  24. return retrofit.create(apiInterface)
  25. }
  26. }
  27. fun <T> T.myApply(mm:T.()->Unit):T{
  28. mm()
  29. return this
  30. }
  1. //LoginRegisterResponseWrapper.kt
  2. data class LoginRegisterResponseWrapper<T> (val data:T,val errorCode:Int,val errorMsg:String)
  1. //LoginRegisterResponse.kt
  2. data class LoginRegisterResponse(val admin:Boolean,
  3. val chapterTops:List<*>,
  4. val collectIds:List<*>,
  5. val email:String?,
  6. val icon:String?,
  7. val id:String?,
  8. val nickname:String?,
  9. val password:String?,
  10. val publicName:String?,
  11. val token:String?,
  12. val type:Int,
  13. val username:String?
  14. )
  1. //MainActivity.kt
  2. import android.app.ProgressDialog
  3. import androidx.appcompat.app.AppCompatActivity
  4. import android.os.Bundle
  5. import android.os.Handler
  6. import android.os.Looper
  7. import android.util.Log
  8. import android.view.View
  9. import android.widget.TextView
  10. class MainActivity : AppCompatActivity() {
  11. private val TAG = "ckb"
  12. var mProgressDialog:ProgressDialog?=null
  13. var textView:TextView?=null
  14. //第二大步:主自线程,更新UI
  15. val mHandler = Handler(Looper.getMainLooper()){
  16. val result = it.obj as LoginRegisterResponseWrapper<LoginRegisterResponse>
  17. Log.d(TAG,"errorMsg:${result.data}")
  18. textView?.setText(result.data.toString())
  19. mProgressDialog?.dismiss()
  20. false
  21. }
  22. //
  23. override fun onCreate(savedInstanceState: Bundle?) {
  24. super.onCreate(savedInstanceState)
  25. setContentView(R.layout.activity_main)
  26. textView = findViewById<TextView>(R.id.textView)
  27. }
  28. fun startRequest(view:View){
  29. mProgressDialog = ProgressDialog(this)
  30. mProgressDialog ?.setTitle("请求服务器中")
  31. mProgressDialog?.show()
  32. //开启一步线程
  33. object:Thread(){
  34. override fun run() {
  35. super.run()
  36. Thread.sleep(2000)
  37. val loginResult =
  38. APIClient.instance.instanceRetrofit(WanAndroidAPI::class.java)
  39. .loginAction("Derry-vip","123456")
  40. val result:LoginRegisterResponseWrapper<LoginRegisterResponse>?=loginResult.execute().body()
  41. //发送Handler 切换Android 主线程
  42. val msg =mHandler.obtainMessage()
  43. msg.obj =result
  44. mHandler.sendMessage(msg)
  45. }
  46. }.start()
  47. }
  48. }
  1. <?xml version="1.0" encoding="utf-8"?>
  2. <LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
  3. xmlns:app="http://schemas.android.com/apk/res-auto"
  4. xmlns:tools="http://schemas.android.com/tools"
  5. android:layout_width="match_parent"
  6. android:layout_height="match_parent"
  7. tools:context=".MainActivity">
  8. <TextView
  9. android:id="@+id/textView"
  10. android:layout_width="wrap_content"
  11. android:layout_height="wrap_content"
  12. android:text="Hello World!"
  13. app:layout_constraintBottom_toBottomOf="parent"
  14. app:layout_constraintLeft_toLeftOf="parent"
  15. app:layout_constraintRight_toRightOf="parent"
  16. app:layout_constraintTop_toTopOf="parent" />
  17. <Button
  18. android:layout_marginTop="400dp"
  19. android:text="网络请求"
  20. android:layout_width="wrap_content"
  21. android:layout_height="wrap_content"
  22. android:onClick="startRequest"
  23. />
  24. </LinearLayout>

改用协程解决Hander

  1. // WanAndroidAPI.kt
  2. import retrofit2.http.Field
  3. import retrofit2.http.FormUrlEncoded
  4. import retrofit2.http.POST
  5. interface WanAndroidAPI {
  6. @POST("/user/login")
  7. @FormUrlEncoded
  8. fun loginAction(@Field("username")username:String,
  9. @Field("password")password:String)
  10. :retrofit2.Call<LoginRegisterResponseWrapper<LoginRegisterResponse>>
  11. @POST("/user/register")
  12. @FormUrlEncoded
  13. fun registerAction(@Field("usename")username: String,
  14. @Field("password")password: String,
  15. @Field("repassword")repassword:String)
  16. :retrofit2.Call<LoginRegisterResponseWrapper<LoginRegisterResponse>>
  17. //-------- ----------
  18. // 协程写法-----加上suspend, 同时不再需要Call<>
  19. @POST("/user/login")
  20. @FormUrlEncoded
  21. suspend fun loginActionCoroutine(@Field("username")username:String,
  22. @Field("password")password:String)
  23. :LoginRegisterResponseWrapper<LoginRegisterResponse>
  24. @POST("/user/register")
  25. @FormUrlEncoded
  26. fun registerActionCoroutine(@Field("usename")username: String,
  27. @Field("password")password: String,
  28. @Field("repassword")repassword:String)
  29. :LoginRegisterResponseWrapper<LoginRegisterResponse>
  30. }
  1. //
  2. class MainActivity : AppCompatActivity() {
  3. private val TAG = "ckb"
  4. var mProgressDialog:ProgressDialog?=null
  5. var textView:TextView?=null
  6. // //第二大步:主自线程,更新UI
  7. // val mHandler = Handler(Looper.getMainLooper()){
  8. // val result = it.obj as LoginRegisterResponseWrapper<LoginRegisterResponse>
  9. // Log.d(TAG,"errorMsg:${result.data}")
  10. // textView?.setText(result.data.toString())
  11. // mProgressDialog?.dismiss()
  12. // false
  13. // }
  14. //
  15. override fun onCreate(savedInstanceState: Bundle?) {
  16. super.onCreate(savedInstanceState)
  17. setContentView(R.layout.activity_main)
  18. textView = findViewById<TextView>(R.id.textView)
  19. }
  20. fun startRequest(view:View){
  21. mProgressDialog = ProgressDialog(this)
  22. mProgressDialog ?.setTitle("请求服务器中")
  23. mProgressDialog?.show()
  24. // //开启一步线程
  25. // object:Thread(){
  26. // override fun run() {
  27. // super.run()
  28. // Thread.sleep(2000)
  29. // //---------
  30. // val loginResult =
  31. // APIClient.instance.instanceRetrofit(WanAndroidAPI::class.java)
  32. // .loginAction("Derry-vip","123456")
  33. // //--------
  34. //
  35. // val result:LoginRegisterResponseWrapper<LoginRegisterResponse>?=loginResult.execute().body()
  36. //
  37. // //发送Handler 切换Android 主线程
  38. // val msg =mHandler.obtainMessage()
  39. // msg.obj =result
  40. // mHandler.sendMessage(msg)
  41. // }
  42. // }.start()
  43. // -----------------------协程改写-----------
  44. GlobalScope.launch(Dispatchers.Main) {//在Android 上,默认是异步处理
  45. // 主动会被挂起
  46. // 内部会切换成异步线程请求服务器
  47. val loginResult =
  48. APIClient.instance.instanceRetrofit(WanAndroidAPI::class.java)
  49. .loginActionCoroutine("Derry-vip","123456")
  50. // 执行完成后,主动切换回安卓主线程
  51. //在主线程更新UI
  52. Log.d(TAG,"errorMsg:${loginResult.data}")
  53. textView?.setText(loginResult.data.toString())
  54. mProgressDialog?.dismiss()
  55. }
  56. //---------------------------------
  57. }
  58. }

协程解决多层回调带来的问题

  1. // MainActivity3.kt
  2. import android.app.ProgressDialog
  3. import android.graphics.Color
  4. import android.os.Bundle
  5. import android.os.Handler
  6. import android.os.Looper
  7. import android.os.Message
  8. import android.view.View
  9. import android.widget.TextView
  10. import androidx.appcompat.app.AppCompatActivity
  11. // 接口
  12. interface ResponseCallback{
  13. fun responseSuccess(serverResponseInfo:String)// 登陆成功后的信息类
  14. fun responseError(serverResponseErrorMsg:String) // 登陆失败后的描述
  15. }
  16. //请求加载用户的数据
  17. private fun requestLoadUser(responseCallback:ResponseCallback){
  18. val isLoadSuccess = true;
  19. //开启异步线程
  20. object:Thread(){
  21. override fun run(){
  22. super.run()
  23. try{
  24. sleep(3000L)
  25. if (isLoadSuccess){
  26. responseCallback.responseSuccess("加载到【用户数据】信息集")
  27. }else {
  28. responseCallback.responseError("加载【用户数据】,加载失败,服务器宕机")
  29. }
  30. } catch(e:InterruptedException){
  31. e.printStackTrace()
  32. }
  33. }
  34. }.start()
  35. }
  36. //
  37. //请求加载[用户资产数据]
  38. private fun requestLoadUserAssets(responseCallback:ResponseCallback){
  39. val isLoadSuccess = true;
  40. //开启异步线程
  41. object:Thread(){
  42. override fun run(){
  43. super.run()
  44. try{
  45. sleep(3000L)
  46. if (isLoadSuccess){
  47. responseCallback.responseSuccess("加载到【用户资产数据】信息集")
  48. }else {
  49. responseCallback.responseError("加载【用户资产数据】,加载失败,服务器宕机")
  50. }
  51. }catch(e:InterruptedException){
  52. e.printStackTrace()
  53. }
  54. }
  55. }.start()
  56. }
  57. //
  58. //请求加载[用户资产详情数据】
  59. private fun requestLoadUserAssetsDetails(responseCallback:ResponseCallback){
  60. val isLoadSuccess = true;
  61. //开启异步线程
  62. object:Thread(){
  63. override fun run(){
  64. super.run()
  65. try{
  66. sleep(3000L)
  67. if (isLoadSuccess){
  68. responseCallback.responseSuccess("加载到【用户资产详情数据】信息集")
  69. }else {
  70. responseCallback.responseError("加载【用户资产详情数据】,加载失败,服务器宕机")
  71. }
  72. }catch(e:InterruptedException){
  73. e.printStackTrace()
  74. }
  75. }
  76. }.start()
  77. }
  78. class MainActivity3 :AppCompatActivity(){
  79. private val TAG = "ckb"
  80. var mProgressDialog: ProgressDialog?=null
  81. var textView: TextView?=null
  82. override fun onCreate(savedInstanceState: Bundle?) {
  83. super.onCreate(savedInstanceState)
  84. setContentView(R.layout.activity_main)
  85. textView = findViewById<TextView>(R.id.textView)
  86. }
  87. fun startRequest(view: View) {
  88. mProgressDialog = ProgressDialog(this)
  89. mProgressDialog?.setTitle("请求服务器中")
  90. mProgressDialog?.show()
  91. // 先进行第一步异步请求
  92. requestLoadUser(object:ResponseCallback{
  93. override fun responseSuccess(serverResponseInfo: String) {
  94. //从异步线程切回主线程
  95. val handler:Handler = object :Handler(Looper.getMainLooper()){
  96. override fun handleMessage(msg: Message) {
  97. super.handleMessage(msg)
  98. //第一次更新UI
  99. textView?.text = serverResponseInfo
  100. textView?.setTextColor(Color.RED)
  101. //---第二次请求--
  102. requestLoadUserAssets(object : ResponseCallback {
  103. override fun responseSuccess(serverResponseInfo: String) {
  104. val handler:Handler = object :Handler(Looper.getMainLooper()) {
  105. override fun handleMessage(msg: Message) {
  106. super.handleMessage(msg)
  107. //第二次更新UI
  108. textView?.text = serverResponseInfo
  109. textView?.setTextColor(Color.RED)
  110. //----第三次请求------
  111. requestLoadUserAssetsDetails(object:ResponseCallback{
  112. override fun responseSuccess(serverResponseInfo: String) {
  113. val handler:Handler = object :Handler(Looper.getMainLooper()) {
  114. override fun handleMessage(msg: Message) {
  115. super.handleMessage(msg)
  116. //第三次更新UI
  117. textView?.text = serverResponseInfo
  118. mProgressDialog?.dismiss()
  119. textView?.setTextColor(Color.BLACK)
  120. }
  121. }
  122. handler.sendEmptyMessage(0)
  123. }
  124. override fun responseError(serverResponseErrorMsg: String) {
  125. TODO("Not yet implemented")
  126. }
  127. })
  128. }
  129. }
  130. handler.sendEmptyMessage(0)
  131. }
  132. override fun responseError(serverResponseErrorMsg: String) {
  133. TODO("Not yet implemented")
  134. }
  135. })
  136. //----
  137. }
  138. }
  139. handler.sendEmptyMessage(0)
  140. }
  141. override fun responseError(serverResponseErrorMsg: String) {
  142. TODO("Not yet implemented")
  143. }
  144. })
  145. }
  146. }

协程更改后

  1. import android.app.ProgressDialog
  2. import android.graphics.Color
  3. import android.os.Bundle
  4. import android.view.View
  5. import android.widget.TextView
  6. import androidx.appcompat.app.AppCompatActivity
  7. import kotlinx.coroutines.*
  8. //
  9. //请求加载用户的数据
  10. private suspend fun requestLoadUser():String{
  11. val isLoadSuccess = true;
  12. // 用withContext 手动切换到异步线程
  13. withContext(Dispatchers.IO){
  14. delay(3000L) // 模拟耗时
  15. }
  16. if(isLoadSuccess){
  17. return "加载到【用户数据】信息集"
  18. }else{
  19. return "加载【用户数据】加载失败,服务器宕机"
  20. }
  21. }
  22. //请求加载【用户资产】的数据
  23. private suspend fun requestLoadUserAssets():String{
  24. val isLoadSuccess = true;
  25. // 用withContext 手动切换到异步线程
  26. withContext(Dispatchers.IO){
  27. delay(3000L) // 模拟耗时
  28. }
  29. if(isLoadSuccess){
  30. return "加载到【用户资产】信息集"
  31. }else{
  32. return "加载【用户数据资产】加载失败,服务器宕机"
  33. }
  34. }
  35. //请求加载[用户资产详情数据】
  36. private suspend fun requestLoadUserAssetsDetails():String{
  37. val isLoadSuccess = true;
  38. // 用withContext 手动切换到异步线程
  39. withContext(Dispatchers.IO){
  40. delay(3000L) // 模拟耗时
  41. }
  42. if(isLoadSuccess){
  43. return "加载到【用户资产详情数据】信息集"
  44. }else{
  45. return "加载【用户资产详情数据】加载失败,服务器宕机"
  46. }
  47. }
  48. class MainActivity4 :AppCompatActivity() {
  49. private val TAG = "ckb"
  50. var mProgressDialog: ProgressDialog?=null
  51. var textView: TextView?=null
  52. override fun onCreate(savedInstanceState: Bundle?) {
  53. super.onCreate(savedInstanceState)
  54. setContentView(R.layout.activity_main)
  55. textView = findViewById<TextView>(R.id.textView)
  56. }
  57. fun startRequest(view: View){
  58. mProgressDialog= ProgressDialog(this)
  59. mProgressDialog?.setTitle("请求服务器中。。。")
  60. mProgressDialog?.show()
  61. //这里缺少了错误处理的逻辑
  62. GlobalScope.launch ( Dispatchers.Main ){
  63. // 异步请求1
  64. var serverResponseInfo = requestLoadUser() // 这一步自动完成了主线程和异步线程的切换
  65. //拿到数据后立刻更新UI
  66. textView?.text = serverResponseInfo
  67. textView?.setTextColor(Color.RED)
  68. //执行异步操作2
  69. serverResponseInfo = requestLoadUserAssets()
  70. //拿到数据后立刻更新UI
  71. textView?.text = serverResponseInfo
  72. textView?.setTextColor(Color.BLUE)
  73. //执行异步操作3
  74. serverResponseInfo = requestLoadUserAssets()
  75. //拿到数据后立刻更新UI
  76. textView?.text = serverResponseInfo
  77. mProgressDialog?.dismiss()
  78. textView?.setTextColor(Color.RED)
  79. }
  80. }
  81. }

Kotlin 协程:使用async和await实现高效并发

  1. //计算一
  2. private suspend fun intValue1(): Int {
  3. delay(1000)
  4. return 1
  5. }
  6. // 计算二
  7. private suspend fun intValue2(): Int {
  8. delay(2000)
  9. return 2
  10. }
  11. fun main() = runBlocking {
  12. val elapsedTime = measureTimeMillis {
  13. val value1 = intValue1()
  14. val value2 = intValue2()
  15. println("the result is ${value1 + value2}")
  16. }
  17. println("the elapsedTime is $elapsedTime")
  18. }
  19. // 综合
  20. public inline fun measureTimeMillis(block: () -> Unit): Long {
  21. val start = System.currentTimeMillis()
  22. block()
  23. return System.currentTimeMillis() - start
  24. }

intValue1()和intValue2()是完全独立的,但是从运行结果来看,耗费的时间是intValue1()的时间 + intValue2()的时间。

使用async和await改进:

  1. fun main() = runBlocking {
  2. val elapsedTime = measureTimeMillis {
  3. val value1 = async { intValue1() }
  4. val value2 = async { intValue2() }
  5. println("the result is ${value1.await() + value2.await()}")
  6. }
  7. println("the elapsedTime is $elapsedTime")
  8. }
  9. private suspend fun intValue1(): Int {
  10. delay(1000)
  11. return 1
  12. }
  13. private suspend fun intValue2(): Int {
  14. delay(2000)
  15. return 2
  16. }
  1. public fun <T> CoroutineScope.async(
  2. context: CoroutineContext = EmptyCoroutineContext,
  3. start: CoroutineStart = CoroutineStart.DEFAULT,
  4. block: suspend CoroutineScope.() -> T
  5. ): Deferred<T>
  • 它和CoroutineScope.launch{}类似,可以用来创建一个协程,不同的是launch的返回结果是Job类型,而aysnc的返回结果是Deferred类型。从类的层次结构上看,Deferred是Job的子接口;从功能上来看,Deferred就是带返回结果的Job。
  • CoroutineStart.LAZY的作用,可以使得async启动的两个协程并没有立即执行。而是直到调用await方法之后,才开始执行,而await又是会去等待结果,自然需要等待intValue1协程执行完毕后,遇到intValue2.await(),才会触发intValue2协程的执行,又要去等待。那么,async的并发也就失效了。

https://blog.csdn.net/xlh1191860939/article/details/104981066

Coroutines Flow

作用:异步返回多个计算好的值呢

Flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行

  1. fun simpleFlow() = flow {
  2. println("Flow started")
  3. for(i in 1..3){
  4. delay(1000)
  5. emit(i)
  6. }
  7. }
  8. fun testFlowIsCode() = runBlocking {
  9. val flow = simpleFlow()
  10. println("Flow Collect")
  11. flow.collect { println(it) }
  12. println("Flow Collect again")
  13. flow.collect { println(it) }
  14. }
  1. Flow Collect
  2. Flow started
  3. 1
  4. 2
  5. 3
  6. Flow Collect again
  7. Flow started
  8. 1
  9. 2
  10. 3
  11. Process finished with exit code 0

代码执行val flow = simpleFlow()的时候没有执行flow{…}构建块中的代码,只有调用collect的时候才执行,这就是冷流

使用 flowOn 切换线程

使用withTimeoutOrNull() 流的取消挂起

Flow 的 Terminal 运算符可以是 suspend 函数,如 collect、single、reduce、toList 等;也可以是 launchIn 运算符,用于在指定 CoroutineScope 内使用 flow。
Flow 在使用各个 suspend 函数时不会阻塞主线程的运行。

  1. //使用 flow:
  2. fun main() = runBlocking {
  3. launch {
  4. for (j in 1..5) {
  5. delay(100)
  6. println("I'm not blocked $j")
  7. }
  8. }
  9. flow {
  10. for (i in 1..5) {
  11. delay(100)
  12. emit(i)
  13. }
  14. }.collect { println(it) }
  15. println("Done")
  16. }
  17. //-------------结果-----------
  18. 1
  19. I'm not blocked 1
  20. 2
  21. I'm not blocked 2
  22. 3
  23. I'm not blocked 3
  24. 4
  25. I'm not blocked 4
  26. 5
  27. Done
  28. I'm not blocked 5
  1. //使用 sequence:
  2. fun main() = runBlocking {
  3. launch {
  4. for (k in 1..5) {
  5. delay(100)
  6. println("I'm blocked $k")
  7. }
  8. }
  9. sequence {
  10. for (i in 1..5) {
  11. Thread.sleep(100)
  12. yield(i)
  13. }
  14. }.forEach { println(it) }
  15. println("Done")
  16. }