可以使用集合在 Kotlin 中表示多个值。例如,我们可以有一个simple函数返回三个数字的列表,然后使用forEach 将它们全部打印出来:

  1. fun simple(): List<Int> = listOf(1, 2, 3)
  2. fun main() {
  3. simple().forEach { value -> println(value) }
  4. }

Sequences

如果我们使用一些消耗 CPU 的阻塞代码来计算数字(每次计算需要 100 毫秒),那么我们可以使用Sequence表示数字:

fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    simple().forEach { value -> println(value) } 
}

此代码输出相同的数字,但在打印每个数字之前等待 100 毫秒。

挂起函数

但是,此计算会阻塞正在运行代码的主线程。当这些值由异步代码计算时,我们可以simple用suspend修饰符标记函数,以便它可以在不阻塞的情况下执行其工作并将结果作为列表返回:

suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) } 
}

Flow

使用List结果类型,意味着我们一次只能返回所有值。为了表示被异步计算的值流,我们可以使用一个Flow类型,就像我们使用Sequence同步计算值的类型一样:

fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) } 
}

此代码在打印每个数字之前等待 100 毫秒,而不会阻塞主线程。这是通过从主线程中运行的单独协程每 100 毫秒打印一次“我没有被阻塞”来验证的:

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

请注意与前面示例中的Flow代码中的以下差异:

  • Flow类型的构建器函数称为flow
  • flow { … }构建器块内的代码可以挂起。
  • 该simple功能不再标有suspend修饰符。
  • 发射从使用流EMIT功能。
  • 值是收集来自使用流收集功能。

我们可以更换的延迟与Thread.sleep在体内simple的flow { … }和看到,主线程被阻塞,在这种情况下。

流的取消

Flow 遵循协程的一般协作取消。像往常一样,当流在可取消的挂起函数(如延迟)中被挂起时,可以取消流收集。以下示例显示在withTimeoutOrNull块中运行时流如何在超时时被取消并停止执行其代码

流所在的协程取消时流也会被取消

fun flowDemo() = flow<Int> {
    for (i in 1..5) {
        delay(1000)
        emit(i )
        println(i)
    }
}

fun testflow() = runBlocking {
    withTimeoutOrNull(2500) {
        flowDemo().collect {
            println(it)
        }
        println("finish")
    }
}

为方便起见,构建器对每个发出的值执行额外的ensureActive检查以取消。这意味着从 a 发出的繁忙循环flow { … }是可取消的:

fun foo(): Flow<Int> = flow { 
    for (i in 1..5) {
        println("Emitting $i") 
        emit(i) 
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

但是,出于性能原因,大多数其他流操作符不会自行进行额外的取消检查。例如,如果您使用IntRange.asFlow扩展来编写相同的繁忙循环并且不在任何地方挂起,则不会检查取消:

fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

使繁忙的流程可取消

一个带有协程的繁忙循环,您必须明确检查取消。你可以加
.onEach { currentCoroutineContext().ensureActive() },但是提供了一个现成的cancellable运算符来做到这一点:

fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

缓冲buffer

从收集流所需的总时间的角度来看,在不同协程中运行流的不同部分可能会有所帮助,尤其是在涉及长时间运行的异步操作时。例如,考虑一个simple流的发射很慢的情况,需要 100 ms 来产生一个元素;并且收集器也很慢,需要 300 毫秒来处理一个元素。我们来看看收集这样一个三个数字的流需要多长时间:

buffer 与flowOn都有缓冲优化的作用,但是buffer不能切换上下文,也就是不能换线程

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple().collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}

我们可以在流上使用缓冲区运算符来同时运行流的发射代码simple和收集代码,而不是顺序运行它们:

val time = measureTimeMillis {
    simple()
        .buffer() // buffer emissions, don't wait
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")

它生成相同的数字的速度更快,因为我们有效地创建了一个处理管道,只需等待 100 毫秒即可获得第一个数字,然后仅花费 300 毫秒来处理每个数字。这样运行大约需要 1000 毫秒:

合并、合流

当流表示操作的部分结果或操作状态更新时,可能不需要处理每个值,而是只处理最近的值。在这种情况下,当收集器太慢而无法处理中间值时,可以使用conflate运算符跳过中间值。基于上一个示例:

val time = measureTimeMillis {
    simple()
        .conflate() // conflate emissions, don't process each one
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")

collectLatest

当发射极和集电极都很慢时,合并是一种加速处理的方法。它通过删除发出的值来实现。另一种方法是取消慢速收集器并在每次发出新值时重新启动它。有一系列xxxLatest运算符执行与运算符相同的基本逻辑xxx,但在新值上取消其块中的代码。让我们试着改变混为一谈collectLatest在前面的例子:

val time = measureTimeMillis {
    simple()
        .collectLatest { value -> // cancel & restart on the latest value
            println("Collecting $value") 
            delay(300) // pretend we are processing it for 300 ms
            println("Done $value") 
        } 
}   
println("Collected in $time ms")

zip

就像Kotlin 标准库中的Sequence.zip扩展函数一样,流有一个zip运算符,它组合了两个流的对应值:

val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings 
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
    .collect { println(it) } // collect and print


1 -> one
2 -> two
3 -> three

背压

image.png