百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

Kotlin协程之一文看懂StateFlow和SharedFlow

liuian 2025-04-01 19:46 18 浏览

概述

Android 架构经过多年的发展和迭代,目前最常用的应该是 MVVM 了,至于 MVI 架构,在实际开发中我看到的比较少,但无论是哪一款,都是使用分层的架构思想,引用一张官网的推荐架构图:

对于 UI Layer, 我们一般有两个组件:

  • View: 处理 UI 展现,用户事件监听,页面跳转等,比如说 Activity 和 Fragment
  • ViewModel: 向 View 提供数据

这二者通常通过响应式编程来处理它们之间的交互,View 订阅 ViewModel 暴露的响应式接口,接收到通知后进行相应逻辑,而 ViewModel 不再持有任何形式的 View 的引用。常用的响应式组件有 LiveData, RxJava, Flow 等:

  • RxJava: 个人很少使用,倒不是因为它不好,而是...一直傻傻不太会用 RxJava 的操作符,看不太懂,又一直没花时间(懒惰)去研究它那些操作符的原理,就不怎么敢用。
  • LiveData: 在很长一段时间里,这个组件简直是我(们大家)的宝贝,简单,上手快,又能满足大多数场景的需求,感恩。
  • Flow: 流传它想要取代 LiveData, 网上也有一些从 LiveData 迁移到 Flow 的文章。

个人看来,LiveData 不至于真的被替换掉,因为它有很多优势,比如说使用简单,轻量级,可感知生命周期,可观察,粘性(优缺视使用场景而定)等;所以一般而言,对于 View 和 ViewModel 之间简单的响应式开发,使用 LiveData 就足够了,毕竟合适的才是最好的,而对于 UI Layer 之外的复杂场景,可以考虑使用 Flow 用来处理异步数据流。

之前有分析过基础的 Flow 工作原理,这篇文章我们再看看 StateFlow 和 SharedFlow。

Flow的三个角色

Flow 数据流中有三个角色:

  • 生产者(Producer): 生产添加到数据流中的数据
  • 消费者(Consumer): 消费数据流中的数据
  • 中介(Intermediaries 可选): 可以修改发送到数据流的值,或修正数据流本身

参考官方中的一段视频动画,可以很好地理解这个过程:

不得不说,这些新知识点的学习,官方文档或者视频往往更直观易懂,网上很多文章写得也挺好的,可以用来加深理解,但建议官方文档也可以多看看。

冷流与热流

在前一篇文章 Flow 工作原理 中也有解析过 Flow 为什么是冷流,参考这个流程图:

flow {} 方式(或flowOf, asFlow)创建的 Flow 实例是 SafeFlow 类型,当调用其 collect(FlowCollector) 方法时,首先会执行该 Flow 对象传入的 block 代码块,代码块中一般会有 emit 方法发射值,这个 emit 会调用到 collect 方法传入的 action 代码块。因为它在每次调用 collect 时才去触发发送数据的动作,所以说 Flow 是冷流

即 Flow 是在消费者消费时才会生产数据,且冷流和消费者是一对一的关系,即当有多个消费者消费时,生产者是重新生产发送数据的。

与之对应的是热流,Flow 有提供相关实现: StateFlow 和 SharedFlow 。

StateFlow 和 SharedFlow 是热流,生产数据不依赖消费者消费,热流与消费者是一对多的关系,当有多个消费者时,它们之间的数据都是同一份。

StateFlow

类似 LiveData, StateFlow 也提供了 可读写只读 两个版本:

public interface StateFlow : SharedFlow {
    // StateFlow 当前的数据
    public val value: T
}

public interface MutableStateFlow : StateFlow, MutableSharedFlow {
    public override var value: T
    public fun compareAndSet(expect: T, update: T): Boolean
}

StateFlow 与 LiveData 的定位比较类似,官方可能是想用它来取代 LiveData, 其共同点:

  • 都提供了 可读写只读 两个版本。
  • 值唯一,会把最新值重现给订阅者,即粘性。
  • 可被多个订阅者订阅(共享数据流)。
  • 子线程中多次发送数据可能会丢失数据。

不同点:

  • StateFlow 必须传入初始值,保证值的空安全,永远有值。
  • StateFlow 可以防抖,即多次传入相同值,不会有响应(Setting a value that is [equal][Any.equals] to the previous one does nothing) 。
  • 当 View 进入 STOPPED 状态时,LiveData.observe() 会停止接收数据,而从 StateFlow 或其他 Flow collect 数据的操作并不会自动停止。如需实现相同的行为,需要从 Lifecycle.repeatOnLifecycle 块 collect 数据流。

