篇章目的

  • 了解 flow:基础用法
  • 了解冷热流
  • 三个 Flow 的使用与区别
  • 想要了解更多,转移官方教程

前提知识:

Flow 定义与用法


Flow 是什么

Flow 的官方( Kotlin )定义是一个异步的数据流,它按照顺序发出值。
数据流大家都应该挺熟的,例如在编程里我们读取一个文件,就需要用到流。而 Flow 呢是一个被协程客制化的数据流,在数据流动的过程中是可以被挂起的。

在 Kotlin 中 Flow 的写法很简单,通过 flow 就可以定义,例如下:

  1. val myFlow = flow<Any> {
  2. }

简单分析下上边的代码,myFlow 是这个流的实例,flow 是创建流的函数,而泛型规定了这个流里能发什么数据,最后那个 Lambda 就是控制留的区域,在这个区域里可以发送数据,处理数据,挂起操作等。

Flow 怎么用

刚刚说了 flow 怎么定义了,下面我们用 flow 形式的代码模拟一个视频流

  1. class VideoData(val data: String)
  2. fun flowTest() = runBlocking<Unit> {
  3. val videoFlow = flow {
  4. emit(VideoData("播放片头曲"))
  5. delay(1000)
  6. emit(VideoData("遇见怪兽"))
  7. delay(1000)
  8. emit(VideoData("你相信光吗"))
  9. delay(1000)
  10. emit(VideoData("打败怪兽"))
  11. delay(1000)
  12. emit(VideoData("播放片尾曲"))
  13. }
  14. }

上边的代码,我们通过 flow 模拟了一个视频流的播放过程,你可以把 delay 想象成在拖动视频。那么定义好的 Flow 该如何去使用呢,就好比上边的例子,得需要一个播放器去接收播放。

