可以使用集合在 Kotlin 中表示多个值。例如,我们可以有一个simple函数返回三个数字的列表,然后使用forEach 将它们全部打印出来:
fun simple(): List<Int> = listOf(1, 2, 3)fun main() {simple().forEach { value -> println(value) }}
Sequences
如果我们使用一些消耗 CPU 的阻塞代码来计算数字(每次计算需要 100 毫秒),那么我们可以使用Sequence表示数字:
fun simple(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i) // yield next value
}
}
fun main() {
simple().forEach { value -> println(value) }
}
此代码输出相同的数字,但在打印每个数字之前等待 100 毫秒。
挂起函数
但是,此计算会阻塞正在运行代码的主线程。当这些值由异步代码计算时,我们可以simple用suspend修饰符标记函数,以便它可以在不阻塞的情况下执行其工作并将结果作为列表返回:
suspend fun simple(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}
Flow
使用List
fun simple(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Launch a concurrent coroutine to check if the main thread is blocked
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
simple().collect { value -> println(value) }
}
此代码在打印每个数字之前等待 100 毫秒,而不会阻塞主线程。这是通过从主线程中运行的单独协程每 100 毫秒打印一次“我没有被阻塞”来验证的:
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
请注意与前面示例中的Flow代码中的以下差异:
我们可以更换的延迟与Thread.sleep在体内simple的flow { … }和看到,主线程被阻塞,在这种情况下。
流的取消
Flow 遵循协程的一般协作取消。像往常一样,当流在可取消的挂起函数(如延迟)中被挂起时,可以取消流收集。以下示例显示在withTimeoutOrNull块中运行时流如何在超时时被取消并停止执行其代码
流所在的协程取消时流也会被取消
fun flowDemo() = flow<Int> {
for (i in 1..5) {
delay(1000)
emit(i )
println(i)
}
}
fun testflow() = runBlocking {
withTimeoutOrNull(2500) {
flowDemo().collect {
println(it)
}
println("finish")
}
}
为方便起见,流构建器对每个发出的值执行额外的ensureActive检查以取消。这意味着从 a 发出的繁忙循环flow { … }是可取消的:
fun foo(): Flow<Int> = flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
println(value)
}
}
但是,出于性能原因,大多数其他流操作符不会自行进行额外的取消检查。例如,如果您使用IntRange.asFlow扩展来编写相同的繁忙循环并且不在任何地方挂起,则不会检查取消:
fun main() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
println(value)
}
}
使繁忙的流程可取消
一个带有协程的繁忙循环,您必须明确检查取消。你可以加
.onEach { currentCoroutineContext().ensureActive() },但是提供了一个现成的cancellable运算符来做到这一点:
fun main() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}
缓冲buffer
从收集流所需的总时间的角度来看,在不同协程中运行流的不同部分可能会有所帮助,尤其是在涉及长时间运行的异步操作时。例如,考虑一个simple流的发射很慢的情况,需要 100 ms 来产生一个元素;并且收集器也很慢,需要 300 毫秒来处理一个元素。我们来看看收集这样一个三个数字的流需要多长时间:
buffer 与flowOn都有缓冲优化的作用,但是buffer不能切换上下文,也就是不能换线程
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
我们可以在流上使用缓冲区运算符来同时运行流的发射代码simple和收集代码,而不是顺序运行它们:
val time = measureTimeMillis {
simple()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
它生成相同的数字的速度更快,因为我们有效地创建了一个处理管道,只需等待 100 毫秒即可获得第一个数字,然后仅花费 300 毫秒来处理每个数字。这样运行大约需要 1000 毫秒:
合并、合流
当流表示操作的部分结果或操作状态更新时,可能不需要处理每个值,而是只处理最近的值。在这种情况下,当收集器太慢而无法处理中间值时,可以使用conflate运算符跳过中间值。基于上一个示例:
val time = measureTimeMillis {
simple()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
collectLatest
当发射极和集电极都很慢时,合并是一种加速处理的方法。它通过删除发出的值来实现。另一种方法是取消慢速收集器并在每次发出新值时重新启动它。有一系列xxxLatest运算符执行与运算符相同的基本逻辑xxx,但在新值上取消其块中的代码。让我们试着改变混为一谈到collectLatest在前面的例子:
val time = measureTimeMillis {
simple()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
println("Collected in $time ms")
zip
就像Kotlin 标准库中的Sequence.zip扩展函数一样,流有一个zip运算符,它组合了两个流的对应值:
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
1 -> one
2 -> two
3 -> three
背压