SharedFlow

SharedFlow 也提供了 可读写只读 两个版本:

public interface SharedFlow : Flow {
    public val replayCache: List
}

public interface MutableSharedFlow : SharedFlow, FlowCollector {
    override suspend fun emit(value: T)
    public fun tryEmit(value: T): Boolean
    public val subscriptionCount: StateFlow
    public fun resetReplayCache()
}

与 StateFlow 的区别:

  • SharedFlow 没有初始值,StateFlow 必须传初始值。
  • SharedFlow 可以保留历史数据,新订阅者可以获取到之前发射过的一系列数据,StateFlow 只保留最新数据。
  • SharedFlow 可以传入一个 replay 参数,它表示可以对新订阅者重新发送 replay 个历史数据,默认值为 0, 即非粘性。
  • StateFlow 可以看成是一个 replay = 1 且没有缓冲区的 SharedFlow 。
  • SharedFlow 在子线程中多次 emit() 不会丢失数据。

State 和 Event

根据 Android developers 上的官方示例,可以看出 StateFlow 和 SharedFlow 分别是用来处理 状态(state)事件(event) 的,它称呼 StateFlow 是 a state-holder observable

val uiState: StateFlow

val tickFlow: SharedFlow<Event>
  • 对 UI 而言,state 是 UI 组件的状态,它应当始终都有一个值来表示其状态,且当有新的订阅者加入时,它也应知道当前的状态,即 StateFlow 的粘性。
  • 而 event 事件只有在发生某些动作后才会触发,不需要有初始值。SharedFlow 默认非粘性的,当新的订阅者加入时,它不会重复发生已经发生过的事件。
  • 对于状态而言,即使多次发送,我们只关注最新的状态;对于事件而言,如果多次发送,我们不想丢失任何一个前台的事件。

SharedFlow 使用

创建 SharedFlow

看看 SharedFlow 的构造函数:

public fun  MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow

主要有三个参数:

  • replay: 新订阅者 collect 时,发送 replay 个历史数据给它,默认新订阅者不会获取以前的数据。
  • extraBufferCapacity: MutableSharedFlow 缓存的数据个数为 replay + extraBufferCapacity; 缓存一方面用于粘性事件的发送,另一方面也为了处理背压问题,既下游的消费者的消费速度低于上游生产者的生产速度时,数据会被放在缓存中。
  • onBufferOverflow: 背压处理策略,缓存区满后怎么处理(挂起或丢弃数据),默认挂起。注意,当没有订阅者时,只有最近 replay 个数的数据会存入缓存区,不会触发 onBufferOverflow 策略。

下面演示一下 MutableSharedFlow 的创建及其参数的作用。

在 ViewModel 中发射数据

class MainViewModel(): CoroutineScope by CoroutineScope(Dispatchers.IO + SupervisorJob()) {
    private val _data1: MutableSharedFlow = MutableSharedFlow(
        replay = 10
    )
    val data1: SharedFlow = _data1

    fun refresh() {
        var cur = 1
        launch {
            repeat(200) {
                delay(20)
                _data1.emit(cur)
                println("emit data: ${cur++}")
            }
        }
    }
}

订阅数据

class MainActivity : BaseActivity(), CoroutineScope by MainScope() {
    override fun onCreate(savedInstanceState: Bundle?) {
        job = launch { // 订阅 data1
            lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.data1.collect {
                    delay(100) // 消费速度慢于生产速度
                    println("data: $it")
                }
            }
        }
    }

    fun click(view: View) {
        launch {
            viewModel.refresh() // 触发数据生产
            delay(2000)
            job?.cancel() // 2s 后停止订阅
            delay(2000)
            lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.data1.collect { // 2s 后继续订阅
                    delay(100)
                    println("data again: $it")
                }
            }
        }
    }
}

看看输出

// 首先消费速度慢于生产速度
System.out: emit data: 1
System.out: emit data: 2
System.out: emit data: 3
System.out: emit data: 4
System.out: emit data: 5
System.out: data: 1
System.out: emit data: 6
System.out: emit data: 7
System.out: emit data: 8
System.out: emit data: 9
System.out: emit data: 10
System.out: data: 2
// 接着缓存区满了,消费了一个数据后,缓存区才够存储新的数据
System.out: emit data: 11
System.out: emit data: 12
System.out: emit data: 13
System.out: data: 3
System.out: emit data: 14
System.out: data: 4
System.out: emit data: 15
// ...
// 2s 后取消了订阅,生产者发送数据就不受 onBufferOverflow 影响了,只会把最新的 10 条数据存入缓存区
System.out: emit data: 29
System.out: data: 19
System.out: emit data: 30
System.out: emit data: 31
System.out: emit data: 32
System.out: emit data: 33
System.out: emit data: 34
System.out: emit data: 35
System.out: emit data: 36
System.out: emit data: 37
// ...
// 接着 2s 后又继续订阅,订阅者消费的第一条数据是 113, 可以看到恰好是生产者生产的前 10 条数据
System.out: emit data: 121
System.out: emit data: 122
System.out: emit data: 123
System.out: data again: 113
System.out: emit data: 124
System.out: data again: 114
System.out: emit data: 125
System.out: data again: 115
System.out: emit data: 126
System.out: data again: 116
System.out: emit data: 127

