- 协程基础
- 示例程序,delay()是一个特殊的挂起函数,它不会阻塞线程,但是会挂起协程 ```kotlin import kotlinx.coroutines.* fun main() { GlobalScope.launch {//后台启动一个新的协程 delay(1000L)//非阻塞等待 println("World!") } println("Hello,") Thread.sleep(2000L)//阻塞主线程来保证JVM存活 }
fun main() { thread { Thread.sleep(1000L)//阻塞等待 println("World!") } println("Hello") Thread.sleep(2000L)
}
fun main() { GlobalScope.launch { delay(1000L) println("world!") } println("Hello,") runBlocking { delay(2000L) }
}
fun main() = runBlocking
- 延迟一段时间等待另一个协程运行并不是很好的选择,可以等待子协程执行完毕:
```kotlin
val job = GlobalScope.launch { // 启动一个新协程并保持对这个作业的引用
    delay(1000L)
    println("World!")
}
println("Hello,")
job.join() // 等待直到子协程执行结束
```
结构化并发,我们可以将main()函数转换为协程
fun main() = runBlocking { launch { delay(1000L) // } // }作用域构建器
import kotlinx.coroutines.* fun main() = runBlocking { launch { delay(200L) println("Task from runBlocking") } coroutineScope { launch { delay(500L) println("Task from nested launch") } delay(100L) println("Task from coroutine scope") } println("Coroutine scope is over") }提取函数重构
fun main() = runBlocking { launch { doWorld() } println("Hello,") } suspend fun doWorld() { delay(1000L) println("World!") }协程是轻量级的
import kotlinx.coroutines.* fun main() = runBlocking { repeat(100_000) { // 启动大量的协程 launch { delay(5000L) print(".") } } } //以上代码用线程的方式将会出现内存不足的错误
- 取消与超时
- 取消协程的执行
```kotlin
import kotlinx.coroutines.*
/**
 * Cancellation example: launches a job that prints a message every 500 ms,
 * lets it run for 1300 ms, then cancels it and waits for it to finish.
 */
fun main() = runBlocking {
    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // let the job run for a little while
    println("main: I'm tired of waiting!")
    job.cancel() // request cancellation of the job
    job.join() // wait until the job has actually completed
    println("main: Now I can quit.")
}
// cancel() and join() can be combined into a single cancelAndJoin() call.
// Computation code is not cancellable by itself (e.g. a plain while loop). To make it
// cancellable, check the cancellation status explicitly (e.g. while (isActive)),
// or call yield().
- 取消协程的执行
```kotlin
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
repeat(1000) {i ->
- 超时
```kotlin
/**
 * Timeout example: runs a loop that prints a message and then suspends for
 * 500 ms per iteration, all inside a 1300 ms [withTimeout] limit.
 * [withTimeout] throws an exception when the limit is exceeded.
 */
fun main() = runBlocking<Unit> {
    withTimeout(1300L) {
        for (iteration in 0 until 1000) {
            println("I'm sleeping $iteration ...")
            delay(500L)
        }
    }
}
//可以通过使用withTimeoutOrNull返回null来代替抛出异常
- async并发
```kotlin import kotlinx.coroutines.* import kotlin.system.* import kotlin.coroutines.* fun main() = runBlocking<Unit> { val time = measureTimeMillis { val one = async {One()} val two = async {Two()} println("The answer is ${one.await() + two.await()}") } println("Completed in $time ms") } suspend fun One(): Int { delay(1000L) return 13 } suspend fun Two(): Int { delay(1000L) return 15 } //惰性启动的async fun main() = runBlocking<Unit> { val time = measureTimeMillis { val one = async(start = CoroutineStart.LAZY) {One()} val two = async(start = CoroutineStart.LAZY) {Two()} one.start() two.start() println("${one.await() + two.await()}") } println("Completed in $time ms") }
fun main() = runBlocking
suspend fun concurrentSum(): Int = coroutineScope { val one = async {One()} val two = async {Two()} one.await() + two.await() } //如果在 concurrentSum 函数内部发生了错误,并且它抛出了一个异常, 所有在作用域中启动的协程都会被取消。
```kotlin
import kotlinx.coroutines.*
/**
 * Entry point: runs [failedConcurrentSum] inside a blocking coroutine and
 * prints a message if it fails with an [ArithmeticException].
 */
fun main() {
    runBlocking<Unit> {
        try {
            failedConcurrentSum()
        } catch (arithmeticFailure: ArithmeticException) {
            println("Computation failed with ArithmeticException")
        }
    }
}
/**
 * Starts two concurrent children in one scope and sums their results.
 *
 * The first child suspends for [Long.MAX_VALUE] milliseconds and prints a
 * message from its `finally` block; the second child throws an
 * [ArithmeticException] right after printing its own message.
 */
