• 协程基础
    • 示例程序:delay() 是一个特殊的挂起函数,它不会阻塞线程,但是会挂起协程

```kotlin
import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch { // 后台启动一个新的协程
        delay(1000L) // 非阻塞等待
        println("World!")
    }
    println("Hello,")
    Thread.sleep(2000L) // 阻塞主线程来保证JVM存活
}

// 需要 import kotlin.concurrent.thread
fun main() {
    thread {
        Thread.sleep(1000L) // 阻塞等待
        println("World!")
    }
    println("Hello")
    Thread.sleep(2000L)
}

fun main() {
    GlobalScope.launch {
        delay(1000L)
        println("world!")
    }
    println("Hello,")
    runBlocking {
        delay(2000L)
    }
}

fun main() = runBlocking {
    GlobalScope.launch {
        delay(1000L)
        println("world!")
    }
    println("Hello,")
    delay(2000L)
}
```

    • 延迟一段时间来等待另一个协程运行并不是很好的选择,可以显式地等待子协程执行完毕

```kotlin
val job = GlobalScope.launch { // 启动一个新协程并保持对这个作业的引用
    delay(1000L)
    println("World!")
}
println("Hello,")
job.join() // 等待直到子协程执行结束
```
  • 结构化并发,我们可以将main()函数转换为协程

    /**
     * Structured-concurrency version of the example: `runBlocking` turns `main` into a
     * coroutine and `launch` starts the new coroutine inside that scope.
     *
     * `<Unit>` is spelled out because the block shown here ends in `launch`, which returns
     * a `Job`; without it `main` would not have the `Unit` return type of an entry point.
     */
    fun main() = runBlocking<Unit> {
        launch { // fixed: was misspelled `lauch`, which does not resolve
            delay(1000L)
            // (rest of the launched coroutine is elided in these notes)
        }
        // (rest of main is elided in these notes)
    }
    
  • 作用域构建器

    import kotlinx.coroutines.* // fixed: package name was misspelled `kotlinx.corotinues`

    /**
     * Scope-builder example: one coroutine is launched directly in `runBlocking`, and a
     * `coroutineScope` block launches a nested coroutine of its own.
     *
     * With the delays used below the output order is:
     * "Task from coroutine scope" (100 ms), "Task from runBlocking" (200 ms),
     * "Task from nested launch" (500 ms), "Coroutine scope is over".
     */
    fun main() = runBlocking {
        launch {
            delay(200L)
            println("Task from runBlocking")
        }

        coroutineScope { // does not return until the nested launch below has finished
            launch {
                delay(500L)
                println("Task from nested launch")
            }

            delay(100L)
            println("Task from coroutine scope")
        }

        println("Coroutine scope is over")
    }
    
  • 提取函数重构

    /** Entry point: starts [doWorld] in a launched coroutine, then prints "Hello,". */
    fun main() {
        runBlocking {
            launch { doWorld() }
            println("Hello,")
        }
    }
    /** Suspends for 1000 ms and then prints "World!". */
    suspend fun doWorld() {
        val pauseMillis = 1000L
        delay(pauseMillis)
        println("World!")
    }
    
  • 协程是轻量级的

    import kotlinx.coroutines.*
    /**
     * Launches 100 000 coroutines; each one waits five seconds and prints a single dot.
     * Doing the same with threads would most likely end in an out-of-memory error.
     */
    fun main() = runBlocking {
        val coroutineCount = 100_000
        for (index in 0 until coroutineCount) {
            launch {
                delay(5000L)
                print(".")
            }
        }
    }
    
  • 取消与超时
    • 取消协程的执行

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L)
    println("main: I'm tired of waiting!")
    job.cancel()
    job.join()
    println("main: Now I can quit.")
}
// 可以把 cancel 和 join 合并为 cancelAndJoin()
// 计算代码是不可取消的,例如 while() 循环;如果想要取消,可以显式地检查取消状态,例如 while (isActive),也可以调用 yield 来响应取消
```

   -  超时
```kotlin
/**
 * Timeout example: the loop prints and sleeps in 500 ms steps inside a `withTimeout`
 * block that is given 1300 ms.
 */