shareIn 转换

可以使用 shareIn 方法把普通 flow 冷流转化成 SharedFlow 热流。通过 shareIn 创建的 SharedFlow 会把数据供给 View 订阅,同时也会订阅上游的数据流:

public fun  Flow.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow

有三个参数:

  • scope: 用于共享数据流的 CoroutineScope, 此作用域函数的生命周期应长于任何使用方,使共享数据流在足够长的时间内保持活跃状态。
  • started: 启动策略
  • replay: 同上 replay 含义

started 有三种取值:

  • Eagerly: 立即启动,到 scope 作用域被结束时停止
  • Lazily: 当存在首个订阅者时启动,到 scope 作用域被结束时停止
  • WhileSubscribed: 在没有订阅者的情况下取消订阅上游数据流,避免不必要的资源浪费

对于只执行一次的操作,可以使用 Lazily 或 Eagerly, 否则可以使用 WhileSubscribed 来实现一些优化。

public fun WhileSubscribed(
    stopTimeoutMillis: Long = 0,
    replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted

它支持两个参数:

  • stopTimeoutMillis: 最后一个订阅者结束订阅后多久停止订阅上游流
  • replayExpirationMillis: 数据 replay 的过时时间,超出时间后,新订阅者不会收到历史数据

简单示例

Repository 层

class MainRepository {
    private val _data: MutableSharedFlow = MutableSharedFlow()
    val data: SharedFlow = _data

    suspend fun request() {
        var cur = 1
        repeat(100) {
            delay(100)
            _data.emit(cur) // 模拟生产数据
            println("emit data: ${cur++}")
        }
    }
}

ViewModel 层

class MainViewModel(
    private val repository: MainRepository
) : CoroutineScope by CoroutineScope(Dispatchers.IO + SupervisorJob()) {
    val data by lazy {
        repository.data.shareIn(
            scope = this,
            started = SharingStarted.WhileSubscribed(5000)
        ).onEach {
            println("repository data: $it")
        }.map {
            // 转换数据
            "UI Data-$it"
        }
    }
    private val _state = MutableStateFlow(MainState.INIT)
    val state: StateFlow = _state

    fun refresh() {
        launch {
            _state.value = MainState.LOADING
            repository.request()
            _state.value = MainState.INIT
        }
    }
}

View 层

jobData = launch {
    lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.data.collect {
            println("data: $it")
        }
    }
}
jobState = launch {
    lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.state.collect {
            println("state: $it")
        }
    }
}

StateFlow 使用

创建 StateFlow

看看 StateFlow 的构造函数:

public fun  MutableStateFlow(value: T): MutableStateFlow

构造函数只需传入一个初始值,它本质上是一个 replay = 1 且没有缓冲区的 SharedFlow, 因此在第一次订阅时会先获取到初始值。

stateIn 转换

和 SharedFlow 类似,也可以用 stateIn 将普通流转化成 StateFlow:

public fun  Flow.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow

跟 shareIn 不同的是需要传入一个初始值。

总结

个人看来,LiveData 不至于真的被替换掉,因为它有很多优势,比如说使用简单,轻量级,可感知生命周期,可观察等。简单的组件可能功能不够强大,强大的组件用起来一般都比较复杂,二者不可兼得,然而无论是组件还是架构,合适的才是最好的,我们在选择某个组件或者某种架构的时候,需要结合自身项目和需求的特点,选择合适的即可。

一般而言,对于 View 和 ViewModel 之间简单的响应式开发,使用 LiveData 就足够了,而对于一些复杂场景(切换线程,数据流变换等),可以考虑使用 Flow 来处理异步数据流。

作者:苍耳叔叔
链接:
https://juejin.cn/post/7169843775240405022

来源:稀土掘金

相关推荐

HR必备Excel函数:4个与日期相关的计算函数。