suspend fun failedConcurrentSum(): Int = coroutineScope {
    // First child: would yield 42, but only after an effectively endless delay.
    val neverCompletes: Deferred<Int> = async {
        try {
            delay(Long.MAX_VALUE)
            42
        } finally {
            println("First child was cancelled")
        }
    }
    // Second child: fails immediately.
    val failing: Deferred<Int> = async {
        println("Second child throws an exception")
        throw ArithmeticException()
    }
    neverCompletes.await() + failing.await()
}
//todo 协程上下文与调度器
异步流
序列计算数字,阻塞
fun simple():Sequence<Int> = sequence { for(i in 1..3) { Thread.sleep(100) yield(i) } } fun main() { simple().forEach { value -> println(value) } }挂起函数
suspend fun simple(): List<Int> { delay(1000) return listOf(1,2,3) } fun main() = runBlocking<Unit> { simple().forEach { value -> println(value) } }流
fun simple(): Flow<Int> = flow { for (i in 1..3) { delay(100) emit(i) } } fun main() = runBlocking<Unit> { launch { for(k in 1..3) { println("I'm not blocked $k") } } // 收集这个流 simple().collect { value -> println(value) } } //取消 withTimeoutOrNull(250) { simple().collect {value -> println(value)} }流构建器
//flowOf() fun <T> flowOf(value: T):Flow<T> flowOf(1,2,3) //asFlow() (1..3).asFlow().collect {value -> println(value)}过渡流
suspend fun performRequest(request: Int): String { delay(1000) return "response $request" } fun main() = runBlocking<Unit> { (1..3).asFlow() .map { request -> performRequest(request) } .collect { response -> println(response) } }流取消
fun main() = runBlocking { val channel = Channel<Int>() launch { for(x in 1..5) channel.send(x*x) } repeat(5) { println(channel.receive()) } println("Done") }
通道
val channel = Channel<Int>() launch { for(i in 1..5) channel.send(i*i) //channel.close() } repeat(5) {println(channel.receive())} println("Done!")管道 ```kotlin import kotlinx.coroutines.* import kotlinx.coroutines.channels.*
fun main() = runBlocking { val numbers = produceNumbers() val squares = square(numbers) repeat(5) { println(squares.receive()) } println("Done!") coroutineContext.cancelChildren() }
fun CoroutineScope.produceNumbers() = produce
- 多个协程也许会接收相同的管道
```kotlin
/**
 * Fan-out example: one producer feeds five processor coroutines that all
 * receive from the same channel.
 *
 * The body runs inside [runBlocking] because it calls the suspending [delay]
 * and the `CoroutineScope` extension functions, neither of which can be used
 * from a plain non-suspending `main`.
 */
fun main() = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950) // let the processors consume for a while
    producer.cancel() // cancel the producer; its channel closes, which ends the processors' loops
}
/**
 * Launches a producer coroutine that sends the integers 1, 2, 3, ... without
 * end, suspending for 100 ms after each one.
 */
fun CoroutineScope.produceNumbers() = produce<Int> {
    // Unbounded ascending sequence starting at 1.
    for (number in generateSequence(1) { it + 1 }) {
        send(number)
        delay(100)
    }
}
/**
 * Launches a processor coroutine that prints every message it receives from
 * [channel], tagged with its [id]. The loop ends when the channel is closed.
 *
 * The function was previously spelled `lauchProcessor`, which did not match
 * the `launchProcessor` call in `main`.
 */
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}
多个协程可以发送到同一个通道
val channel = Channel<String>() launch { sendString(channel, "foo", 200L) } launch { sendString(channel, "BAR", 500L) } repeat(6) { println(channel.receive()) } coroutineContext.cancelChildren()带缓冲的通道
val channel = Channel<Int>(4) val sender = launch { repeat(10) { println("Sending $it") channel.send(it) } } delay(1000) sender.cancel()
- 共享的可变状态与并发 ```kotlin import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlin.system.*
/**
 * Runs [action] 100 000 times in total (100 coroutines, 1000 repetitions
 * each) and prints how long it took.
 *
 * @param action the suspending action each coroutine invokes repeatedly.
 */
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100 // number of coroutines to launch
    val k = 1000 // number of times each coroutine repeats the action
    val time = measureTimeMillis {
        // coroutineScope returns only after every launched child has completed,
        // so the measured time covers all n * k invocations.
        coroutineScope {
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}
sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred
fun CoroutineScope.counterActor() = actor
fun main() = runBlocking