fun main() = runBlocking {
    withTimeout(1300L) {
        for (i in 0 until 1000) {
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}
//可以通过使用 withTimeoutOrNull 在超时时返回 null,来代替抛出异常
```
  • async并发
    import kotlin.system.*
    import kotlinx.coroutines.* // fixed: was `kotlin.coroutines.*`, which has no runBlocking/async/delay

    /**
     * Runs [One] and [Two] concurrently with `async` and prints their sum, followed by the
     * time the whole computation took. Both coroutines are started before either result is
     * awaited, so their one-second delays overlap.
     */
    fun main() = runBlocking<Unit> {
        val time = measureTimeMillis {
            val one = async { One() }
            val two = async { Two() }
            println("The answer is ${one.await() + two.await()}")
        }
        println("Completed in $time ms")
    }
    /** Stand-in for a slow computation: suspends for 1000 ms and returns 13. */
    suspend fun One(): Int {
        val result = 13
        delay(1000L)
        return result
    }
    /** Stand-in for a slow computation: suspends for 1000 ms and returns 15. */
    suspend fun Two(): Int {
        val result = 15
        delay(1000L)
        return result
    }
    // Lazily started async
    /**
     * Same computation as the eager version, but both coroutines are created with
     * `CoroutineStart.LAZY` and only begin running once `start()` is called on them.
     */
    fun main() = runBlocking<Unit> {
        val time = measureTimeMillis {
            val one = async(start = CoroutineStart.LAZY) { One() }
            // fixed: the named argument was misspelled `staet`
            val two = async(start = CoroutineStart.LAZY) { Two() }
            // Start both explicitly so they run concurrently before being awaited.
            one.start()
            two.start()
            println("${one.await() + two.await()}")
        }
        println("Completed in $time ms")
    }
    
```kotlin
import kotlinx.coroutines.*
import kotlin.system.*

fun main() = runBlocking {
    val time = measureTimeMillis {
        println("The answer is ${concurrentSum()}")
    }
    println("Completed in $time ms")
}

suspend fun concurrentSum(): Int = coroutineScope {
    val one = async { One() }
    val two = async { Two() }
    one.await() + two.await()
}
// 如果在 concurrentSum 函数内部发生了错误,并且它抛出了一个异常,所有在作用域中启动的协程都会被取消。
```

```kotlin
import kotlinx.coroutines.*
/** Calls [failedConcurrentSum] and reports the `ArithmeticException` that escapes from it. */
fun main() = runBlocking<Unit> {
    try {
        failedConcurrentSum()
    } catch (arithmeticFailure: ArithmeticException) {
        println("Computation failed with ArithmeticException")
    }
}
/**
 * Demonstrates failure handling inside `coroutineScope`: the first child waits
 * practically forever, the second one throws an `ArithmeticException`.
 *
 * The first child's `finally` block prints a message when that child is cancelled.
 */
suspend fun failedConcurrentSum(): Int = coroutineScope {
    val longRunning = async<Int> {
        try {
            delay(Long.MAX_VALUE) // stands in for a very long computation
            42
        } finally {
            println("First child was cancelled")
        }
    }
    val failing = async<Int> {
        println("Second child throws an exception")
        throw ArithmeticException()
    }
    longRunning.await() + failing.await()
}

```

TODO:协程上下文与调度器(待补充)

  • 异步流

    • 序列计算数字,阻塞

      /** Lazily produces 1, 2, 3, blocking the calling thread for 100 ms before each value. */
      fun simple(): Sequence<Int> = sequence {
          (1..3).forEach { number ->
              Thread.sleep(100)
              yield(number)
          }
      }
      /** Prints every value produced by [simple], one per line. */
      fun main() {
          for (value in simple()) {
              println(value)
          }
      }
      
    • 挂起函数

      /** Suspends for 1000 ms and then returns all three values at once as a list. */
      suspend fun simple(): List<Int> {
          delay(1000)
          val values = listOf(1, 2, 3)
          return values
      }
      /** Waits for the complete list from [simple] and prints each element on its own line. */
      fun main() = runBlocking<Unit> {
          // fixed: `printlnl` was a typo for `println` and did not resolve
          simple().forEach { value -> println(value) }
      }
      
    • fun simple(): Flow<Int> = flow {
      // Flow builder: emits 1, 2 and 3, suspending for 100 ms before each value.
      for (i in 1..3) {
        delay(100) // suspends the coroutine instead of blocking the thread
        emit(i) // hands the next value to the collector
      }
      }
      /**
       * Collects the flow returned by `simple()` while a separately launched coroutine
       * prints its own three messages.
       */
      // fixed: was runBlocking<Int>, but the block ends in collect(), which returns Unit
      fun main() = runBlocking<Unit> {
          launch {
              for (k in 1..3) {
                  println("I'm not blocked $k")
              }
          }
          // Collect the flow
          simple().collect { value -> println(value) }
      }
      // Cancellation: collection of the flow stops once the 250 ms timeout given to
      // withTimeoutOrNull has elapsed.
      withTimeoutOrNull(250) {
      simple().collect {value -> println(value)}
      }
      
    • 流构建器

      // flowOf(): builds a flow from a fixed set of values.
      // Library signatures, shown for reference only (a body-less `fun` declaration
      // would not compile here):
      //   fun <T> flowOf(value: T): Flow<T>
      //   fun <T> flowOf(vararg elements: T): Flow<T>
      flowOf(1, 2, 3)
      // asFlow(): converts a collection, sequence or range into a flow.
      (1..3).asFlow().collect { value -> println(value) } // fixed: `Collect` -> `collect`
      
    • 过渡流

      /** Simulates a slow request: suspends for 1000 ms, then returns "response " followed by [request]. */
      suspend fun performRequest(request: Int): String {
          delay(1000)
          val reply = "response $request"
          return reply
      }
      /** Maps the requests 1..3 through [performRequest] and prints each response. */
      fun main() = runBlocking<Unit> {
          val responses = (1..3).asFlow().map { request -> performRequest(request) }
          responses.collect { response -> println(response) }
      }
      
    • 流取消

      /**
       * Channel example: a launched coroutine sends the squares of 1..5, and `main`
       * receives and prints five values followed by "Done".
       */
      fun main() = runBlocking {
          val squares = Channel<Int>()
          launch {
              (1..5).forEach { x -> squares.send(x * x) }
          }
          repeat(5) {
              println(squares.receive())
          }
          println("Done")
      }
      
  • 通道

    // Fragment: uses launch and suspending calls, so it is meant to run inside a
    // coroutine scope such as runBlocking { ... }.
    val channel = Channel<Int>() // fixed: element type was misspelled `Tnt`
    launch {
        for (i in 1..5) channel.send(i * i)
        // channel.close() // left commented out in the original notes
    }
    repeat(5) { println(channel.receive()) }
    println("Done!")
    
  • 管道

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val numbers = produceNumbers()
    val squares = square(numbers)
    repeat(5) {
        println(squares.receive())
    }
    println("Done!")
    coroutineContext.cancelChildren()
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++)
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
```


   -  多个协程也许会接收相同的管道
```kotlin
/**
 * Fan-out example: five processor coroutines receive from the same producer channel.
 *
 * fixed: `main` is now a `runBlocking` coroutine. The original plain `fun main()` called
 * `delay` and the `CoroutineScope` extensions below without any coroutine scope.
 */
fun main() = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancelling the producer closes its channel, which ends the processors' loops
}

/** Sends 1, 2, 3, ... on the produced channel, pausing 100 ms after each send. */
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) {
        send(x++)
        delay(100)
    }
}

/**
 * Launches a coroutine that prints every message received from [channel], tagged with [id].
 *
 * fixed: the function was declared as `lauchProcessor`, while `main` calls `launchProcessor`.
 */
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}
```

  • 多个协程可以发送到同一个通道

    // Fan-in fragment: two launched coroutines call sendString on the same channel.
    // NOTE(review): sendString is not defined in these notes; presumably it sends the
    // given string repeatedly, waiting the given number of milliseconds between sends.
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR", 500L) }
    repeat(6) { println(channel.receive()) } // print the first six messages received
    coroutineContext.cancelChildren() // cancel the two sender coroutines
    
  • 带缓冲的通道

    // Buffered-channel fragment: the channel has capacity 4 and nothing receives from it
    // here, so the sender can only get a few elements in before send() suspends.
    val channel = Channel<Int>(4)
    val sender = launch {
        for (item in 0 until 10) {
            println("Sending $item")
            channel.send(item)
        }
    }
    delay(1000)
    sender.cancel()
    
  • 共享的可变状态与并发

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100
    val k = 1000
    val time = measureTimeMillis {
        coroutineScope {
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0
    for (msg in channel) {
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

fun main() = runBlocking<Unit> {
    val counter = counterActor()
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IncCounter)
        }
    }
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close()
}
```