接收流需要用到了 Flow 接口提供的 collect 函数,这个函数是个阻塞函数,只有一个 Lambda 的参数用来返回流,例子如下:

  1. fun flowTest() = runBlocking<Unit> {
  2. val videoFlow = flow {
  3. emit(VideoData("播放片头曲"))
  4. delay(1000)
  5. emit(VideoData("遇见怪兽"))
  6. delay(1000)
  7. emit(VideoData("你相信光吗"))
  8. delay(1000)
  9. emit(VideoData("打败怪兽"))
  10. delay(1000)
  11. emit(VideoData("播放片尾曲"))
  12. }
  13. launch {
  14. delay(2000)
  15. println("等待 2s 后")
  16. println("-- 播放器 A 开始播放视频--")
  17. videoFlow.collect {
  18. println("播放器 A 播放:${it.data}")
  19. }
  20. println("-- 播放器 A 结束播放视频--")
  21. }
  22. }
  23. // 输出
  24. /**
  25. -- 播放器 A 开始播放视频--
  26. 播放器 A 播放:播放片头曲
  27. 播放器 A 播放:遇见怪兽
  28. 播放器 A 播放:你相信光吗
  29. 播放器 A 播放:打败怪兽
  30. 播放器 A 播放:播放片尾曲
  31. -- 播放器 A 结束播放视频--
  32. ** /

以上就是最基本的 Flow 的使用,简单总结下:

  • 创建:使用 **flow** 函数,在 **collect** 时启动执行 flow 内代码块
  • 发送:使用 **emit** 函数
  • 接收:使用 Flow 的 **collect** 函数
  • 取消:取消执行 **collect** 时的协程 Job 或作用域

冷流和热流


刚才已经介绍了 Flow 的基础使用,这里说下 Flow 的特性,Flow 是冷流,且只能有一个观察(接收)者。那冷流是什么,热流又是什么,它们的定义如下:

  • 冷流无消费者时则不会生产数据,只有一个观察者
  • 热流无观察者时也生产数据,可以被多个观察者监听

定义如果有点晦涩,我们一个一个来说,首先要说下冷热流有观察者和无观察者的区别。冷流时无观察者时不会产生数据,热流是不管有没有观察者都会产生数据。

有无观察者冷热流区别

定义说的很明白了,需要用代码去验证下,先是冷流

  1. fun flowTest() = runBlocking<Unit> {
  2. val videoFlow = flow {
  3. emit(VideoData("播放片头曲"))
  4. delay(1000)
  5. emit(VideoData("遇见怪兽"))
  6. delay(1000)
  7. emit(VideoData("你相信光吗"))
  8. delay(1000)
  9. emit(VideoData("打败怪兽"))
  10. delay(1000)
  11. emit(VideoData("播放片尾曲"))
  12. }
  13. launch {
  14. delay(1500)
  15. println("等待 1.5s 后")
  16. println("-- 播放器 A 开始播放视频--")
  17. videoFlow.collect {
  18. println("播放器 A 播放:${it.data}")
  19. }
  20. println("-- 播放器 A 结束播放视频--")
  21. }
  22. }

这个例子其实就加了个 delay(1000) 的实际代码,这已经够验证 Flow 是个冷流了
冷流的定义是,只有在消费(监听)时,才会发送数据,而在这个例子中消费就是 collect 函数。所以说增加的 delay 函数并不会对原有输出有任何影响,运行验证结果输出如下:

  1. /**
  2. 等待 1.5s 后
  3. -- 播放器 A 开始播放视频--
  4. 播放器 A 播放:播放片头曲
  5. 播放器 A 播放:遇见怪兽
  6. 播放器 A 播放:你相信光吗
  7. 播放器 A 播放:打败怪兽
  8. 播放器 A 播放:播放片尾曲
  9. -- 播放器 A 结束播放视频--
  10. ** /

通过上边的输出,我们就能看到,在延迟了 1.5s 后流依旧是重头播放的,接下来我们用热流试一下结果:(转换为热流的代码稍后就会说到,这里先理解)

  1. fun flowTest() = runBlocking<Unit> {
  2. val videoFlow = flow {
  3. emit(VideoData("播放片头曲"))
  4. delay(1000)
  5. emit(VideoData("遇见怪兽"))
  6. delay(1000)
  7. emit(VideoData("你相信光吗"))
  8. delay(1000)
  9. emit(VideoData("打败怪兽"))
  10. delay(1000)
  11. emit(VideoData("播放片尾曲"))
  12. }.shareIn(CoroutineScope(Dispatchers.IO), SharingStarted.Eagerly, 0)
  13. launch {
  14. delay(1500)
  15. println("等待 1.5s 后")
  16. println("-- 播放器 A 开始播放视频--")
  17. videoFlow.collect {
  18. println("播放器 A 播放:${it.data}")
  19. }
  20. println("-- 播放器 A 结束播放视频--")
  21. }
  22. }
  23. // 输出
  24. /**
  25. 等待 1.5s 后
  26. -- 播放器 A 开始播放视频--
  27. 播放器 A 播放:你相信光吗
  28. 播放器 A 播放:打败怪兽
  29. 播放器 A 播放:播放片尾曲
  30. ** /

通过这个结果可以看到热流的一些特性,在等待 1.5s 的期间,videoFlow 已经发送了 播放片头曲遇见怪兽 这两个数据,在监听的时候只能监听到后面发送的数据,证明流在监听前已经启动了。

一个和多个观察者冷热流区别

冷流只能有一个观察者,这个我思考了挺久了,因为我用代码是可以 collect 多次的,例如下面:

  1. val videoFlow = flow {...}
  2. launch {
  3. println("-- 播放器 A 开始播放视频--")
  4. videoFlow.collect {
  5. println("播放器 A 播放:${it.data}")
  6. }
  7. println("-- 播放器 A 结束播放视频--")
  8. }
  9. launch {
  10. println("-- 播放器 B 开始播放视频--")
  11. videoFlow.collect {
  12. println("播放器 B 播放:${it.data}")
  13. }
  14. println("-- 播放器 B 结束播放视频--")
  15. }

后面我想明白了,观察者观察的是数据,所以上上边的话转变一下:

  • 冷流:一个数据只能被一个观察者观察,而多个观察者会产生多个数据
  • 热流:一个数据能被多个观察者观察,多个观察者也只会有一份数据

那么我如何通过代码验证呢,可以用 hash 值,同一个数据的 hash 值是相同的,知道了这点我们改造下代码,把 hash 值加上,首先先看冷流:

  1. fun flowTest() = runBlocking<Unit> {
  2. val videoFlow = flow {
  3. emit(VideoData("播放片头曲"))
  4. delay(1000)
  5. emit(VideoData("遇见怪兽"))
  6. delay(1000)
  7. emit(VideoData("你相信光吗"))
  8. delay(1000)
  9. emit(VideoData("打败怪兽"))
  10. delay(1000)
  11. emit(VideoData("播放片尾曲"))
  12. }
  13. launch {
  14. println("-- 播放器 A 开始播放视频--")
  15. videoFlow.collect {
  16. println("播放器 A 播放:${it.data} 【数据唯一标识:${it.hashCode()}】")
  17. }
  18. println("-- 播放器 A 结束播放视频--")
  19. }
  20. launch {
  21. delay(100)
  22. println("-- 播放器 B 开始播放视频--")
  23. videoFlow.collect {
  24. println("播放器 B 播放:${it.data} 【数据唯一标识:${it.hashCode()}】")
  25. }
  26. println("-- 播放器 B 结束播放视频--")
  27. }
  28. }

在代码中,加上了 hashCode 的值,接下来看下输出

  1. -- 播放器 A 开始播放视频--
  2. 播放器 A 播放:播放片头曲 【数据唯一标识:472654579
  3. -- 播放器 B 开始播放视频--
  4. 播放器 B 播放:播放片头曲 【数据唯一标识:1053782781
  5. 播放器 A 播放:遇见怪兽 【数据唯一标识:846063400
  6. 播放器 B 播放:遇见怪兽 【数据唯一标识:627150481
  7. 播放器 A 播放:你相信光吗 【数据唯一标识:128526626
  8. 播放器 B 播放:你相信光吗 【数据唯一标识:1911728085
  9. 播放器 A 播放:打败怪兽 【数据唯一标识:754666084
  10. 播放器 B 播放:打败怪兽 【数据唯一标识:88558700
  11. 播放器 A 播放:播放片尾曲 【数据唯一标识:1265210847
  12. -- 播放器 A 结束播放视频--
  13. 播放器 B 播放:播放片尾曲 【数据唯一标识:801197928
  14. -- 播放器 B 结束播放视频--

可以看到虽然 A 和 B 播放器都观察到了videoFlow的变化,但是两个播放器收到的是完全不同的数据,这时我们再用 .shareIn() 函数将 videoFlow 转换为热流试一下输出

  1. -- 播放器 A 开始播放视频--
  2. 播放器 A 播放:播放片头曲 【数据唯一标识:687241927
  3. -- 播放器 B 开始播放视频--
  4. 播放器 A 播放:遇见怪兽 【数据唯一标识:2096171631
  5. 播放器 B 播放:遇见怪兽 【数据唯一标识:2096171631
  6. 播放器 A 播放:你相信光吗 【数据唯一标识:2114694065
  7. 播放器 B 播放:你相信光吗 【数据唯一标识:2114694065
  8. 播放器 A 播放:打败怪兽 【数据唯一标识:1844169442
  9. 播放器 B 播放:打败怪兽 【数据唯一标识:1844169442
  10. 播放器 A 播放:播放片尾曲 【数据唯一标识:1537358694
  11. 播放器 B 播放:播放片尾曲 【数据唯一标识:1537358694

改成热流后 hashcode 值一致了,证明热流接收的是相同的数据。
上边的例子也可以映射到现实去联想下,辅助下记忆

  • 冷流:视频网站上的视频,每个观看的用户都是单独的
  • 热流:视频网站上的直播,每个观看的用户都是共享的

SharedFlow


刚刚上边说了 Flow 和冷热流的概念,并且验证了 Flow 是个冷流,现在来讲下热流。
SharedFlow 是 Flow 的一个子集,是一种热流的实现。Mutable 前缀我想都应该不会陌生,是可编辑的意思例如 MutableMap 和 MutableList 等,SharedFlow 在创建后是不能再发送的数据的,MutableSharedFlow 则可以。

SharedFlow 有如下两种创建方式:

  1. // SharedFlow 创建方式一
  2. val flowIn = flow<Int> {
  3. ...
  4. }.shareIn(CoroutineScope(Dispatchers.IO), SharingStarted.Eagerly, 0)
  5. // SharedFlow 创建方式二
  6. val mutableSharedFlow = MutableSharedFlow<Int>()
  7. val sharedFlow: SharedFlow<Int> = mutableSharedFlow
  8. mutableSharedFlow.emit(...)

基本使用

MutableSharedFlow 它的基本使用还是很简单的,代码如下:

  1. fun sharedFlowBaseTest() = runBlocking<Unit> {
  2. // SharedFlow 创建方式二
  3. val mutableSharedFlow = MutableSharedFlow<Int>()
  4. val sharedFlow: SharedFlow<Int> = mutableSharedFlow
  5. launch {
  6. delay(1000)
  7. mutableSharedFlow.emit(1)
  8. delay(1000)
  9. mutableSharedFlow.emit(2)
  10. delay(1000)
  11. mutableSharedFlow.emit(3)
  12. }
  13. val collectJob = launch {
  14. sharedFlow.collect {
  15. println("收到:${it}")
  16. }
  17. println("collectJob finish")
  18. }
  19. launch {
  20. sharedFlow.collect {
  21. println("收到:${it}")
  22. }
  23. }
  24. launch {
  25. delay(2100)
  26. collectJob.cancel()
  27. }
  28. }

因为是 Flow 的子集,所以 emit 和 collect 函数使用方式是一致的,值得注意的有如下三点:

  • MutableSharedFlow 没有初始值
  • 跟 Flow 不同,SharedFlow 的 collect 是个持续阻断的函数(17 行代码永远不会执行)
  • SharedFlow 是可以取消的,但是它本身没有取消方法,需要协程 Job 或者协程作用域取消

MutableSharedFlow 使用

刚刚的基本使用在创建 MutableSharedFlow 是没有任何定制化的都是用的默认参数,其实 MutableSharedFlow 是个很灵活的 Flow,通过一些简单的配置就可以实现缓冲、黏性等业务场景。

先看下 MutableSharedFlow 的初始化方法签名:

  1. public fun <T> MutableSharedFlow(
  2. replay: Int = 0,
  3. extraBufferCapacity: Int = 0,
  4. onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
  5. ): MutableSharedFlow<T>

其中的三个参数分别的意义为:

  • replay:重发个数,当有新的观察者 collect 时,发送几个已经发送过的数据给它;默认为 0 个
  • extraBufferCapacity:缓存个数,减去 replay,MutableSharedFlow 还缓存多少数据;默认为 0 个
  • onBufferOverflow:到达缓存个数时,缓存的策略;默认为 SUSPEND
    • BufferOverflow.SUSPEND: 挂起后续缓存的值
    • BufferOverflow.DROP_OLDEST: 丢掉最旧值
    • BufferOverflow.DROP_LATEST: 丢掉最新值

接下来会根据顺序挨个去实践下每个参数的行为

replay

定义:当有新的观察者 collect 时,发送几个已经发送过的数据给它,默认为 0 个

例子:

  1. fun sharedFlowReplayTest() = runBlocking {
  2. val mutableSharedFlow = MutableSharedFlow<Int>(replay = 2)
  3. val sharedFlow: SharedFlow<Int> = mutableSharedFlow
  4. launch {
  5. delay(1000)
  6. mutableSharedFlow.emit(1)
  7. delay(1000)
  8. mutableSharedFlow.emit(2)
  9. delay(1000)
  10. mutableSharedFlow.emit(3)
  11. }
  12. launch {
  13. delay(2100)
  14. sharedFlow.collect {
  15. println("收到:${it}")
  16. }
  17. }
  18. }

定义说的很明确,就是重发的个数,在这个例子里 replay = 2 也就是重发 2 个,所以在延迟 2s 后依然会接收到已经发送过的 1、2的值。

onBufferOverflow

缓存满了后,缓存的策略

  • SUSPEND::挂起
  • DROP_OLDEST:丢弃旧的值,只接收最新的
  • DROP_LATEST:丢弃新的值,只接收最旧的

再说这个参数之前,要先讲一个概念,叫 背压
背压:背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略

举个现实中的例子:
上游:一个厨师做馒头特别快,0.1秒就可以做出一个馒头
下游:顾客吃馒头的速度有限,0.5 秒钟才能吃 1 个
顾客吃第一个馒头的时候回头一看,我去这可不行啊,我压力太大了得告诉厨师 慢点做/后面我只吃最新的馒头 /后面我只吃第下一个生产出来的馒头/。这个行为就叫做背压。

说完了背压的概念,结合上边的例子我们再看下 onBufferOverflow 这个参数

  • SUSPEND:您慢点做,我吃一个您做一个
  • DROP_OLDEST:后面我只吃最新的馒头
  • DROP_LATEST:后面我只吃第下一个生产出来的馒头

现在理论已经出来了剩下的就是实践,下面会根据现实中的例子写个实例项目

DROP_OLDEST 例子

  1. fun sharedFlowBufferOverflowTest() = runBlocking {
  2. val mutableSharedFlow =
  3. MutableSharedFlow<Int>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
  4. val sharedFlow: SharedFlow<Int> = mutableSharedFlow
  5. launch {
  6. println("<---顾客来到店里,等待厨师生产馒头")
  7. sharedFlow.collect {
  8. println("<---顾客收到馒头 $it")
  9. delay(500)
  10. println("<---顾客吃完馒头 $it")
  11. }
  12. }
  13. launch {
  14. for (i in 1..6) {
  15. delay(100)
  16. println("厨师生产了馒头 $i")
  17. mutableSharedFlow.emit(i)
  18. }
  19. }
  20. }

输出:

  1. <---顾客来到店里,等待厨师生产馒头
  2. 厨师生产了馒头 1
  3. <---顾客收到馒头 1
  4. 厨师生产了馒头 2
  5. 厨师生产了馒头 3
  6. 厨师生产了馒头 4
  7. 厨师生产了馒头 5
  8. <---顾客吃完馒头 1
  9. <---顾客收到馒头 5
  10. 厨师生产了馒头 6
  11. <---顾客吃完馒头 5
  12. <---顾客收到馒头 6
  13. <---顾客吃完馒头 6

如果对这个输出有疑惑的话,我会挨个解释下,如果没有疑惑,直接看下个示例就好

输出解释:

  1. <—-顾客来到店里,等待厨师生产馒头:这个没啥说的,就是个单纯的输出,代表观察者已经就位
  2. 厨师生产了馒头 1:生产者生产了第一条数据
  3. <—-顾客收到馒头 1:观察者收到第一条数据
  4. 厨师生产了馒头 2:这行要注意下,观察者此时还没有处理完成数据(顾客还没吃完馒头),生产者首先会把这个数据缓存下来。
  5. 厨师生产了馒头 3:由于生产者设置的是DROP_LATEST(只要最新数据),所以会把 ‘馒头 2’从缓存里删掉,并且将 ‘馒头 3’ 放入缓存。
  6. 厨师生产了馒头 4:由于生产者设置的是DROP_LATEST(只要最新数据),所以会把 ‘馒头 3’从缓存里删掉,并且将 ‘馒头 4’ 放入缓存。
  7. 厨师生产了馒头 5:由于生产者设置的是DROP_LATEST(只要最新数据),所以会把 ‘馒头 4’从缓存里删掉,并且将 ‘馒头 5’ 放入缓存。
  8. <—-顾客吃完馒头 1:观察者消费完了数据
  9. <—-顾客收到馒头 5:生产者会发给观察者缓存中的最新数据给观察者,当前缓存中是‘馒头 5’,所以观察者收到‘馒头 5’,并且将缓存数据清空
  10. 厨师生产了馒头 6:观察者此时还没有处理完成数据(顾客还没吃完馒头),生产者首先会把这个数据缓存下来。
  11. <—-顾客吃完馒头 5:观察者消费完了数据
  12. <—-顾客收到馒头 6:生产者会发给观察者缓存中的最新数据给观察者,当前缓存中是‘馒头 6’,所以观察者收到‘馒头 6’,并且将缓存数据清空
  13. <—-顾客吃完馒头 6:观察者消费完了数据

DROP_LATEST 例子

  1. fun sharedFlowBufferOverflowTest() = runBlocking {
  2. val mutableSharedFlow =
  3. MutableSharedFlow<Int>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)
  4. val sharedFlow: SharedFlow<Int> = mutableSharedFlow
  5. launch {
  6. println("<---顾客来到店里,等待厨师生产馒头")
  7. sharedFlow.collect {
  8. println("<---顾客收到馒头 $it")
  9. delay(500)
  10. println("<---顾客吃完馒头 $it")
  11. }
  12. }
  13. launch {
  14. for (i in 1..6) {
  15. delay(100)
  16. println("厨师生产了馒头 $i")
  17. mutableSharedFlow.emit(i)
  18. }
  19. }
  20. }

�输出:

  1. <---顾客来到店里,等待厨师生产馒头
  2. 厨师生产了馒头 1
  3. <---顾客收到馒头 1
  4. 厨师生产了馒头 2
  5. 厨师生产了馒头 3
  6. 厨师生产了馒头 4
  7. 厨师生产了馒头 5
  8. <---顾客吃完馒头 1
  9. <---顾客收到馒头 2
  10. 厨师生产了馒头 6
  11. <---顾客吃完馒头 2
  12. <---顾客收到馒头 6
  13. <---顾客吃完馒头 6

这个输出的结果我就不分析了,其实跟上边类似,如果想不明白问我。

SUSPEND 例子

  1. fun sharedFlowBufferOverflowTest() = runBlocking {
  2. val mutableSharedFlow =
  3. MutableSharedFlow<Int>(extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)
  4. val sharedFlow: SharedFlow<Int> = mutableSharedFlow
  5. val startTime = System.currentTimeMillis()
  6. val finishTime = {
  7. val endTime = System.currentTimeMillis()
  8. " 【持续时间:"+((endTime - startTime)/100 * 100).toString()+"】"
  9. }
  10. launch {
  11. println("<---顾客来到店里,等待厨师生产馒头")
  12. sharedFlow.collect {
  13. println("<---顾客收到馒头 $it ${finishTime()}")
  14. delay(500)
  15. println("<---顾客吃完馒头 $it ${finishTime()}")
  16. }
  17. }
  18. launch {
  19. for (i in 1..6) {
  20. delay(100)
  21. println("厨师生产了馒头 $i ${finishTime()}")
  22. mutableSharedFlow.emit(i)
  23. }
  24. }
  25. }

这个例子较上两个稍微改了一点。首先 extraBufferCapacity 参数设置为 0 了,这个为了方便分析,其次输出里增加了持续时间这个字段,也是为了方便分析的。输出如下:

  1. <---顾客来到店里,等待厨师生产馒头
  2. 厨师生产了馒头 1 【持续时间:100】
  3. <---顾客收到馒头 1 【持续时间:100】
  4. 厨师生产了馒头 2 【持续时间:200】
  5. <---顾客吃完馒头 1 【持续时间:600】
  6. <---顾客收到馒头 2 【持续时间:600】
  7. 厨师生产了馒头 3 【持续时间:700】
  8. <---顾客吃完馒头 2 【持续时间:1100】
  9. <---顾客收到馒头 3 【持续时间:1100】
  10. 厨师生产了馒头 4 【持续时间:1200】
  11. <---顾客吃完馒头 3 【持续时间:1600】
  12. <---顾客收到馒头 4 【持续时间:1600】
  13. 厨师生产了馒头 5 【持续时间:1700】
  14. <---顾客吃完馒头 4 【持续时间:2100】
  15. <---顾客收到馒头 5 【持续时间:2100】
  16. 厨师生产了馒头 6 【持续时间:2200】
  17. <---顾客吃完馒头 5 【持续时间:2600】
  18. <---顾客收到馒头 6 【持续时间:2600】
  19. <---顾客吃完馒头 6 【持续时间:3100】

通过这个输出可以看到,这个缓存策略没有丢弃任何数值,后续的都被挂起了
分析输出的话会说一个关键点就可以了,如下:

输出第4行-第 7 行:在第 4 行的时候生产了‘馒头 2’持续时间是 200 毫秒(厨师 100 毫秒生产一个馒头),而在第 7 行输出里才生产了‘馒头 3’,中间差了 500 毫秒,也就说生产者被挂起了,注意是生产者被挂起,不是观察者

这里映射现实就是,顾客跟厨师说,您等我吃完再做下一个

extraBufferCapacity

缓存个数
其实这个在说完了 onBufferOverflow 参数后就已经很好理解了,就是缓存的个数,你可以去自行更改下上边三个中的缓存个数的数值,然后运行去分析下输出

shareIn 函数转换

转换的代码很简单,使用 shareIn 函数

  1. public fun <T> Flow<T>.shareIn(
  2. scope: CoroutineScope,
  3. started: SharingStarted,
  4. replay: Int = 0
  5. ): SharedFlow<T>

第三个参数不用说了,前两个参数会在下面说下

flow 函数的协程作用域

flow 函数的协程作用域是由 collect 时的上级作用域决定的。
由于 flow 是冷流,所以可以有多个作用域。

下面是个测试:

  1. fun sharedFlowShareInTest() = runBlocking {
  2. val scope = CoroutineScope(Executors.newSingleThreadExecutor().asCoroutineDispatcher())
  3. val flow = flow {
  4. for (i in 1..6) {
  5. delay(100)
  6. println("<---发送:$i " + Thread.currentThread().name)
  7. emit(i)
  8. }
  9. }
  10. launch {
  11. flow.collect {
  12. println("接收 A: $it " + Thread.currentThread().name)
  13. }
  14. }
  15. scope.launch {
  16. flow.collect {
  17. println("接收 B: $it " + Thread.currentThread().name)
  18. }
  19. }
  20. }

输出:

  1. <---发送:1 pool-1-thread-1
  2. <---发送:1 main
  3. 接收 B: 1 pool-1-thread-1
  4. 接收 A: 1 main
  5. <---发送:2 main
  6. <---发送:2 pool-1-thread-1
  7. 接收 B: 2 pool-1-thread-1
  8. 接收 A: 2 main
  9. <---发送:3 main
  10. 接收 A: 3 main
  11. <---发送:3 pool-1-thread-1
  12. 接收 B: 3 pool-1-thread-1
  13. <---发送:4 main
  14. <---发送:4 pool-1-thread-1
  15. 接收 B: 4 pool-1-thread-1
  16. 接收 A: 4 main
  17. <---发送:5 main
  18. <---发送:5 pool-1-thread-1
  19. 接收 A: 5 main
  20. 接收 B: 5 pool-1-thread-1
  21. <---发送:6 main
  22. <---发送:6 pool-1-thread-1
  23. 接收 A: 6 main
  24. 接收 B: 6 pool-1-thread-1

可以看到在 flow 函数的 lambda 表达式中,是输出了两个不同的线程的。

shareIn 的 scope 参数

flow 是冷流,每个观察者都配有独立的生产者,所以作用域可以有多个。
但是 sharedFlow 是热流,生产者只有一个,那么就会产生一个问题,flow 函数的 lambda 表达式中的作用域到底该是什么。

而 scope 这个参数就是解决此问题的,作用域统一由 shareIn 的 scope 参数指定。
下面将上边例子中的 flow 通过 shareIn 转换下

  1. val flow = flow {
  2. for (i in 1..6) {
  3. delay(100)
  4. println("<---发送:$i " + Thread.currentThread().name)
  5. emit(i)
  6. }
  7. }.shareIn(scope, SharingStarted.Lazily, 0)

再看下输出

  1. <---发送:1 pool-1-thread-1
  2. 接收 A: 1 main
  3. 接收 B: 1 pool-1-thread-1
  4. <---发送:2 pool-1-thread-1
  5. 接收 A: 2 main
  6. 接收 B: 2 pool-1-thread-1
  7. <---发送:3 pool-1-thread-1
  8. 接收 A: 3 main
  9. 接收 B: 3 pool-1-thread-1
  10. <---发送:4 pool-1-thread-1
  11. 接收 A: 4 main
  12. 接收 B: 4 pool-1-thread-1
  13. <---发送:5 pool-1-thread-1
  14. 接收 A: 5 main
  15. 接收 B: 5 pool-1-thread-1
  16. <---发送:6 pool-1-thread-1
  17. 接收 A: 6 main
  18. 接收 B: 6 pool-1-thread-1

可以看到生产者的协程作用域已经是 shareIn 传入的。

shareIn 的 started 参数

  • Lazily:当首个观察者出现时,启动 flow
  • Eagerly:立即启动 flow
  • WhileSubscribed:自定义策略
    • stopTimeoutMillis:如果没有观察者时,多长时间取消生产者数据
    • replayExpirationMilis:多长时间取消 replay

Lazily 和 Eagerly 这两个参数没啥好说的,都很简单。

主要看下 WhileSubscribed 这个函数。
我们通过具体的业务场景可以更好的理解这个参数的意义:

我们在 App 的数据层里有一个 shareFlow 是每 2 秒获取手机的 GPS 信息,然后 emit 出去。
为了用户的省电考虑,我希望在用户息屏的时候,就不要再获取了,然后亮屏的时候,在重新启动 2s 轮训的获取
这个需求就可以直接用 WhileSubscribed 实现,来看下:

  1. fun sharedFlowShareInTest() = runBlocking {
  2. val scope = CoroutineScope(Executors.newSingleThreadExecutor().asCoroutineDispatcher())
  3. val flow = flow {
  4. while (true) {
  5. val uuid = UUID.randomUUID()
  6. println("获取 GPS $uuid")
  7. emit(uuid)
  8. delay(2000)
  9. }
  10. }.shareIn(scope, SharingStarted.WhileSubscribed(stopTimeoutMillis = 0), 0)
  11. val create =
  12. {
  13. launch {
  14. flow.collect {
  15. println("观察者 : $it ")
  16. }
  17. }
  18. }
  19. val job = create()
  20. launch {
  21. delay(3000)
  22. println("息屏了")
  23. job.cancel()
  24. }
  25. launch {
  26. delay(15000)
  27. println("亮屏了")
  28. create()
  29. }
  30. }

在这个例子,flow 模拟了每 2 秒获取的 GPS 信息的数据源,在 3 秒后会取消观察者模拟息屏,在 15 秒过后再次创建一个观察者来模拟亮屏。
输出我就不展示了,自己运行下看下。在输出息屏了后立马就停止了 GPS 的轮询,然后在 12 秒后 GPS 的轮询又会启动了。

刚刚上边的例子 stopTimeoutMillis 用的 0,也就是说没有观察者时立刻停止了生产者的生产。通过 stopTimeoutMillis 这个参数也可以设置指定的缓冲时间,例如我想没有观察者时 1 秒后再停止生产者生产,那就设置为 1000

第二个参数 replayExpirationMilis 就比较简单了,replay 多长时间过期。因为上边已经说了 replay 参数了,而 replayExpirationMilis 阐述的也比较直观,所以就不写例子了。

StateFlow


StateFlow 是 SharedFlow 的一个子集,同样是热流。

相比于 SharedFlow,StateFlow 定制了如下特性

  • 始终有值
  • 粘性:始终接收最新值
  • 防抖(去重):重复的数据不会连续发送
  • 可以通过 value 的方式获取最新的值(不用从 collect 中获取)

但是 StateFlow 也失去了 SharedFlow 的灵活性没有了 replay、onBufferOverflow 等有用的特性,这个等下一节对比使用场景的时候具体说。此节主要说下基本使用

  1. // StateFlow 创建方式一
  2. val flowIn2 = flow<Int> {
  3. ...
  4. }.stateIn(CoroutineScope(Dispatchers.IO), started = SharingStarted.Lazily, 1)
  5. // StateFlow 创建方式二
  6. val mutableStateFlow = MutableStateFlow(1)
  7. val stateFlow: StateFlow<Int> = mutableStateFlow

MutableStateFlow 使用

相比于 MutableSharedFlow 来说 MutableStateFlow 的构造函数要简单多了,就一个参数,签名如下:

  1. public fun <T> MutableStateFlow(value: T): MutableStateFlow<T>

这个 value 参数没啥好说的,就是初始值。因为 StateFlow 的特性是始终有值,所以必须有个初始值。

接下来,我们测试下 StateFlow 受到背压后会采用什么处理方式,例子就把 onBufferOverflow 这一小结的例子稍微改下,如下:

  1. fun stateFlowTest() = runBlocking {
  2. val mutableStateFlow = MutableStateFlow(0)
  3. val stateFlow: StateFlow<Int> = mutableStateFlow
  4. launch {
  5. println("<---顾客来到店里,等待厨师生产馒头")
  6. stateFlow.collect {
  7. if (it == 0) return@collect
  8. println("<---顾客收到馒头 $it")
  9. delay(500)
  10. println("<---顾客吃完馒头 $it")
  11. }
  12. }
  13. launch {
  14. for (i in 1..6) {
  15. delay(100)
  16. println("厨师生产了馒头 $i")
  17. mutableStateFlow.emit(i)
  18. }
  19. }
  20. }

输出结果为:

  1. <---顾客来到店里,等待厨师生产馒头
  2. 厨师生产了馒头 1
  3. <---顾客收到馒头 1
  4. 厨师生产了馒头 2
  5. 厨师生产了馒头 3
  6. 厨师生产了馒头 4
  7. 厨师生产了馒头 5
  8. <---顾客吃完馒头 1
  9. <---顾客收到馒头 5
  10. 厨师生产了馒头 6
  11. <---顾客吃完馒头 5
  12. <---顾客收到馒头 6
  13. <---顾客吃完馒头 6

可以看到结果与 onBufferOverflow.DROP_OLDEST 一致。
证明了 StateFlow 的背压效果是丢弃旧值,与其始终接收最新值的特性一致。

  1. val mutableStateFlow = MutableStateFlow(0)
  2. // |||
  3. // 约等价与
  4. val mutableSharedFlow = MutableSharedFlow<Int>(replay = 1,onBufferOverflow = BufferOverflow.DROP_OLDEST)
  5. mutableSharedFlow.emit(0)

stateIn 转换

stateIn 的用法和 shareIn 大体一致,就是最后一个参数,由 replay 变为了初始值,这个也很好理解,stateFlow 的 replay 就是 1,所以不用传。

值得注意的是 stateIn 有两个方法签名

  1. // 第一个
  2. public fun <T> Flow<T>.stateIn(
  3. scope: CoroutineScope,
  4. started: SharingStarted,
  5. initialValue: T
  6. ): StateFlow<T>
  7. // 第二个
  8. public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T>

第二个 stateIn 是个挂起阻断函数,直到接收到第一个值之前,都会处于挂起状态。

Flow、SharedFlow 与 StateFlow 汇总


这里直接说汇总

  • Flow :比较适合表现一些一次性的代码片段,例如我请求一个接口,就会有如下代码片段

    将请求状态置为 请求中 请求接口 根据接口返回将请求状态置为请求成功或者失败

  • SharedFlow:适合共享的代码片段,例如持续获取经纬度,然后通知
  • StateFlow:由于其的粘性和防抖的特性,比较适合用于与界面交互
    • 粘性可以保证数据不丢失
    • 防抖可以保证避免无意义的界面刷新

由于 StateFlow 的防抖特性是根据 equals 函数,所以有些 List 和 Map 的数据会被过滤,遇到这种情况可以使用定制的SharedFlow,参考

其实这里应该有每种 Flow 在项目中的具体使用示例,不过篇幅就太大了,后面会单独有一篇文章去说(这里有个文章参考,可以先看下),等写完了会更新此处的文章地址。