Kotlin原理
// 普通函数
fun requestToken(): Token {
// makes request for a token & waits
return token // returns result when received
}
// 挂起函数
suspend fun requestToken(): Token { ... }
//
fun postItem(item: Item) {
GlobalScope.launch {
val token = requestToken()
val post = createPost(token, item)
processPost(post)
}
}
实际上在 JVM 中:
Object requestToken(Continuation<Token> cont) { ... }
Continuation 是什么?
// 像一个回调接口
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
协程内部实现不是使用普通回调的形式,而是使用状态机来处理不同的挂起点
协程的创建与启动
从新建一个协程开始分析协程的创建,
最常见的协程创建方式为CoroutineScope.launch {},
关键源码如下:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
...
coroutine.start(start, coroutine, block) //
return coroutine
}
coroutine.start(start, coroutine, block)
默认情况下会走到startCoroutineCancellable,
最终会调用到createCoroutineUnintercepted。
internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
public actual fun
completion: Continuation
- suspend 函数本身执行需要一个Continuation 实例在恢复时调用,即参数completion
- 返回值Continuation
是创建出来的协程的载体,receiver suspend 函数会被传给该实例作为协程的实际执行体
在start后,执行到的是 CoroutineStart #invoke()
# CoroutineStart.kt
@InternalCoroutinesApi
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(completion)
ATOMIC -> block.startCoroutine(completion)
UNDISPATCHED -> block.startCoroutineUndispatched(completion)
LAZY -> Unit // will start lazily
}
这里以DEFAULT为例,startCoroutineCancellable()
internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result) // 传递
}
而在startCoroutineCancellable中,创建了Continuation,且调用resumeWith来传递请求结果。
Continuation是一个接口,接口内有一个resumeWith方法。
在startCoroutineCancellable调用了resumeWith,既然是接口且被调用,那必然是有地方实现了该接口,并且在resumeWith中做了一些事情。
Continuation的具体实现是在ContinuationImpl类中
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion)
而ContinuationImpl继承自BaseContinuationImpl,
在BaseContinuationImpl中就可以看到resumeWith的具体实现。
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param) // 判断
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
completion.resumeWith(outcome)
return
}
}
}
}
可以看到,调用invokeSuspend并且对返回的标志判断
// 举个例子,从编译后的代码中看
suspend fun requestToken(): Token { ... } // 挂起函数
suspend fun createPost(token: Token, item: Item): Post { ... } // 挂起函数
fun processPost(post: Post) { ... }
fun postItem(item: Item) {
GlobalScope.launch {
val token = requestToken()
val post = createPost(token, item)
processPost(post)
}
}
// 编译后生成的内部类大致如下
final class postItem$1 extends SuspendLambda ... {
public final Object invokeSuspend(Object result) { // invokeSuspend
...
switch (this.label) {
case 0:
this.label = 1;
token = requestToken(this)
break;
case 1:
this.label = 2;
Token token = result;
post = createPost(token, this.item, this)
break;
case 2:
Post post = result;
processPost(post)
break;
}
}
}
挂起函数会被编译为一个匿名类,这个匿名类中的一个函数实现了这个状态机。
成员变量label代表了当前状态机的状态。
每一个续体(即挂起点中间部分以及挂起点与函数头尾之间的部分)都各自对应了一个状态。
当函数运行到每个挂起点时,lable的值都受限会发生改变,并且当前的续体都会作为实体参数传递给发生了CPS变换的挂起函数。
如果这个挂起函数没有发生事实上的挂起,函数继续运行,如果发生了事实上的挂起,则函数直接return。
由于label记录了状态,所以在协程恢复的时候,可以根据状态使用goto语句直接跳转至上次的挂起点并向后执行,
通过状态机将多个续体融合到一个SuspendLambda对象中,通过维护一个Int类型的状态值,标记续体执行到哪个步骤了,这个状态值和对应的switch语句就是状态机。使用状态机使得无论挂起lambda表达式体内有多少挂起点,编译器也只创建一个SuspendLambda子类和对象。
package kotlin.coroutines
/**续体接口:代表协程下一步应该执行的代码*/
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
package kotlin.coroutines.jvm.internal
/**②. 续体的基本实现类,实现了resumeWith()*/
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>,... {
//实现续体的resumeWith()函数,并且是final的子类不能覆盖
public final override fun resumeWith(result: Result<Any?>) { ... }
//★定义了一个invokeSuspend()抽象函数,这个函数的函数体实现就是协程代码块中的代码,由kotlin编译器自动生成实现
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
/**③. 续体实现类,继承自BaseContinuationImpl,增加了拦截器intercepted()功能,实现线程调度等*/
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
}
/**③. 协程构建器launch{}传入的挂起Lambda表达式的封装抽象类,同时它又是一个续体*/
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {}
切换线程
在前面的这段分析中:
internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) { // 判断
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result) // 传递
}
首先会判断当前续体类型是否是DispatchedContinuation(有没有被调度器拦截包装),如果没有被拦截直接调用resumeWith()恢复协程代码块的执行,否则调用拦截包装后的DispatchedContinuation类型的resumeCancellableWith():
//包装后的续体类型,维护了调度器dispatcher和原续体对象continuation
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
//DispatchedContinuation类的resumeCancellableWith()
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
//通过调度器判断是否需要切换线程
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
//A 如果需要,则切换线程,将当前续体this作为Runnable传入
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
//★★★B 该函数最终会调用原continuation.resumeWith(result)直接在当前线程恢复协程执行
resumeUndispatchedWith(result)
}
}
}
}
//B 没有调度器拦截的情况直接在当前线程执行
inline fun resumeUndispatchedWith(result: Result<T>) {
withCoroutineContext(context, countOrElement) {
continuation.resumeWith(result)
}
}
}
首先通过调度器判断是否需要切换线程(当前线程是否是调度器使用的线程,如果不是就需要切换),如果需要则将当前续体对象this当作一个Runnable。
(被包装后的DispatchedContinuation续体实现了Runnable)扔给调度器。
//Dispatchers.Default默认调度器的实现类
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
}
public open class ExperimentalCoroutineDispatcher(...){
override val executor: Executor
get() = coroutineScheduler
private var coroutineScheduler = createScheduler()
//A.实现线程切换
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
//就是将当前续体当作Runnable扔到线程池,coroutineScheduler的类型是Executor
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
DefaultExecutor.dispatch(context, block)
}
}
//被包装后的DispatchedContinuation续体通过继承DispatchedTask间接实现了Runnable
internal class DispatchedContinuation<in T>(...) : DispatchedTask...{
override val delegate: Continuation<T>
get() = this
}
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
internal abstract val delegate: Continuation<T> //DispatchedContinuation对象
...
//Runnable实现
public final override fun run() {
...
val delegate = delegate as DispatchedContinuation<T>
//从代理续体中获取被代理的续体对象,也就是第二次创建的SuspendLambda子类对象
val continuation = delegate.continuation
val context = continuation.context
...
withCoroutineContext(context, delegate.countOrElement) {
...
//★★★最终调用原始续体对象的resumeWith(),从而第一次触发invokeSuspend()启动协程
continuation.resumeWithStackTrace(cause)
...
}
...
}
}
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
resumeWith(Result.failure(recoverStackTrace(exception, this)))
}
默认的调度器DefaultScheduler中维护了一个线程池,也就是将this扔到线程池中执行从而完成默认调度器的线程切换,DispatchedContinuation的run()方法则从代理续体对象中获取到了原始SuspendLambda子类续体对象,然后调用原始续体的resumeWith()第一次触发invokeSuspend
使用:
密封类
协程密封类解决条件处理
改进前:
interface Result {
class Success(val msg:String):Result
class Failure(val error:Exception):Result
fun getResultMsg(result:Result)=when(result){
is Success -> result.msg
is Failure -> result.error.message
else -> throw IllegalAccessException()
}
}
存在问题:
(1)else 分支是多余的
(2)else导致潜在分险:当一个类实现Result这个接口后,忘记在getResultMsg()方法中添加相应条件分支,编译器不会提醒而是当成else 处理
改进后:
sealed class Result
class Success(val msg: String) : Result()
class Failure(val error: Exception) : Result()
fun getResultMsg(result: Result) = when (result) {
is Success -> result.msg
is Failure -> "Error is ${result.error.message}"
}
好处: 当when语句中传入一个密封类变量作为条件时,kotlin 编译器会自动检查该密封类有那几个子类,并强制要求将每一个子类所对应的条件全部处理
未改造前
import retrofit2.http.Field
import retrofit2.http.FormUrlEncoded
import retrofit2.http.POST
interface WanAndroidAPI {
@POST("/user/login")
@FormUrlEncoded
fun loginAction(@Field("username")username:String,
@Field("password")password:String)
:retrofit2.Call<LoginRegisterResponseWrapper<LoginRegisterResponse>>
@POST("/user/register")
@FormUrlEncoded
fun registerAction(@Field("usename")username: String,
@Field("password")password: String,
@Field("repassword")repassword:String)
:retrofit2.Call<LoginRegisterResponseWrapper<LoginRegisterResponse>>
//------------------
}
//LoginRegisterResponse.kt
data class LoginRegisterResponse(val admin:Boolean,
val chapterTops:List<*>,
val collectIds:List<*>,
val email:String?,
val icon:String?,
val id:String?,
val nickname:String?,
val password:String?,
val publicName:String?,
val token:String?,
val type:Int,
val username:String?
)
//APIClient.kt
import okhttp3.OkHttpClient
import retrofit2.Retrofit
import java.util.concurrent.TimeUnit
class APIClient {
private object Holder{
val INSTANCE = APIClient()
}
companion object{
val instance=Holder.INSTANCE
}
fun<T> instanceRetrofit(apiInterface:Class<T>):T{
val mOkHttpClient = OkHttpClient().newBuilder().myApply {
readTimeout(10000,TimeUnit.SECONDS)
connectTimeout(1000,TimeUnit.SECONDS)
writeTimeout(1000,TimeUnit.SECONDS)
}.build()
val retrofit:Retrofit = Retrofit.Builder()
.baseUrl("https://www.wanandroid.com")
.client(mOkHttpClient)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build()
return retrofit.create(apiInterface)
}
}
fun <T> T.myApply(mm:T.()->Unit):T{
mm()
return this
}
//LoginRegisterResponseWrapper.kt
data class LoginRegisterResponseWrapper<T> (val data:T,val errorCode:Int,val errorMsg:String)
//LoginRegisterResponse.kt
data class LoginRegisterResponse(val admin:Boolean,
val chapterTops:List<*>,
val collectIds:List<*>,
val email:String?,
val icon:String?,
val id:String?,
val nickname:String?,
val password:String?,
val publicName:String?,
val token:String?,
val type:Int,
val username:String?
)
//MainActivity.kt
import android.app.ProgressDialog
import androidx.appcompat.app.AppCompatActivity
import android.os.Bundle
import android.os.Handler
import android.os.Looper
import android.util.Log
import android.view.View
import android.widget.TextView
class MainActivity : AppCompatActivity() {
private val TAG = "ckb"
var mProgressDialog:ProgressDialog?=null
var textView:TextView?=null
//第二大步:主自线程,更新UI
val mHandler = Handler(Looper.getMainLooper()){
val result = it.obj as LoginRegisterResponseWrapper<LoginRegisterResponse>
Log.d(TAG,"errorMsg:${result.data}")
textView?.setText(result.data.toString())
mProgressDialog?.dismiss()
false
}
//
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
textView = findViewById<TextView>(R.id.textView)
}
fun startRequest(view:View){
mProgressDialog = ProgressDialog(this)
mProgressDialog ?.setTitle("请求服务器中")
mProgressDialog?.show()
//开启一步线程
object:Thread(){
override fun run() {
super.run()
Thread.sleep(2000)
val loginResult =
APIClient.instance.instanceRetrofit(WanAndroidAPI::class.java)
.loginAction("Derry-vip","123456")
val result:LoginRegisterResponseWrapper<LoginRegisterResponse>?=loginResult.execute().body()
//发送Handler 切换Android 主线程
val msg =mHandler.obtainMessage()
msg.obj =result
mHandler.sendMessage(msg)
}
}.start()
}
}
<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools"
android:layout_width="match_parent"
android:layout_height="match_parent"
tools:context=".MainActivity">
<TextView
android:id="@+id/textView"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:text="Hello World!"
app:layout_constraintBottom_toBottomOf="parent"
app:layout_constraintLeft_toLeftOf="parent"
app:layout_constraintRight_toRightOf="parent"
app:layout_constraintTop_toTopOf="parent" />
<Button
android:layout_marginTop="400dp"
android:text="网络请求"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:onClick="startRequest"
/>
</LinearLayout>
改用协程解决Hander
// WanAndroidAPI.kt
import retrofit2.http.Field
import retrofit2.http.FormUrlEncoded
import retrofit2.http.POST
interface WanAndroidAPI {
@POST("/user/login")
@FormUrlEncoded
fun loginAction(@Field("username")username:String,
@Field("password")password:String)
:retrofit2.Call<LoginRegisterResponseWrapper<LoginRegisterResponse>>
@POST("/user/register")
@FormUrlEncoded
fun registerAction(@Field("usename")username: String,
@Field("password")password: String,
@Field("repassword")repassword:String)
:retrofit2.Call<LoginRegisterResponseWrapper<LoginRegisterResponse>>
//-------- ----------
// 协程写法-----加上suspend, 同时不再需要Call<>
@POST("/user/login")
@FormUrlEncoded
suspend fun loginActionCoroutine(@Field("username")username:String,
@Field("password")password:String)
:LoginRegisterResponseWrapper<LoginRegisterResponse>
@POST("/user/register")
@FormUrlEncoded
fun registerActionCoroutine(@Field("usename")username: String,
@Field("password")password: String,
@Field("repassword")repassword:String)
:LoginRegisterResponseWrapper<LoginRegisterResponse>
}
//
class MainActivity : AppCompatActivity() {
private val TAG = "ckb"
var mProgressDialog:ProgressDialog?=null
var textView:TextView?=null
// //第二大步:主自线程,更新UI
// val mHandler = Handler(Looper.getMainLooper()){
// val result = it.obj as LoginRegisterResponseWrapper<LoginRegisterResponse>
// Log.d(TAG,"errorMsg:${result.data}")
// textView?.setText(result.data.toString())
// mProgressDialog?.dismiss()
// false
// }
//
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
textView = findViewById<TextView>(R.id.textView)
}
fun startRequest(view:View){
mProgressDialog = ProgressDialog(this)
mProgressDialog ?.setTitle("请求服务器中")
mProgressDialog?.show()
// //开启一步线程
// object:Thread(){
// override fun run() {
// super.run()
// Thread.sleep(2000)
// //---------
// val loginResult =
// APIClient.instance.instanceRetrofit(WanAndroidAPI::class.java)
// .loginAction("Derry-vip","123456")
// //--------
//
// val result:LoginRegisterResponseWrapper<LoginRegisterResponse>?=loginResult.execute().body()
//
// //发送Handler 切换Android 主线程
// val msg =mHandler.obtainMessage()
// msg.obj =result
// mHandler.sendMessage(msg)
// }
// }.start()
// -----------------------协程改写-----------
GlobalScope.launch(Dispatchers.Main) {//在Android 上,默认是异步处理
// 主动会被挂起
// 内部会切换成异步线程请求服务器
val loginResult =
APIClient.instance.instanceRetrofit(WanAndroidAPI::class.java)
.loginActionCoroutine("Derry-vip","123456")
// 执行完成后,主动切换回安卓主线程
//在主线程更新UI
Log.d(TAG,"errorMsg:${loginResult.data}")
textView?.setText(loginResult.data.toString())
mProgressDialog?.dismiss()
}
//---------------------------------
}
}
协程解决多层回调带来的问题
// MainActivity3.kt
import android.app.ProgressDialog
import android.graphics.Color
import android.os.Bundle
import android.os.Handler
import android.os.Looper
import android.os.Message
import android.view.View
import android.widget.TextView
import androidx.appcompat.app.AppCompatActivity
// 接口
interface ResponseCallback{
fun responseSuccess(serverResponseInfo:String)// 登陆成功后的信息类
fun responseError(serverResponseErrorMsg:String) // 登陆失败后的描述
}
//请求加载用户的数据
private fun requestLoadUser(responseCallback:ResponseCallback){
val isLoadSuccess = true;
//开启异步线程
object:Thread(){
override fun run(){
super.run()
try{
sleep(3000L)
if (isLoadSuccess){
responseCallback.responseSuccess("加载到【用户数据】信息集")
}else {
responseCallback.responseError("加载【用户数据】,加载失败,服务器宕机")
}
} catch(e:InterruptedException){
e.printStackTrace()
}
}
}.start()
}
//
//请求加载[用户资产数据]
private fun requestLoadUserAssets(responseCallback:ResponseCallback){
val isLoadSuccess = true;
//开启异步线程
object:Thread(){
override fun run(){
super.run()
try{
sleep(3000L)
if (isLoadSuccess){
responseCallback.responseSuccess("加载到【用户资产数据】信息集")
}else {
responseCallback.responseError("加载【用户资产数据】,加载失败,服务器宕机")
}
}catch(e:InterruptedException){
e.printStackTrace()
}
}
}.start()
}
//
//请求加载[用户资产详情数据】
private fun requestLoadUserAssetsDetails(responseCallback:ResponseCallback){
val isLoadSuccess = true;
//开启异步线程
object:Thread(){
override fun run(){
super.run()
try{
sleep(3000L)
if (isLoadSuccess){
responseCallback.responseSuccess("加载到【用户资产详情数据】信息集")
}else {
responseCallback.responseError("加载【用户资产详情数据】,加载失败,服务器宕机")
}
}catch(e:InterruptedException){
e.printStackTrace()
}
}
}.start()
}
class MainActivity3 :AppCompatActivity(){
private val TAG = "ckb"
var mProgressDialog: ProgressDialog?=null
var textView: TextView?=null
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
textView = findViewById<TextView>(R.id.textView)
}
fun startRequest(view: View) {
mProgressDialog = ProgressDialog(this)
mProgressDialog?.setTitle("请求服务器中")
mProgressDialog?.show()
// 先进行第一步异步请求
requestLoadUser(object:ResponseCallback{
override fun responseSuccess(serverResponseInfo: String) {
//从异步线程切回主线程
val handler:Handler = object :Handler(Looper.getMainLooper()){
override fun handleMessage(msg: Message) {
super.handleMessage(msg)
//第一次更新UI
textView?.text = serverResponseInfo
textView?.setTextColor(Color.RED)
//---第二次请求--
requestLoadUserAssets(object : ResponseCallback {
override fun responseSuccess(serverResponseInfo: String) {
val handler:Handler = object :Handler(Looper.getMainLooper()) {
override fun handleMessage(msg: Message) {
super.handleMessage(msg)
//第二次更新UI
textView?.text = serverResponseInfo
textView?.setTextColor(Color.RED)
//----第三次请求------
requestLoadUserAssetsDetails(object:ResponseCallback{
override fun responseSuccess(serverResponseInfo: String) {
val handler:Handler = object :Handler(Looper.getMainLooper()) {
override fun handleMessage(msg: Message) {
super.handleMessage(msg)
//第三次更新UI
textView?.text = serverResponseInfo
mProgressDialog?.dismiss()
textView?.setTextColor(Color.BLACK)
}
}
handler.sendEmptyMessage(0)
}
override fun responseError(serverResponseErrorMsg: String) {
TODO("Not yet implemented")
}
})
}
}
handler.sendEmptyMessage(0)
}
override fun responseError(serverResponseErrorMsg: String) {
TODO("Not yet implemented")
}
})
//----
}
}
handler.sendEmptyMessage(0)
}
override fun responseError(serverResponseErrorMsg: String) {
TODO("Not yet implemented")
}
})
}
}
协程更改后
import android.app.ProgressDialog
import android.graphics.Color
import android.os.Bundle
import android.view.View
import android.widget.TextView
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
//
//请求加载用户的数据
private suspend fun requestLoadUser():String{
val isLoadSuccess = true;
// 用withContext 手动切换到异步线程
withContext(Dispatchers.IO){
delay(3000L) // 模拟耗时
}
if(isLoadSuccess){
return "加载到【用户数据】信息集"
}else{
return "加载【用户数据】加载失败,服务器宕机"
}
}
//请求加载【用户资产】的数据
private suspend fun requestLoadUserAssets():String{
val isLoadSuccess = true;
// 用withContext 手动切换到异步线程
withContext(Dispatchers.IO){
delay(3000L) // 模拟耗时
}
if(isLoadSuccess){
return "加载到【用户资产】信息集"
}else{
return "加载【用户数据资产】加载失败,服务器宕机"
}
}
//请求加载[用户资产详情数据】
private suspend fun requestLoadUserAssetsDetails():String{
val isLoadSuccess = true;
// 用withContext 手动切换到异步线程
withContext(Dispatchers.IO){
delay(3000L) // 模拟耗时
}
if(isLoadSuccess){
return "加载到【用户资产详情数据】信息集"
}else{
return "加载【用户资产详情数据】加载失败,服务器宕机"
}
}
class MainActivity4 :AppCompatActivity() {
private val TAG = "ckb"
var mProgressDialog: ProgressDialog?=null
var textView: TextView?=null
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
textView = findViewById<TextView>(R.id.textView)
}
fun startRequest(view: View){
mProgressDialog= ProgressDialog(this)
mProgressDialog?.setTitle("请求服务器中。。。")
mProgressDialog?.show()
//这里缺少了错误处理的逻辑
GlobalScope.launch ( Dispatchers.Main ){
// 异步请求1
var serverResponseInfo = requestLoadUser() // 这一步自动完成了主线程和异步线程的切换
//拿到数据后立刻更新UI
textView?.text = serverResponseInfo
textView?.setTextColor(Color.RED)
//执行异步操作2
serverResponseInfo = requestLoadUserAssets()
//拿到数据后立刻更新UI
textView?.text = serverResponseInfo
textView?.setTextColor(Color.BLUE)
//执行异步操作3
serverResponseInfo = requestLoadUserAssets()
//拿到数据后立刻更新UI
textView?.text = serverResponseInfo
mProgressDialog?.dismiss()
textView?.setTextColor(Color.RED)
}
}
}
Kotlin 协程:使用async和await实现高效并发
//计算一
private suspend fun intValue1(): Int {
delay(1000)
return 1
}
// 计算二
private suspend fun intValue2(): Int {
delay(2000)
return 2
}
fun main() = runBlocking {
val elapsedTime = measureTimeMillis {
val value1 = intValue1()
val value2 = intValue2()
println("the result is ${value1 + value2}")
}
println("the elapsedTime is $elapsedTime")
}
// 综合
public inline fun measureTimeMillis(block: () -> Unit): Long {
val start = System.currentTimeMillis()
block()
return System.currentTimeMillis() - start
}
intValue1()和intValue2()是完全独立的,但是从运行结果来看,耗费的时间是intValue1()的时间 + intValue2()的时间。
使用async和await改进:
fun main() = runBlocking {
val elapsedTime = measureTimeMillis {
val value1 = async { intValue1() }
val value2 = async { intValue2() }
println("the result is ${value1.await() + value2.await()}")
}
println("the elapsedTime is $elapsedTime")
}
private suspend fun intValue1(): Int {
delay(1000)
return 1
}
private suspend fun intValue2(): Int {
delay(2000)
return 2
}
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T>
- 它和CoroutineScope.launch{}类似,可以用来创建一个协程,不同的是launch的返回结果是Job类型,而aysnc的返回结果是Deferred类型。从类的层次结构上看,Deferred是Job的子接口;从功能上来看,Deferred就是带返回结果的Job。
- CoroutineStart.LAZY的作用,可以使得async启动的两个协程并没有立即执行。而是直到调用await方法之后,才开始执行,而await又是会去等待结果,自然需要等待intValue1协程执行完毕后,遇到intValue2.await(),才会触发intValue2协程的执行,又要去等待。那么,async的并发也就失效了。
https://blog.csdn.net/xlh1191860939/article/details/104981066
Coroutines Flow
作用:异步返回多个计算好的值呢
Flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行
fun simpleFlow() = flow {
println("Flow started")
for(i in 1..3){
delay(1000)
emit(i)
}
}
fun testFlowIsCode() = runBlocking {
val flow = simpleFlow()
println("Flow Collect")
flow.collect { println(it) }
println("Flow Collect again")
flow.collect { println(it) }
}
Flow Collect
Flow started
1
2
3
Flow Collect again
Flow started
1
2
3
Process finished with exit code 0
代码执行val flow = simpleFlow()的时候没有执行flow{…}构建块中的代码,只有调用collect的时候才执行,这就是冷流
使用 flowOn 切换线程
使用withTimeoutOrNull() 流的取消挂起
Flow 的 Terminal 运算符可以是 suspend 函数,如 collect、single、reduce、toList 等;也可以是 launchIn 运算符,用于在指定 CoroutineScope 内使用 flow。
Flow 在使用各个 suspend 函数时不会阻塞主线程的运行。
//使用 flow:
fun main() = runBlocking {
launch {
for (j in 1..5) {
delay(100)
println("I'm not blocked $j")
}
}
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect { println(it) }
println("Done")
}
//-------------结果-----------
1
I'm not blocked 1
2
I'm not blocked 2
3
I'm not blocked 3
4
I'm not blocked 4
5
Done
I'm not blocked 5
//使用 sequence:
fun main() = runBlocking {
launch {
for (k in 1..5) {
delay(100)
println("I'm blocked $k")
}
}
sequence {
for (i in 1..5) {
Thread.sleep(100)
yield(i)
}
}.forEach { println(it) }
println("Done")
}