提到日期函数,很多人首先会想到“today”,它可以显示当天的日期,并且每次打开表格时都会自动更新。但是,对于前天、昨天、明天和后天的日期,就不能用yesterday或者tomorrow等这些英文了,...

这篇文章有点长,但可以让你十分钟玩转Excel的时间函数

日期与时间函数——TODAY、NOW、YEAR、MONTH、DAY!如何用WORKDAY函数查询距离某天的第20个工作日是哪一天?如何用NETWORKDAYS函数查询员工工作了多少个工作日?如何用WE...

Excel2020年日历套装,表格设计,农历显示,查阅套打轻松应用

Hello大家好,我是帮帮。今天跟大家分享一组Excel2020年日历套装,表格设计,自带农历控件,查阅套打轻松应用。有个好消息!为了方便大家更快的掌握技巧,寻找捷径。请大家点击文章末尾的“了解更多”...

巧用NETWORKDAYS函数计算两个日期之间工作日的天数

带有日期的单元格是我们日常使用EXCEL的时候经常见到的,有的时候我们需要求出两个日期之间间隔的天数,可以直接用结束日期减去开始日期即可,这是个非常简单的减法公式。不过这个单纯的减法公式会默认去掉开始...

Excel按工作日、休息日进行汇总

1、按周六日/其它时间汇总为了区分一周的周六日和其它时间,可以使用WEEKDAY函数,把WEEKDAY函数的第2个参数指定为2,如WEEKDAY(A3,2),则周一返回1,周二返回2,…,周六返回...

如何计算每月应出勤天数,如有法定假期和调休,如何计算

本文介绍如何计算每月的应出勤天数。第一部分介绍正常双休制下计算应出勤天数;第二部份介绍当月有法定假期和调休的情况下计算应出勤天数。一、计算正常双休制的应出勤天数如下图所示,要求计算各员工2021年3月...

《Excel一键生成工作日历:让会议排期更轻松!》

每当需要安排会议时,总要翻看日历确认工作日,再逐个标注会议时间,既耗时又容易出错。今天教大家用Excel快速生成工作日历表,让会议排期变得简单高效!一、快速生成日历框架创建基础日期:在A1单元格输入月...

如何计算指定日期区间内,有多少工作日和休息日?

大家好,今天咱们要解决的问题是如何计算给定的一段日期内,正常工作日有多少天,放假时间有多少天?比如咱们要计算2025年3月份工作日一共有多少天,又有多少天放假,如下图所示:通过肉眼我们可以数清楚,20...

如何如何在表格中自动突出显示双休日?

现在不少人喜欢用Excel来制作备忘录或安排工作事项。在表格中输入日期后,可以使用条件格式突出显示双休日,避免在休息日安排了工作。具体方法是这样的:第1步:选择要设置条件格式的日期单元格区域;在“开始...

excel函数技巧:networkdays.intl判断节假日

如图,想知道6月的每一天是否是节假日,公式如下:=NETWORKDAYS.INTL(A2,A2,1,$E$2:$E$28)这个函数既可以判断当前日期(一参=二参)是否是周末及工作日(三参、四参)还可得...

仅需3步,让考勤表根据实际休息日,自动地填充颜色

Hello,大家好,之前跟大家分享了我们如何让考勤表根据单休与双休自动的填充颜色,最近有粉丝问到:能不能让考勤表根据实际的休息日自动的填充颜色呢?可以是可以,只不过因为牵扯到假期调休,我们每年的休息日...

5步搞定动态考勤表!标记节假日、调休日?Excel自动变色!

今天教你用「动态考勤表」一招解决所有问题!只需输入月份,自动变色、自动更新节假日,从此告别加班,效率翻倍!动态考勤表的优势:自动变色:节假日、双休日一键标记,颜色分明。一表多用:修改月份即可...

一起用python做个炫酷音乐播放器,想听啥随便搜

前言前段时间写的Python自制一款炫酷音乐播放器,有不少小伙伴私信我,对播放器提了不少改进建议,让我完善播放器的功能。今天音乐播放器2.0版本完成了,大家一起来看看是如何用python自制一款炫酷的...

用Python做个“冰墩墩雪容融”桌面部件(好玩又有趣)

桌面太单调?今天就带大家,一起用Python的PyQt5开发一个有趣的自定义桌面动画挂件,看看实现的动画挂件效果!下面,我们开始介绍这个自定义桌面动画挂件的制作过程。一、核心功能设计实现将动态图gif...

Python串口调试助手源码分享

以下是一个基于Python和PyQt5实现的串口调试助手示例,包含核心功能实现代码:pythonimportsysimportserialfromPyQt5.QtCoreimportQTim...