篇章目的
- 了解 flow:基础用法
- 了解冷热流
- 三个 Flow 的使用与区别
- 想要了解更多,转移官方教程
前提知识:
Flow 定义与用法
Flow 是什么
Flow 的官方( Kotlin )定义是一个异步的数据流,它按照顺序发出值。
数据流大家都应该挺熟的,例如在编程里我们读取一个文件,就需要用到流。而 Flow 呢是一个被协程客制化的数据流,在数据流动的过程中是可以被挂起的。
在 Kotlin 中 Flow 的写法很简单,通过 flow 就可以定义,例如下:
val myFlow = flow<Any> {}
简单分析下上边的代码,myFlow 是这个流的实例,flow 是创建流的函数,而泛型规定了这个流里能发什么数据,最后那个 Lambda 就是控制留的区域,在这个区域里可以发送数据,处理数据,挂起操作等。
Flow 怎么用
刚刚说了 flow 怎么定义了,下面我们用 flow 形式的代码模拟一个视频流
class VideoData(val data: String)fun flowTest() = runBlocking<Unit> {val videoFlow = flow {emit(VideoData("播放片头曲"))delay(1000)emit(VideoData("遇见怪兽"))delay(1000)emit(VideoData("你相信光吗"))delay(1000)emit(VideoData("打败怪兽"))delay(1000)emit(VideoData("播放片尾曲"))}}
上边的代码,我们通过 flow 模拟了一个视频流的播放过程,你可以把 delay 想象成在拖动视频。那么定义好的 Flow 该如何去使用呢,就好比上边的例子,得需要一个播放器去接收播放。
接收流需要用到了 Flow 接口提供的 collect 函数,这个函数是个阻塞函数,只有一个 Lambda 的参数用来返回流,例子如下:
fun flowTest() = runBlocking<Unit> {val videoFlow = flow {emit(VideoData("播放片头曲"))delay(1000)emit(VideoData("遇见怪兽"))delay(1000)emit(VideoData("你相信光吗"))delay(1000)emit(VideoData("打败怪兽"))delay(1000)emit(VideoData("播放片尾曲"))}launch {delay(2000)println("等待 2s 后")println("-- 播放器 A 开始播放视频--")videoFlow.collect {println("播放器 A 播放:${it.data}")}println("-- 播放器 A 结束播放视频--")}}// 输出/**-- 播放器 A 开始播放视频--播放器 A 播放:播放片头曲播放器 A 播放:遇见怪兽播放器 A 播放:你相信光吗播放器 A 播放:打败怪兽播放器 A 播放:播放片尾曲-- 播放器 A 结束播放视频--** /
以上就是最基本的 Flow 的使用,简单总结下:
- 创建:使用
**flow**函数,在**collect**时启动执行 flow 内代码块 - 发送:使用
**emit**函数 - 接收:使用 Flow 的
**collect**函数 - 取消:取消执行
**collect**时的协程 Job 或作用域
冷流和热流
刚才已经介绍了 Flow 的基础使用,这里说下 Flow 的特性,Flow 是冷流,且只能有一个观察(接收)者。那冷流是什么,热流又是什么,它们的定义如下:
- 冷流: 无消费者时则不会生产数据,只有一个观察者
- 热流: 无观察者时也会生产数据,可以被多个观察者监听
定义如果有点晦涩,我们一个一个来说,首先要说下冷热流有观察者和无观察者的区别。冷流时无观察者时不会产生数据,热流是不管有没有观察者都会产生数据。
有无观察者冷热流区别
定义说的很明白了,需要用代码去验证下,先是冷流
fun flowTest() = runBlocking<Unit> {val videoFlow = flow {emit(VideoData("播放片头曲"))delay(1000)emit(VideoData("遇见怪兽"))delay(1000)emit(VideoData("你相信光吗"))delay(1000)emit(VideoData("打败怪兽"))delay(1000)emit(VideoData("播放片尾曲"))}launch {delay(1500)println("等待 1.5s 后")println("-- 播放器 A 开始播放视频--")videoFlow.collect {println("播放器 A 播放:${it.data}")}println("-- 播放器 A 结束播放视频--")}}
这个例子其实就加了个 delay(1000) 的实际代码,这已经够验证 Flow 是个冷流了
冷流的定义是,只有在消费(监听)时,才会发送数据,而在这个例子中消费就是 collect 函数。所以说增加的 delay 函数并不会对原有输出有任何影响,运行验证结果输出如下:
/**等待 1.5s 后-- 播放器 A 开始播放视频--播放器 A 播放:播放片头曲播放器 A 播放:遇见怪兽播放器 A 播放:你相信光吗播放器 A 播放:打败怪兽播放器 A 播放:播放片尾曲-- 播放器 A 结束播放视频--** /
通过上边的输出,我们就能看到,在延迟了 1.5s 后流依旧是重头播放的,接下来我们用热流试一下结果:(转换为热流的代码稍后就会说到,这里先理解)
fun flowTest() = runBlocking<Unit> {val videoFlow = flow {emit(VideoData("播放片头曲"))delay(1000)emit(VideoData("遇见怪兽"))delay(1000)emit(VideoData("你相信光吗"))delay(1000)emit(VideoData("打败怪兽"))delay(1000)emit(VideoData("播放片尾曲"))}.shareIn(CoroutineScope(Dispatchers.IO), SharingStarted.Eagerly, 0)launch {delay(1500)println("等待 1.5s 后")println("-- 播放器 A 开始播放视频--")videoFlow.collect {println("播放器 A 播放:${it.data}")}println("-- 播放器 A 结束播放视频--")}}// 输出/**等待 1.5s 后-- 播放器 A 开始播放视频--播放器 A 播放:你相信光吗播放器 A 播放:打败怪兽播放器 A 播放:播放片尾曲** /
通过这个结果可以看到热流的一些特性,在等待 1.5s 的期间,videoFlow 已经发送了 播放片头曲 和 遇见怪兽 这两个数据,在监听的时候只能监听到后面发送的数据,证明流在监听前已经启动了。
一个和多个观察者冷热流区别
冷流只能有一个观察者,这个我思考了挺久了,因为我用代码是可以 collect 多次的,例如下面:
val videoFlow = flow {...}launch {println("-- 播放器 A 开始播放视频--")videoFlow.collect {println("播放器 A 播放:${it.data}")}println("-- 播放器 A 结束播放视频--")}launch {println("-- 播放器 B 开始播放视频--")videoFlow.collect {println("播放器 B 播放:${it.data}")}println("-- 播放器 B 结束播放视频--")}
后面我想明白了,观察者观察的是数据,所以上上边的话转变一下:
- 冷流:一个数据只能被一个观察者观察,而多个观察者会产生多个数据
- 热流:一个数据能被多个观察者观察,多个观察者也只会有一份数据
那么我如何通过代码验证呢,可以用 hash 值,同一个数据的 hash 值是相同的,知道了这点我们改造下代码,把 hash 值加上,首先先看冷流:
fun flowTest() = runBlocking<Unit> {val videoFlow = flow {emit(VideoData("播放片头曲"))delay(1000)emit(VideoData("遇见怪兽"))delay(1000)emit(VideoData("你相信光吗"))delay(1000)emit(VideoData("打败怪兽"))delay(1000)emit(VideoData("播放片尾曲"))}launch {println("-- 播放器 A 开始播放视频--")videoFlow.collect {println("播放器 A 播放:${it.data} 【数据唯一标识:${it.hashCode()}】")}println("-- 播放器 A 结束播放视频--")}launch {delay(100)println("-- 播放器 B 开始播放视频--")videoFlow.collect {println("播放器 B 播放:${it.data} 【数据唯一标识:${it.hashCode()}】")}println("-- 播放器 B 结束播放视频--")}}
在代码中,加上了 hashCode 的值,接下来看下输出
-- 播放器 A 开始播放视频--播放器 A 播放:播放片头曲 【数据唯一标识:472654579】-- 播放器 B 开始播放视频--播放器 B 播放:播放片头曲 【数据唯一标识:1053782781】播放器 A 播放:遇见怪兽 【数据唯一标识:846063400】播放器 B 播放:遇见怪兽 【数据唯一标识:627150481】播放器 A 播放:你相信光吗 【数据唯一标识:128526626】播放器 B 播放:你相信光吗 【数据唯一标识:1911728085】播放器 A 播放:打败怪兽 【数据唯一标识:754666084】播放器 B 播放:打败怪兽 【数据唯一标识:88558700】播放器 A 播放:播放片尾曲 【数据唯一标识:1265210847】-- 播放器 A 结束播放视频--播放器 B 播放:播放片尾曲 【数据唯一标识:801197928】-- 播放器 B 结束播放视频--
可以看到虽然 A 和 B 播放器都观察到了videoFlow的变化,但是两个播放器收到的是完全不同的数据,这时我们再用 .shareIn() 函数将 videoFlow 转换为热流试一下输出
-- 播放器 A 开始播放视频--播放器 A 播放:播放片头曲 【数据唯一标识:687241927】-- 播放器 B 开始播放视频--播放器 A 播放:遇见怪兽 【数据唯一标识:2096171631】播放器 B 播放:遇见怪兽 【数据唯一标识:2096171631】播放器 A 播放:你相信光吗 【数据唯一标识:2114694065】播放器 B 播放:你相信光吗 【数据唯一标识:2114694065】播放器 A 播放:打败怪兽 【数据唯一标识:1844169442】播放器 B 播放:打败怪兽 【数据唯一标识:1844169442】播放器 A 播放:播放片尾曲 【数据唯一标识:1537358694】播放器 B 播放:播放片尾曲 【数据唯一标识:1537358694】
改成热流后 hashcode 值一致了,证明热流接收的是相同的数据。
上边的例子也可以映射到现实去联想下,辅助下记忆
- 冷流:视频网站上的视频,每个观看的用户都是单独的
- 热流:视频网站上的直播,每个观看的用户都是共享的
SharedFlow
刚刚上边说了 Flow 和冷热流的概念,并且验证了 Flow 是个冷流,现在来讲下热流。
SharedFlow 是 Flow 的一个子集,是一种热流的实现。Mutable 前缀我想都应该不会陌生,是可编辑的意思例如 MutableMap 和 MutableList 等,SharedFlow 在创建后是不能再发送的数据的,MutableSharedFlow 则可以。
SharedFlow 有如下两种创建方式:
// SharedFlow 创建方式一val flowIn = flow<Int> {...}.shareIn(CoroutineScope(Dispatchers.IO), SharingStarted.Eagerly, 0)// SharedFlow 创建方式二val mutableSharedFlow = MutableSharedFlow<Int>()val sharedFlow: SharedFlow<Int> = mutableSharedFlowmutableSharedFlow.emit(...)
基本使用
MutableSharedFlow 它的基本使用还是很简单的,代码如下:
fun sharedFlowBaseTest() = runBlocking<Unit> {// SharedFlow 创建方式二val mutableSharedFlow = MutableSharedFlow<Int>()val sharedFlow: SharedFlow<Int> = mutableSharedFlowlaunch {delay(1000)mutableSharedFlow.emit(1)delay(1000)mutableSharedFlow.emit(2)delay(1000)mutableSharedFlow.emit(3)}val collectJob = launch {sharedFlow.collect {println("收到:${it}")}println("collectJob finish")}launch {sharedFlow.collect {println("收到:${it}")}}launch {delay(2100)collectJob.cancel()}}
因为是 Flow 的子集,所以 emit 和 collect 函数使用方式是一致的,值得注意的有如下三点:
- MutableSharedFlow 没有初始值
- 跟 Flow 不同,SharedFlow 的 collect 是个持续阻断的函数(17 行代码永远不会执行)
- SharedFlow 是可以取消的,但是它本身没有取消方法,需要协程 Job 或者协程作用域取消
MutableSharedFlow 使用
刚刚的基本使用在创建 MutableSharedFlow 是没有任何定制化的都是用的默认参数,其实 MutableSharedFlow 是个很灵活的 Flow,通过一些简单的配置就可以实现缓冲、黏性等业务场景。
先看下 MutableSharedFlow 的初始化方法签名:
public fun <T> MutableSharedFlow(replay: Int = 0,extraBufferCapacity: Int = 0,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): MutableSharedFlow<T>
其中的三个参数分别的意义为:
replay:重发个数,当有新的观察者collect时,发送几个已经发送过的数据给它;默认为 0 个extraBufferCapacity:缓存个数,减去 replay,MutableSharedFlow 还缓存多少数据;默认为 0 个onBufferOverflow:到达缓存个数时,缓存的策略;默认为 SUSPENDBufferOverflow.SUSPEND: 挂起后续缓存的值BufferOverflow.DROP_OLDEST: 丢掉最旧值BufferOverflow.DROP_LATEST: 丢掉最新值
接下来会根据顺序挨个去实践下每个参数的行为
replay
定义:当有新的观察者 collect 时,发送几个已经发送过的数据给它,默认为 0 个
例子:
fun sharedFlowReplayTest() = runBlocking {val mutableSharedFlow = MutableSharedFlow<Int>(replay = 2)val sharedFlow: SharedFlow<Int> = mutableSharedFlowlaunch {delay(1000)mutableSharedFlow.emit(1)delay(1000)mutableSharedFlow.emit(2)delay(1000)mutableSharedFlow.emit(3)}launch {delay(2100)sharedFlow.collect {println("收到:${it}")}}}
定义说的很明确,就是重发的个数,在这个例子里 replay = 2 也就是重发 2 个,所以在延迟 2s 后依然会接收到已经发送过的 1、2的值。
onBufferOverflow
缓存满了后,缓存的策略
- SUSPEND::挂起
- DROP_OLDEST:丢弃旧的值,只接收最新的
- DROP_LATEST:丢弃新的值,只接收最旧的
再说这个参数之前,要先讲一个概念,叫 背压
背压:背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略
举个现实中的例子:
上游:一个厨师做馒头特别快,0.1秒就可以做出一个馒头
下游:顾客吃馒头的速度有限,0.5 秒钟才能吃 1 个
顾客吃第一个馒头的时候回头一看,我去这可不行啊,我压力太大了得告诉厨师 慢点做/后面我只吃最新的馒头 /后面我只吃第下一个生产出来的馒头/。这个行为就叫做背压。
说完了背压的概念,结合上边的例子我们再看下 onBufferOverflow 这个参数
- SUSPEND:您慢点做,我吃一个您做一个
- DROP_OLDEST:后面我只吃最新的馒头
- DROP_LATEST:后面我只吃第下一个生产出来的馒头
现在理论已经出来了剩下的就是实践,下面会根据现实中的例子写个实例项目
DROP_OLDEST 例子
fun sharedFlowBufferOverflowTest() = runBlocking {val mutableSharedFlow =MutableSharedFlow<Int>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)val sharedFlow: SharedFlow<Int> = mutableSharedFlowlaunch {println("<---顾客来到店里,等待厨师生产馒头")sharedFlow.collect {println("<---顾客收到馒头 $it")delay(500)println("<---顾客吃完馒头 $it")}}launch {for (i in 1..6) {delay(100)println("厨师生产了馒头 $i")mutableSharedFlow.emit(i)}}}
输出:
<---顾客来到店里,等待厨师生产馒头厨师生产了馒头 1<---顾客收到馒头 1厨师生产了馒头 2厨师生产了馒头 3厨师生产了馒头 4厨师生产了馒头 5<---顾客吃完馒头 1<---顾客收到馒头 5厨师生产了馒头 6<---顾客吃完馒头 5<---顾客收到馒头 6<---顾客吃完馒头 6
如果对这个输出有疑惑的话,我会挨个解释下,如果没有疑惑,直接看下个示例就好
输出解释:
- <—-顾客来到店里,等待厨师生产馒头:这个没啥说的,就是个单纯的输出,代表观察者已经就位
- 厨师生产了馒头 1:生产者生产了第一条数据
- <—-顾客收到馒头 1:观察者收到第一条数据
- 厨师生产了馒头 2:这行要注意下,观察者此时还没有处理完成数据(顾客还没吃完馒头),生产者首先会把这个数据缓存下来。
- 厨师生产了馒头 3:由于生产者设置的是
DROP_LATEST(只要最新数据),所以会把 ‘馒头 2’从缓存里删掉,并且将 ‘馒头 3’ 放入缓存。- 厨师生产了馒头 4:由于生产者设置的是
DROP_LATEST(只要最新数据),所以会把 ‘馒头 3’从缓存里删掉,并且将 ‘馒头 4’ 放入缓存。- 厨师生产了馒头 5:由于生产者设置的是
DROP_LATEST(只要最新数据),所以会把 ‘馒头 4’从缓存里删掉,并且将 ‘馒头 5’ 放入缓存。- <—-顾客吃完馒头 1:观察者消费完了数据
- <—-顾客收到馒头 5:生产者会发给观察者缓存中的最新数据给观察者,当前缓存中是‘馒头 5’,所以观察者收到‘馒头 5’,并且将缓存数据清空
- 厨师生产了馒头 6:观察者此时还没有处理完成数据(顾客还没吃完馒头),生产者首先会把这个数据缓存下来。
- <—-顾客吃完馒头 5:观察者消费完了数据
- <—-顾客收到馒头 6:生产者会发给观察者缓存中的最新数据给观察者,当前缓存中是‘馒头 6’,所以观察者收到‘馒头 6’,并且将缓存数据清空
- <—-顾客吃完馒头 6:观察者消费完了数据
DROP_LATEST 例子
fun sharedFlowBufferOverflowTest() = runBlocking {val mutableSharedFlow =MutableSharedFlow<Int>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)val sharedFlow: SharedFlow<Int> = mutableSharedFlowlaunch {println("<---顾客来到店里,等待厨师生产馒头")sharedFlow.collect {println("<---顾客收到馒头 $it")delay(500)println("<---顾客吃完馒头 $it")}}launch {for (i in 1..6) {delay(100)println("厨师生产了馒头 $i")mutableSharedFlow.emit(i)}}}
�输出:
<---顾客来到店里,等待厨师生产馒头厨师生产了馒头 1<---顾客收到馒头 1厨师生产了馒头 2厨师生产了馒头 3厨师生产了馒头 4厨师生产了馒头 5<---顾客吃完馒头 1<---顾客收到馒头 2厨师生产了馒头 6<---顾客吃完馒头 2<---顾客收到馒头 6<---顾客吃完馒头 6
这个输出的结果我就不分析了,其实跟上边类似,如果想不明白问我。
SUSPEND 例子
fun sharedFlowBufferOverflowTest() = runBlocking {val mutableSharedFlow =MutableSharedFlow<Int>(extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)val sharedFlow: SharedFlow<Int> = mutableSharedFlowval startTime = System.currentTimeMillis()val finishTime = {val endTime = System.currentTimeMillis()" 【持续时间:"+((endTime - startTime)/100 * 100).toString()+"】"}launch {println("<---顾客来到店里,等待厨师生产馒头")sharedFlow.collect {println("<---顾客收到馒头 $it ${finishTime()}")delay(500)println("<---顾客吃完馒头 $it ${finishTime()}")}}launch {for (i in 1..6) {delay(100)println("厨师生产了馒头 $i ${finishTime()}")mutableSharedFlow.emit(i)}}}
这个例子较上两个稍微改了一点。首先 extraBufferCapacity 参数设置为 0 了,这个为了方便分析,其次输出里增加了持续时间这个字段,也是为了方便分析的。输出如下:
<---顾客来到店里,等待厨师生产馒头厨师生产了馒头 1 【持续时间:100】<---顾客收到馒头 1 【持续时间:100】厨师生产了馒头 2 【持续时间:200】<---顾客吃完馒头 1 【持续时间:600】<---顾客收到馒头 2 【持续时间:600】厨师生产了馒头 3 【持续时间:700】<---顾客吃完馒头 2 【持续时间:1100】<---顾客收到馒头 3 【持续时间:1100】厨师生产了馒头 4 【持续时间:1200】<---顾客吃完馒头 3 【持续时间:1600】<---顾客收到馒头 4 【持续时间:1600】厨师生产了馒头 5 【持续时间:1700】<---顾客吃完馒头 4 【持续时间:2100】<---顾客收到馒头 5 【持续时间:2100】厨师生产了馒头 6 【持续时间:2200】<---顾客吃完馒头 5 【持续时间:2600】<---顾客收到馒头 6 【持续时间:2600】<---顾客吃完馒头 6 【持续时间:3100】
通过这个输出可以看到,这个缓存策略没有丢弃任何数值,后续的都被挂起了
分析输出的话会说一个关键点就可以了,如下:
输出第4行-第 7 行:在第 4 行的时候生产了‘馒头 2’持续时间是 200 毫秒(厨师 100 毫秒生产一个馒头),而在第 7 行输出里才生产了‘馒头 3’,中间差了 500 毫秒,也就说生产者被挂起了,注意是生产者被挂起,不是观察者
这里映射现实就是,顾客跟厨师说,您等我吃完再做下一个
extraBufferCapacity
缓存个数
其实这个在说完了 onBufferOverflow 参数后就已经很好理解了,就是缓存的个数,你可以去自行更改下上边三个中的缓存个数的数值,然后运行去分析下输出
shareIn 函数转换
转换的代码很简单,使用 shareIn 函数
public fun <T> Flow<T>.shareIn(scope: CoroutineScope,started: SharingStarted,replay: Int = 0): SharedFlow<T>
第三个参数不用说了,前两个参数会在下面说下
flow 函数的协程作用域
flow 函数的协程作用域是由 collect 时的上级作用域决定的。
由于 flow 是冷流,所以可以有多个作用域。
下面是个测试:
fun sharedFlowShareInTest() = runBlocking {val scope = CoroutineScope(Executors.newSingleThreadExecutor().asCoroutineDispatcher())val flow = flow {for (i in 1..6) {delay(100)println("<---发送:$i " + Thread.currentThread().name)emit(i)}}launch {flow.collect {println("接收 A: $it " + Thread.currentThread().name)}}scope.launch {flow.collect {println("接收 B: $it " + Thread.currentThread().name)}}}
输出:
<---发送:1 pool-1-thread-1<---发送:1 main接收 B: 1 pool-1-thread-1接收 A: 1 main<---发送:2 main<---发送:2 pool-1-thread-1接收 B: 2 pool-1-thread-1接收 A: 2 main<---发送:3 main接收 A: 3 main<---发送:3 pool-1-thread-1接收 B: 3 pool-1-thread-1<---发送:4 main<---发送:4 pool-1-thread-1接收 B: 4 pool-1-thread-1接收 A: 4 main<---发送:5 main<---发送:5 pool-1-thread-1接收 A: 5 main接收 B: 5 pool-1-thread-1<---发送:6 main<---发送:6 pool-1-thread-1接收 A: 6 main接收 B: 6 pool-1-thread-1
可以看到在 flow 函数的 lambda 表达式中,是输出了两个不同的线程的。
shareIn 的 scope 参数
flow 是冷流,每个观察者都配有独立的生产者,所以作用域可以有多个。
但是 sharedFlow 是热流,生产者只有一个,那么就会产生一个问题,flow 函数的 lambda 表达式中的作用域到底该是什么。
而 scope 这个参数就是解决此问题的,作用域统一由 shareIn 的 scope 参数指定。
下面将上边例子中的 flow 通过 shareIn 转换下
val flow = flow {for (i in 1..6) {delay(100)println("<---发送:$i " + Thread.currentThread().name)emit(i)}}.shareIn(scope, SharingStarted.Lazily, 0)
再看下输出
<---发送:1 pool-1-thread-1接收 A: 1 main接收 B: 1 pool-1-thread-1<---发送:2 pool-1-thread-1接收 A: 2 main接收 B: 2 pool-1-thread-1<---发送:3 pool-1-thread-1接收 A: 3 main接收 B: 3 pool-1-thread-1<---发送:4 pool-1-thread-1接收 A: 4 main接收 B: 4 pool-1-thread-1<---发送:5 pool-1-thread-1接收 A: 5 main接收 B: 5 pool-1-thread-1<---发送:6 pool-1-thread-1接收 A: 6 main接收 B: 6 pool-1-thread-1
可以看到生产者的协程作用域已经是 shareIn 传入的。
shareIn 的 started 参数
- Lazily:当首个观察者出现时,启动 flow
- Eagerly:立即启动 flow
- WhileSubscribed:自定义策略
- stopTimeoutMillis:如果没有观察者时,多长时间取消生产者数据
- replayExpirationMilis:多长时间取消 replay
Lazily 和 Eagerly 这两个参数没啥好说的,都很简单。
主要看下 WhileSubscribed 这个函数。
我们通过具体的业务场景可以更好的理解这个参数的意义:
我们在 App 的数据层里有一个 shareFlow 是每 2 秒获取手机的 GPS 信息,然后 emit 出去。
为了用户的省电考虑,我希望在用户息屏的时候,就不要再获取了,然后亮屏的时候,在重新启动 2s 轮训的获取
这个需求就可以直接用 WhileSubscribed 实现,来看下:
fun sharedFlowShareInTest() = runBlocking {val scope = CoroutineScope(Executors.newSingleThreadExecutor().asCoroutineDispatcher())val flow = flow {while (true) {val uuid = UUID.randomUUID()println("获取 GPS $uuid")emit(uuid)delay(2000)}}.shareIn(scope, SharingStarted.WhileSubscribed(stopTimeoutMillis = 0), 0)val create ={launch {flow.collect {println("观察者 : $it ")}}}val job = create()launch {delay(3000)println("息屏了")job.cancel()}launch {delay(15000)println("亮屏了")create()}}
在这个例子,flow 模拟了每 2 秒获取的 GPS 信息的数据源,在 3 秒后会取消观察者模拟息屏,在 15 秒过后再次创建一个观察者来模拟亮屏。
输出我就不展示了,自己运行下看下。在输出息屏了后立马就停止了 GPS 的轮询,然后在 12 秒后 GPS 的轮询又会启动了。
刚刚上边的例子 stopTimeoutMillis 用的 0,也就是说没有观察者时立刻停止了生产者的生产。通过 stopTimeoutMillis 这个参数也可以设置指定的缓冲时间,例如我想没有观察者时 1 秒后再停止生产者生产,那就设置为 1000
第二个参数 replayExpirationMilis 就比较简单了,replay 多长时间过期。因为上边已经说了 replay 参数了,而 replayExpirationMilis 阐述的也比较直观,所以就不写例子了。
StateFlow
StateFlow 是 SharedFlow 的一个子集,同样是热流。
相比于 SharedFlow,StateFlow 定制了如下特性
- 始终有值
- 粘性:始终接收最新值
- 防抖(去重):重复的数据不会连续发送
- 可以通过 value 的方式获取最新的值(不用从 collect 中获取)
但是 StateFlow 也失去了 SharedFlow 的灵活性没有了 replay、onBufferOverflow 等有用的特性,这个等下一节对比使用场景的时候具体说。此节主要说下基本使用
// StateFlow 创建方式一val flowIn2 = flow<Int> {...}.stateIn(CoroutineScope(Dispatchers.IO), started = SharingStarted.Lazily, 1)// StateFlow 创建方式二val mutableStateFlow = MutableStateFlow(1)val stateFlow: StateFlow<Int> = mutableStateFlow
MutableStateFlow 使用
相比于 MutableSharedFlow 来说 MutableStateFlow 的构造函数要简单多了,就一个参数,签名如下:
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T>
这个 value 参数没啥好说的,就是初始值。因为 StateFlow 的特性是始终有值,所以必须有个初始值。
接下来,我们测试下 StateFlow 受到背压后会采用什么处理方式,例子就把 onBufferOverflow 这一小结的例子稍微改下,如下:
fun stateFlowTest() = runBlocking {val mutableStateFlow = MutableStateFlow(0)val stateFlow: StateFlow<Int> = mutableStateFlowlaunch {println("<---顾客来到店里,等待厨师生产馒头")stateFlow.collect {if (it == 0) return@collectprintln("<---顾客收到馒头 $it")delay(500)println("<---顾客吃完馒头 $it")}}launch {for (i in 1..6) {delay(100)println("厨师生产了馒头 $i")mutableStateFlow.emit(i)}}}
输出结果为:
<---顾客来到店里,等待厨师生产馒头厨师生产了馒头 1<---顾客收到馒头 1厨师生产了馒头 2厨师生产了馒头 3厨师生产了馒头 4厨师生产了馒头 5<---顾客吃完馒头 1<---顾客收到馒头 5厨师生产了馒头 6<---顾客吃完馒头 5<---顾客收到馒头 6<---顾客吃完馒头 6
可以看到结果与 onBufferOverflow.DROP_OLDEST 一致。
证明了 StateFlow 的背压效果是丢弃旧值,与其始终接收最新值的特性一致。
val mutableStateFlow = MutableStateFlow(0)// |||// 约等价与val mutableSharedFlow = MutableSharedFlow<Int>(replay = 1,onBufferOverflow = BufferOverflow.DROP_OLDEST)mutableSharedFlow.emit(0)
stateIn 转换
stateIn 的用法和 shareIn 大体一致,就是最后一个参数,由 replay 变为了初始值,这个也很好理解,stateFlow 的 replay 就是 1,所以不用传。
值得注意的是 stateIn 有两个方法签名
// 第一个public fun <T> Flow<T>.stateIn(scope: CoroutineScope,started: SharingStarted,initialValue: T): StateFlow<T>// 第二个public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T>
第二个 stateIn 是个挂起阻断函数,直到接收到第一个值之前,都会处于挂起状态。
Flow、SharedFlow 与 StateFlow 汇总
这里直接说汇总
- Flow :比较适合表现一些一次性的代码片段,例如我请求一个接口,就会有如下代码片段
将请求状态置为 请求中 请求接口 根据接口返回将请求状态置为请求成功或者失败
- SharedFlow:适合共享的代码片段,例如持续获取经纬度,然后通知
- StateFlow:由于其的粘性和防抖的特性,比较适合用于与界面交互
- 粘性可以保证数据不丢失
- 防抖可以保证避免无意义的界面刷新
由于 StateFlow 的防抖特性是根据 equals 函数,所以有些 List 和 Map 的数据会被过滤,遇到这种情况可以使用定制的SharedFlow,参考
其实这里应该有每种 Flow 在项目中的具体使用示例,不过篇幅就太大了,后面会单独有一篇文章去说(这里有个文章参考,可以先看下),等写完了会更新此处的文章地址。
