篇章目的
- 了解 flow:基础用法
- 了解冷热流
- 三个 Flow 的使用与区别
- 想要了解更多,转移官方教程
前提知识:
Flow 定义与用法
Flow 是什么
Flow 的官方( Kotlin )定义是一个异步的数据流,它按照顺序发出值。
数据流大家都应该挺熟的,例如在编程里我们读取一个文件,就需要用到流。而 Flow 呢是一个被协程客制化的数据流,在数据流动的过程中是可以被挂起的。
在 Kotlin 中 Flow 的写法很简单,通过 flow
就可以定义,例如下:
val myFlow = flow<Any> {
}
简单分析下上边的代码,myFlow 是这个流的实例,flow 是创建流的函数,而泛型规定了这个流里能发什么数据,最后那个 Lambda 就是控制留的区域,在这个区域里可以发送数据,处理数据,挂起操作等。
Flow 怎么用
刚刚说了 flow 怎么定义了,下面我们用 flow 形式的代码模拟一个视频流
class VideoData(val data: String)
fun flowTest() = runBlocking<Unit> {
val videoFlow = flow {
emit(VideoData("播放片头曲"))
delay(1000)
emit(VideoData("遇见怪兽"))
delay(1000)
emit(VideoData("你相信光吗"))
delay(1000)
emit(VideoData("打败怪兽"))
delay(1000)
emit(VideoData("播放片尾曲"))
}
}
上边的代码,我们通过 flow 模拟了一个视频流的播放过程,你可以把 delay 想象成在拖动视频。那么定义好的 Flow 该如何去使用呢,就好比上边的例子,得需要一个播放器去接收播放。
接收流需要用到了 Flow
接口提供的 collect
函数,这个函数是个阻塞函数,只有一个 Lambda 的参数用来返回流,例子如下:
fun flowTest() = runBlocking<Unit> {
val videoFlow = flow {
emit(VideoData("播放片头曲"))
delay(1000)
emit(VideoData("遇见怪兽"))
delay(1000)
emit(VideoData("你相信光吗"))
delay(1000)
emit(VideoData("打败怪兽"))
delay(1000)
emit(VideoData("播放片尾曲"))
}
launch {
delay(2000)
println("等待 2s 后")
println("-- 播放器 A 开始播放视频--")
videoFlow.collect {
println("播放器 A 播放:${it.data}")
}
println("-- 播放器 A 结束播放视频--")
}
}
// 输出
/**
-- 播放器 A 开始播放视频--
播放器 A 播放:播放片头曲
播放器 A 播放:遇见怪兽
播放器 A 播放:你相信光吗
播放器 A 播放:打败怪兽
播放器 A 播放:播放片尾曲
-- 播放器 A 结束播放视频--
** /
以上就是最基本的 Flow 的使用,简单总结下:
- 创建:使用
**flow**
函数,在**collect**
时启动执行 flow 内代码块 - 发送:使用
**emit**
函数 - 接收:使用 Flow 的
**collect**
函数 - 取消:取消执行
**collect**
时的协程 Job 或作用域
冷流和热流
刚才已经介绍了 Flow 的基础使用,这里说下 Flow 的特性,Flow 是冷流,且只能有一个观察(接收)者。那冷流是什么,热流又是什么,它们的定义如下:
- 冷流: 无消费者时则不会生产数据,只有一个观察者
- 热流: 无观察者时也会生产数据,可以被多个观察者监听
定义如果有点晦涩,我们一个一个来说,首先要说下冷热流有观察者和无观察者的区别。冷流时无观察者时不会产生数据,热流是不管有没有观察者都会产生数据。
有无观察者冷热流区别
定义说的很明白了,需要用代码去验证下,先是冷流
fun flowTest() = runBlocking<Unit> {
val videoFlow = flow {
emit(VideoData("播放片头曲"))
delay(1000)
emit(VideoData("遇见怪兽"))
delay(1000)
emit(VideoData("你相信光吗"))
delay(1000)
emit(VideoData("打败怪兽"))
delay(1000)
emit(VideoData("播放片尾曲"))
}
launch {
delay(1500)
println("等待 1.5s 后")
println("-- 播放器 A 开始播放视频--")
videoFlow.collect {
println("播放器 A 播放:${it.data}")
}
println("-- 播放器 A 结束播放视频--")
}
}
这个例子其实就加了个 delay(1000)
的实际代码,这已经够验证 Flow 是个冷流了
冷流的定义是,只有在消费(监听)时,才会发送数据,而在这个例子中消费就是 collect
函数。所以说增加的 delay 函数并不会对原有输出有任何影响,运行验证结果输出如下:
/**
等待 1.5s 后
-- 播放器 A 开始播放视频--
播放器 A 播放:播放片头曲
播放器 A 播放:遇见怪兽
播放器 A 播放:你相信光吗
播放器 A 播放:打败怪兽
播放器 A 播放:播放片尾曲
-- 播放器 A 结束播放视频--
** /
通过上边的输出,我们就能看到,在延迟了 1.5s 后流依旧是重头播放的,接下来我们用热流试一下结果:(转换为热流的代码稍后就会说到,这里先理解)
fun flowTest() = runBlocking<Unit> {
val videoFlow = flow {
emit(VideoData("播放片头曲"))
delay(1000)
emit(VideoData("遇见怪兽"))
delay(1000)
emit(VideoData("你相信光吗"))
delay(1000)
emit(VideoData("打败怪兽"))
delay(1000)
emit(VideoData("播放片尾曲"))
}.shareIn(CoroutineScope(Dispatchers.IO), SharingStarted.Eagerly, 0)
launch {
delay(1500)
println("等待 1.5s 后")
println("-- 播放器 A 开始播放视频--")
videoFlow.collect {
println("播放器 A 播放:${it.data}")
}
println("-- 播放器 A 结束播放视频--")
}
}
// 输出
/**
等待 1.5s 后
-- 播放器 A 开始播放视频--
播放器 A 播放:你相信光吗
播放器 A 播放:打败怪兽
播放器 A 播放:播放片尾曲
** /
通过这个结果可以看到热流的一些特性,在等待 1.5s 的期间,videoFlow 已经发送了 播放片头曲
和 遇见怪兽
这两个数据,在监听的时候只能监听到后面发送的数据,证明流在监听前已经启动了。
一个和多个观察者冷热流区别
冷流只能有一个观察者,这个我思考了挺久了,因为我用代码是可以 collect 多次的,例如下面:
val videoFlow = flow {...}
launch {
println("-- 播放器 A 开始播放视频--")
videoFlow.collect {
println("播放器 A 播放:${it.data}")
}
println("-- 播放器 A 结束播放视频--")
}
launch {
println("-- 播放器 B 开始播放视频--")
videoFlow.collect {
println("播放器 B 播放:${it.data}")
}
println("-- 播放器 B 结束播放视频--")
}
后面我想明白了,观察者观察的是数据,所以上上边的话转变一下:
- 冷流:一个数据只能被一个观察者观察,而多个观察者会产生多个数据
- 热流:一个数据能被多个观察者观察,多个观察者也只会有一份数据
那么我如何通过代码验证呢,可以用 hash
值,同一个数据的 hash 值是相同的,知道了这点我们改造下代码,把 hash 值加上,首先先看冷流:
fun flowTest() = runBlocking<Unit> {
val videoFlow = flow {
emit(VideoData("播放片头曲"))
delay(1000)
emit(VideoData("遇见怪兽"))
delay(1000)
emit(VideoData("你相信光吗"))
delay(1000)
emit(VideoData("打败怪兽"))
delay(1000)
emit(VideoData("播放片尾曲"))
}
launch {
println("-- 播放器 A 开始播放视频--")
videoFlow.collect {
println("播放器 A 播放:${it.data} 【数据唯一标识:${it.hashCode()}】")
}
println("-- 播放器 A 结束播放视频--")
}
launch {
delay(100)
println("-- 播放器 B 开始播放视频--")
videoFlow.collect {
println("播放器 B 播放:${it.data} 【数据唯一标识:${it.hashCode()}】")
}
println("-- 播放器 B 结束播放视频--")
}
}
在代码中,加上了 hashCode 的值,接下来看下输出
-- 播放器 A 开始播放视频--
播放器 A 播放:播放片头曲 【数据唯一标识:472654579】
-- 播放器 B 开始播放视频--
播放器 B 播放:播放片头曲 【数据唯一标识:1053782781】
播放器 A 播放:遇见怪兽 【数据唯一标识:846063400】
播放器 B 播放:遇见怪兽 【数据唯一标识:627150481】
播放器 A 播放:你相信光吗 【数据唯一标识:128526626】
播放器 B 播放:你相信光吗 【数据唯一标识:1911728085】
播放器 A 播放:打败怪兽 【数据唯一标识:754666084】
播放器 B 播放:打败怪兽 【数据唯一标识:88558700】
播放器 A 播放:播放片尾曲 【数据唯一标识:1265210847】
-- 播放器 A 结束播放视频--
播放器 B 播放:播放片尾曲 【数据唯一标识:801197928】
-- 播放器 B 结束播放视频--
可以看到虽然 A 和 B 播放器都观察到了videoFlow的变化,但是两个播放器收到的是完全不同的数据,这时我们再用 .shareIn()
函数将 videoFlow 转换为热流试一下输出
-- 播放器 A 开始播放视频--
播放器 A 播放:播放片头曲 【数据唯一标识:687241927】
-- 播放器 B 开始播放视频--
播放器 A 播放:遇见怪兽 【数据唯一标识:2096171631】
播放器 B 播放:遇见怪兽 【数据唯一标识:2096171631】
播放器 A 播放:你相信光吗 【数据唯一标识:2114694065】
播放器 B 播放:你相信光吗 【数据唯一标识:2114694065】
播放器 A 播放:打败怪兽 【数据唯一标识:1844169442】
播放器 B 播放:打败怪兽 【数据唯一标识:1844169442】
播放器 A 播放:播放片尾曲 【数据唯一标识:1537358694】
播放器 B 播放:播放片尾曲 【数据唯一标识:1537358694】
改成热流后 hashcode 值一致了,证明热流接收的是相同的数据。
上边的例子也可以映射到现实去联想下,辅助下记忆
- 冷流:视频网站上的视频,每个观看的用户都是单独的
- 热流:视频网站上的直播,每个观看的用户都是共享的
SharedFlow
刚刚上边说了 Flow 和冷热流的概念,并且验证了 Flow 是个冷流,现在来讲下热流。
SharedFlow 是 Flow 的一个子集,是一种热流的实现。Mutable 前缀我想都应该不会陌生,是可编辑的意思例如 MutableMap 和 MutableList 等,SharedFlow 在创建后是不能再发送的数据的,MutableSharedFlow 则可以。
SharedFlow 有如下两种创建方式:
// SharedFlow 创建方式一
val flowIn = flow<Int> {
...
}.shareIn(CoroutineScope(Dispatchers.IO), SharingStarted.Eagerly, 0)
// SharedFlow 创建方式二
val mutableSharedFlow = MutableSharedFlow<Int>()
val sharedFlow: SharedFlow<Int> = mutableSharedFlow
mutableSharedFlow.emit(...)
基本使用
MutableSharedFlow 它的基本使用还是很简单的,代码如下:
fun sharedFlowBaseTest() = runBlocking<Unit> {
// SharedFlow 创建方式二
val mutableSharedFlow = MutableSharedFlow<Int>()
val sharedFlow: SharedFlow<Int> = mutableSharedFlow
launch {
delay(1000)
mutableSharedFlow.emit(1)
delay(1000)
mutableSharedFlow.emit(2)
delay(1000)
mutableSharedFlow.emit(3)
}
val collectJob = launch {
sharedFlow.collect {
println("收到:${it}")
}
println("collectJob finish")
}
launch {
sharedFlow.collect {
println("收到:${it}")
}
}
launch {
delay(2100)
collectJob.cancel()
}
}
因为是 Flow 的子集,所以 emit 和 collect 函数使用方式是一致的,值得注意的有如下三点:
- MutableSharedFlow 没有初始值
- 跟 Flow 不同,SharedFlow 的 collect 是个持续阻断的函数(17 行代码永远不会执行)
- SharedFlow 是可以取消的,但是它本身没有取消方法,需要协程 Job 或者协程作用域取消
MutableSharedFlow 使用
刚刚的基本使用在创建 MutableSharedFlow 是没有任何定制化的都是用的默认参数,其实 MutableSharedFlow 是个很灵活的 Flow,通过一些简单的配置就可以实现缓冲、黏性等业务场景。
先看下 MutableSharedFlow 的初始化方法签名:
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
其中的三个参数分别的意义为:
replay
:重发个数,当有新的观察者collect
时,发送几个已经发送过的数据给它;默认为 0 个extraBufferCapacity
:缓存个数,减去 replay,MutableSharedFlow 还缓存多少数据;默认为 0 个onBufferOverflow
:到达缓存个数时,缓存的策略;默认为 SUSPENDBufferOverflow.SUSPEND
: 挂起后续缓存的值BufferOverflow.DROP_OLDEST
: 丢掉最旧值BufferOverflow.DROP_LATEST
: 丢掉最新值
接下来会根据顺序挨个去实践下每个参数的行为
replay
定义:当有新的观察者 collect
时,发送几个已经发送过的数据给它,默认为 0 个
例子:
fun sharedFlowReplayTest() = runBlocking {
val mutableSharedFlow = MutableSharedFlow<Int>(replay = 2)
val sharedFlow: SharedFlow<Int> = mutableSharedFlow
launch {
delay(1000)
mutableSharedFlow.emit(1)
delay(1000)
mutableSharedFlow.emit(2)
delay(1000)
mutableSharedFlow.emit(3)
}
launch {
delay(2100)
sharedFlow.collect {
println("收到:${it}")
}
}
}
定义说的很明确,就是重发的个数,在这个例子里 replay = 2
也就是重发 2 个,所以在延迟 2s 后依然会接收到已经发送过的 1、2的值。
onBufferOverflow
缓存满了后,缓存的策略
- SUSPEND::挂起
- DROP_OLDEST:丢弃旧的值,只接收最新的
- DROP_LATEST:丢弃新的值,只接收最旧的
再说这个参数之前,要先讲一个概念,叫 背压
背压:背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略
举个现实中的例子:
上游:一个厨师做馒头特别快,0.1秒就可以做出一个馒头
下游:顾客吃馒头的速度有限,0.5 秒钟才能吃 1 个
顾客吃第一个馒头的时候回头一看,我去这可不行啊,我压力太大了得告诉厨师 慢点做/后面我只吃最新的馒头 /后面我只吃第下一个生产出来的馒头/。这个行为就叫做背压。
说完了背压的概念,结合上边的例子我们再看下 onBufferOverflow
这个参数
- SUSPEND:您慢点做,我吃一个您做一个
- DROP_OLDEST:后面我只吃最新的馒头
- DROP_LATEST:后面我只吃第下一个生产出来的馒头
现在理论已经出来了剩下的就是实践,下面会根据现实中的例子写个实例项目
DROP_OLDEST
例子
fun sharedFlowBufferOverflowTest() = runBlocking {
val mutableSharedFlow =
MutableSharedFlow<Int>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val sharedFlow: SharedFlow<Int> = mutableSharedFlow
launch {
println("<---顾客来到店里,等待厨师生产馒头")
sharedFlow.collect {
println("<---顾客收到馒头 $it")
delay(500)
println("<---顾客吃完馒头 $it")
}
}
launch {
for (i in 1..6) {
delay(100)
println("厨师生产了馒头 $i")
mutableSharedFlow.emit(i)
}
}
}
输出:
<---顾客来到店里,等待厨师生产馒头
厨师生产了馒头 1
<---顾客收到馒头 1
厨师生产了馒头 2
厨师生产了馒头 3
厨师生产了馒头 4
厨师生产了馒头 5
<---顾客吃完馒头 1
<---顾客收到馒头 5
厨师生产了馒头 6
<---顾客吃完馒头 5
<---顾客收到馒头 6
<---顾客吃完馒头 6
如果对这个输出有疑惑的话,我会挨个解释下,如果没有疑惑,直接看下个示例就好
输出解释:
- <—-顾客来到店里,等待厨师生产馒头:这个没啥说的,就是个单纯的输出,代表观察者已经就位
- 厨师生产了馒头 1:生产者生产了第一条数据
- <—-顾客收到馒头 1:观察者收到第一条数据
- 厨师生产了馒头 2:这行要注意下,观察者此时还没有处理完成数据(顾客还没吃完馒头),生产者首先会把这个数据缓存下来。
- 厨师生产了馒头 3:由于生产者设置的是
DROP_LATEST
(只要最新数据),所以会把 ‘馒头 2’从缓存里删掉,并且将 ‘馒头 3’ 放入缓存。- 厨师生产了馒头 4:由于生产者设置的是
DROP_LATEST
(只要最新数据),所以会把 ‘馒头 3’从缓存里删掉,并且将 ‘馒头 4’ 放入缓存。- 厨师生产了馒头 5:由于生产者设置的是
DROP_LATEST
(只要最新数据),所以会把 ‘馒头 4’从缓存里删掉,并且将 ‘馒头 5’ 放入缓存。- <—-顾客吃完馒头 1:观察者消费完了数据
- <—-顾客收到馒头 5:生产者会发给观察者缓存中的最新数据给观察者,当前缓存中是‘馒头 5’,所以观察者收到‘馒头 5’,并且将缓存数据清空
- 厨师生产了馒头 6:观察者此时还没有处理完成数据(顾客还没吃完馒头),生产者首先会把这个数据缓存下来。
- <—-顾客吃完馒头 5:观察者消费完了数据
- <—-顾客收到馒头 6:生产者会发给观察者缓存中的最新数据给观察者,当前缓存中是‘馒头 6’,所以观察者收到‘馒头 6’,并且将缓存数据清空
- <—-顾客吃完馒头 6:观察者消费完了数据
DROP_LATEST
例子
fun sharedFlowBufferOverflowTest() = runBlocking {
val mutableSharedFlow =
MutableSharedFlow<Int>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)
val sharedFlow: SharedFlow<Int> = mutableSharedFlow
launch {
println("<---顾客来到店里,等待厨师生产馒头")
sharedFlow.collect {
println("<---顾客收到馒头 $it")
delay(500)
println("<---顾客吃完馒头 $it")
}
}
launch {
for (i in 1..6) {
delay(100)
println("厨师生产了馒头 $i")
mutableSharedFlow.emit(i)
}
}
}
�输出:
<---顾客来到店里,等待厨师生产馒头
厨师生产了馒头 1
<---顾客收到馒头 1
厨师生产了馒头 2
厨师生产了馒头 3
厨师生产了馒头 4
厨师生产了馒头 5
<---顾客吃完馒头 1
<---顾客收到馒头 2
厨师生产了馒头 6
<---顾客吃完馒头 2
<---顾客收到馒头 6
<---顾客吃完馒头 6
这个输出的结果我就不分析了,其实跟上边类似,如果想不明白问我。
SUSPEND
例子
fun sharedFlowBufferOverflowTest() = runBlocking {
val mutableSharedFlow =
MutableSharedFlow<Int>(extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)
val sharedFlow: SharedFlow<Int> = mutableSharedFlow
val startTime = System.currentTimeMillis()
val finishTime = {
val endTime = System.currentTimeMillis()
" 【持续时间:"+((endTime - startTime)/100 * 100).toString()+"】"
}
launch {
println("<---顾客来到店里,等待厨师生产馒头")
sharedFlow.collect {
println("<---顾客收到馒头 $it ${finishTime()}")
delay(500)
println("<---顾客吃完馒头 $it ${finishTime()}")
}
}
launch {
for (i in 1..6) {
delay(100)
println("厨师生产了馒头 $i ${finishTime()}")
mutableSharedFlow.emit(i)
}
}
}
这个例子较上两个稍微改了一点。首先 extraBufferCapacity
参数设置为 0 了,这个为了方便分析,其次输出里增加了持续时间这个字段,也是为了方便分析的。输出如下:
<---顾客来到店里,等待厨师生产馒头
厨师生产了馒头 1 【持续时间:100】
<---顾客收到馒头 1 【持续时间:100】
厨师生产了馒头 2 【持续时间:200】
<---顾客吃完馒头 1 【持续时间:600】
<---顾客收到馒头 2 【持续时间:600】
厨师生产了馒头 3 【持续时间:700】
<---顾客吃完馒头 2 【持续时间:1100】
<---顾客收到馒头 3 【持续时间:1100】
厨师生产了馒头 4 【持续时间:1200】
<---顾客吃完馒头 3 【持续时间:1600】
<---顾客收到馒头 4 【持续时间:1600】
厨师生产了馒头 5 【持续时间:1700】
<---顾客吃完馒头 4 【持续时间:2100】
<---顾客收到馒头 5 【持续时间:2100】
厨师生产了馒头 6 【持续时间:2200】
<---顾客吃完馒头 5 【持续时间:2600】
<---顾客收到馒头 6 【持续时间:2600】
<---顾客吃完馒头 6 【持续时间:3100】
通过这个输出可以看到,这个缓存策略没有丢弃任何数值,后续的都被挂起了
分析输出的话会说一个关键点就可以了,如下:
输出第4行-第 7 行:在第 4 行的时候生产了‘馒头 2’持续时间是 200 毫秒(厨师 100 毫秒生产一个馒头),而在第 7 行输出里才生产了‘馒头 3’,中间差了 500 毫秒,也就说生产者被挂起了,注意是生产者被挂起,不是观察者
这里映射现实就是,顾客跟厨师说,您等我吃完再做下一个
extraBufferCapacity
缓存个数
其实这个在说完了 onBufferOverflow
参数后就已经很好理解了,就是缓存的个数,你可以去自行更改下上边三个中的缓存个数的数值,然后运行去分析下输出
shareIn 函数转换
转换的代码很简单,使用 shareIn 函数
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T>
第三个参数不用说了,前两个参数会在下面说下
flow 函数的协程作用域
flow
函数的协程作用域是由 collect
时的上级作用域决定的。
由于 flow 是冷流,所以可以有多个作用域。
下面是个测试:
fun sharedFlowShareInTest() = runBlocking {
val scope = CoroutineScope(Executors.newSingleThreadExecutor().asCoroutineDispatcher())
val flow = flow {
for (i in 1..6) {
delay(100)
println("<---发送:$i " + Thread.currentThread().name)
emit(i)
}
}
launch {
flow.collect {
println("接收 A: $it " + Thread.currentThread().name)
}
}
scope.launch {
flow.collect {
println("接收 B: $it " + Thread.currentThread().name)
}
}
}
输出:
<---发送:1 pool-1-thread-1
<---发送:1 main
接收 B: 1 pool-1-thread-1
接收 A: 1 main
<---发送:2 main
<---发送:2 pool-1-thread-1
接收 B: 2 pool-1-thread-1
接收 A: 2 main
<---发送:3 main
接收 A: 3 main
<---发送:3 pool-1-thread-1
接收 B: 3 pool-1-thread-1
<---发送:4 main
<---发送:4 pool-1-thread-1
接收 B: 4 pool-1-thread-1
接收 A: 4 main
<---发送:5 main
<---发送:5 pool-1-thread-1
接收 A: 5 main
接收 B: 5 pool-1-thread-1
<---发送:6 main
<---发送:6 pool-1-thread-1
接收 A: 6 main
接收 B: 6 pool-1-thread-1
可以看到在 flow
函数的 lambda 表达式中,是输出了两个不同的线程的。
shareIn 的 scope 参数
flow 是冷流,每个观察者都配有独立的生产者,所以作用域可以有多个。
但是 sharedFlow 是热流,生产者只有一个,那么就会产生一个问题,flow
函数的 lambda 表达式中的作用域到底该是什么。
而 scope 这个参数就是解决此问题的,作用域统一由 shareIn 的 scope 参数指定。
下面将上边例子中的 flow 通过 shareIn 转换下
val flow = flow {
for (i in 1..6) {
delay(100)
println("<---发送:$i " + Thread.currentThread().name)
emit(i)
}
}.shareIn(scope, SharingStarted.Lazily, 0)
再看下输出
<---发送:1 pool-1-thread-1
接收 A: 1 main
接收 B: 1 pool-1-thread-1
<---发送:2 pool-1-thread-1
接收 A: 2 main
接收 B: 2 pool-1-thread-1
<---发送:3 pool-1-thread-1
接收 A: 3 main
接收 B: 3 pool-1-thread-1
<---发送:4 pool-1-thread-1
接收 A: 4 main
接收 B: 4 pool-1-thread-1
<---发送:5 pool-1-thread-1
接收 A: 5 main
接收 B: 5 pool-1-thread-1
<---发送:6 pool-1-thread-1
接收 A: 6 main
接收 B: 6 pool-1-thread-1
可以看到生产者的协程作用域已经是 shareIn 传入的。
shareIn 的 started 参数
- Lazily:当首个观察者出现时,启动 flow
- Eagerly:立即启动 flow
- WhileSubscribed:自定义策略
- stopTimeoutMillis:如果没有观察者时,多长时间取消生产者数据
- replayExpirationMilis:多长时间取消 replay
Lazily 和 Eagerly 这两个参数没啥好说的,都很简单。
主要看下 WhileSubscribed
这个函数。
我们通过具体的业务场景可以更好的理解这个参数的意义:
我们在 App 的数据层里有一个 shareFlow 是每 2 秒获取手机的 GPS 信息,然后 emit 出去。
为了用户的省电考虑,我希望在用户息屏的时候,就不要再获取了,然后亮屏的时候,在重新启动 2s 轮训的获取
这个需求就可以直接用 WhileSubscribed
实现,来看下:
fun sharedFlowShareInTest() = runBlocking {
val scope = CoroutineScope(Executors.newSingleThreadExecutor().asCoroutineDispatcher())
val flow = flow {
while (true) {
val uuid = UUID.randomUUID()
println("获取 GPS $uuid")
emit(uuid)
delay(2000)
}
}.shareIn(scope, SharingStarted.WhileSubscribed(stopTimeoutMillis = 0), 0)
val create =
{
launch {
flow.collect {
println("观察者 : $it ")
}
}
}
val job = create()
launch {
delay(3000)
println("息屏了")
job.cancel()
}
launch {
delay(15000)
println("亮屏了")
create()
}
}
在这个例子,flow 模拟了每 2 秒获取的 GPS 信息的数据源,在 3 秒后会取消观察者模拟息屏,在 15 秒过后再次创建一个观察者来模拟亮屏。
输出我就不展示了,自己运行下看下。在输出息屏了后立马就停止了 GPS 的轮询,然后在 12 秒后 GPS 的轮询又会启动了。
刚刚上边的例子 stopTimeoutMillis
用的 0,也就是说没有观察者时立刻停止了生产者的生产。通过 stopTimeoutMillis
这个参数也可以设置指定的缓冲时间,例如我想没有观察者时 1 秒后再停止生产者生产,那就设置为 1000
第二个参数 replayExpirationMilis
就比较简单了,replay 多长时间过期。因为上边已经说了 replay 参数了,而 replayExpirationMilis
阐述的也比较直观,所以就不写例子了。
StateFlow
StateFlow 是 SharedFlow 的一个子集,同样是热流。
相比于 SharedFlow,StateFlow 定制了如下特性
- 始终有值
- 粘性:始终接收最新值
- 防抖(去重):重复的数据不会连续发送
- 可以通过 value 的方式获取最新的值(不用从 collect 中获取)
但是 StateFlow 也失去了 SharedFlow 的灵活性没有了 replay、onBufferOverflow 等有用的特性,这个等下一节对比使用场景的时候具体说。此节主要说下基本使用
// StateFlow 创建方式一
val flowIn2 = flow<Int> {
...
}.stateIn(CoroutineScope(Dispatchers.IO), started = SharingStarted.Lazily, 1)
// StateFlow 创建方式二
val mutableStateFlow = MutableStateFlow(1)
val stateFlow: StateFlow<Int> = mutableStateFlow
MutableStateFlow 使用
相比于 MutableSharedFlow 来说 MutableStateFlow 的构造函数要简单多了,就一个参数,签名如下:
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T>
这个 value 参数没啥好说的,就是初始值。因为 StateFlow 的特性是始终有值,所以必须有个初始值。
接下来,我们测试下 StateFlow 受到背压后会采用什么处理方式,例子就把 onBufferOverflow 这一小结的例子稍微改下,如下:
fun stateFlowTest() = runBlocking {
val mutableStateFlow = MutableStateFlow(0)
val stateFlow: StateFlow<Int> = mutableStateFlow
launch {
println("<---顾客来到店里,等待厨师生产馒头")
stateFlow.collect {
if (it == 0) return@collect
println("<---顾客收到馒头 $it")
delay(500)
println("<---顾客吃完馒头 $it")
}
}
launch {
for (i in 1..6) {
delay(100)
println("厨师生产了馒头 $i")
mutableStateFlow.emit(i)
}
}
}
输出结果为:
<---顾客来到店里,等待厨师生产馒头
厨师生产了馒头 1
<---顾客收到馒头 1
厨师生产了馒头 2
厨师生产了馒头 3
厨师生产了馒头 4
厨师生产了馒头 5
<---顾客吃完馒头 1
<---顾客收到馒头 5
厨师生产了馒头 6
<---顾客吃完馒头 5
<---顾客收到馒头 6
<---顾客吃完馒头 6
可以看到结果与 onBufferOverflow.DROP_OLDEST
一致。
证明了 StateFlow 的背压效果是丢弃旧值,与其始终接收最新值的特性一致。
val mutableStateFlow = MutableStateFlow(0)
// |||
// 约等价与
val mutableSharedFlow = MutableSharedFlow<Int>(replay = 1,onBufferOverflow = BufferOverflow.DROP_OLDEST)
mutableSharedFlow.emit(0)
stateIn 转换
stateIn 的用法和 shareIn 大体一致,就是最后一个参数,由 replay 变为了初始值,这个也很好理解,stateFlow 的 replay 就是 1,所以不用传。
值得注意的是 stateIn 有两个方法签名
// 第一个
public fun <T> Flow<T>.stateIn(
scope: CoroutineScope,
started: SharingStarted,
initialValue: T
): StateFlow<T>
// 第二个
public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T>
第二个 stateIn 是个挂起阻断函数,直到接收到第一个值之前,都会处于挂起状态。
Flow、SharedFlow 与 StateFlow 汇总
这里直接说汇总
- Flow :比较适合表现一些一次性的代码片段,例如我请求一个接口,就会有如下代码片段
将请求状态置为 请求中 请求接口 根据接口返回将请求状态置为请求成功或者失败
- SharedFlow:适合共享的代码片段,例如持续获取经纬度,然后通知
- StateFlow:由于其的粘性和防抖的特性,比较适合用于与界面交互
- 粘性可以保证数据不丢失
- 防抖可以保证避免无意义的界面刷新
由于 StateFlow 的防抖特性是根据 equals
函数,所以有些 List 和 Map 的数据会被过滤,遇到这种情况可以使用定制的SharedFlow,参考
其实这里应该有每种 Flow 在项目中的具体使用示例,不过篇幅就太大了,后面会单独有一篇文章去说(这里有个文章参考,可以先看下),等写完了会更新此处的文章地址。