[kotlin]-Flow 基础
什么是Flow
官方介绍
一种异步数据流,它顺序地发出值并正常或异常地完成。
流上的中间运算符(例如map、filter、take、zip等)是应用于上游流或流并返回下游流的函数,可以在其中应用更多运算符。中间操作不执行流中的任何代码,也不暂停函数本身。他们只是为未来的执行和快速返回建立了一个操作链。这被称为冷流特性。
个人理解
类似于Rxjava 可以异步操作数据流,更灵活,保证数据正确的流向,提供多操作符在数据发送的过程中,对数据进行转换,扩展方法多,易用性高。
- 发送方会生成添加到数据流中的数据。得益于协程,数据流还可以异步生成数据
flow
- 中介可以修改发送到数据流的值,或修正数据流本身
zip,map,filter
- 接收方则使用数据流中的值
collect
。
基础使用
创建流
最基础的方法可以通过 flow { ... } 去创建一个冷流。
flowOf
就是flow{}的 一个包装,里面调用了flow{}。- 使用
.asFlow()
扩展函数,里面调用了flow{}进行转换。 - flowOf(...)函数从一组固定的值创建一个流。
- 通过扩展方法,各种类型的asFlow()将它们转换为流。
- flow { ... }构建器函数,用于从顺序调用到emit函数构造任意流。
- channelFlow { ... }构建器函数,用于从对发送函数的潜在并发调用构造任意流。
- MutableStateFlow和MutableSharedFlow定义了相应的构造函数来创建一个可以直接更新的热流。
中间方
过度流操作符,例如 map 与 filter
(1..3).asFlow().map {
it * 100
}.collect {
println("map result $it")
}
// map result 100
// map result 200
// map result 300
接收方
末端操作符是在流上用于启动流收集的挂起函数。 collect 是最基础的末端操作符,但是还有另外一些更方便使用的末端操作符:
- 转化为各种集合,例如 toList 与 toSet。
val list = (1..3).asFlow().map {
it * 100
}.toList()
println("list result $list")
// list result [100, 200, 300]
- 获取第一个(first)值与确保流发射单个(single)值的操作符。
- 使用 reduce() 与 fold() 将流规约到单个值。
什么是冷流/热流
冷流 :
- 冷流是惰性的,需要显式订阅才会执行,可以有多个订阅启动流执行。
- 流中的数据会被发送给所有订阅了该流的收集器(collectors)。
- 单个冷流可以通过多种方式收集,如使用多个collect{},或转换为多个LiveData等。
- 冷流本身不存储状态,是通过运算产生流数据,所以可以有无限个订阅者来收集数据。
- 不同订阅者收集冷流数据的时间和方式可以完全不同。
- 重复订阅冷流会重新发射完整的数据集。
private var cloudFlow: Flow<String> = flow {
println("cloudFlow emit data cloudFlow start")
emit("1")
emit("2")
emit("3")
println("cloudFlow emit data cloudFlow end")
}
viewModelScope.launch {
cloudFlow.collect {
println("cloudFlow One collect data $it")
}
}
viewModelScope.launch {
cloudFlow.collect {
println("cloudFlow Two collect data $it")
}
}
// cloudFlow emit data cloudFlow start
// cloudFlow One collect data 1
// cloudFlow One collect data 2
// cloudFlow One collect data 3
// cloudFlow emit data cloudFlow end
// cloudFlow emit data cloudFlow start
// cloudFlow Two collect data 1
// cloudFlow Two collect data 2
// cloudFlow Two collect data 3
// cloudFlow emit data cloudFlow end
总结: 冷流是惰性执行、支持重播所有数据、需要被收集激活的流模式,适用于传输完整数据集或事件序列,非常灵活。
热流
- 热流是主动执行的流,会自发推送新数据到订阅者。
- 订阅者只能收到订阅后的数据,无法获取之前已发射的数据。
- 数据直接推送到订阅者,不需要被显式收集。
- 更新可以实时反馈到订阅者,实现快速响应。
- 使用asLiveData()、asSharedFlow()等可以将冷流转换为热流。
- StateFlow和SharedFlow是热流的常见实现。
- BroadcastChannel也可以用作热流。
- 适合频繁更新、显示最新状态的数据场景。
- 需要管理订阅生命周期,避免泄漏。
private var cloudFlow: Flow<String> = flow {
println("cloudFlow emit data cloudFlow start")
emit("1")
emit("2")
emit("3")
println("cloudFlow emit data cloudFlow end")
}
//冷流 转换成ShareFlow热流
var shareFlow = cloudFlow.shareIn(viewModelScope, SharingStarted.WhileSubscribed(), replay = 3)
viewModelScope.launch {
shareFlow.collect {
println("shareFlow One collect data $it")
}
}
viewModelScope.launch {
shareFlow.collect {
println("shareFlow Two collect data $it")
}
}
// cloudFlow emit data cloudFlow start
// cloudFlow emit data cloudFlow end
// shareFlow One collect data 1
// shareFlow One collect data 2
// shareFlow One collect data 3
// shareFlow Two collect data 1
// shareFlow Two collect data 2
// shareFlow Two collect data 3
SharedFlow 和 StateFlow
区别与特性:
- 两者都是热流。
- 两者均支持配置回放最近发射的值,StateFlow 读取value,SharedFlow可以配置存放值容器的大小。
- 共享性:SharedFlow允许多个订阅者,StateFlow通常单个订阅者
- 读取状态:StateFlow通过flow.value直接读取,SharedFlow通过replayCache读取
- 线程安全性:SharedFlow需要外部同步,StateFlow内部本身线程安全
- 生命周期管理:SharedFlow需要自行管理,StateFlow与控制器生命周期绑定
创建方式
val sharedFlow = MutableSharedFlow<T>(replay=1)
//还可以通过shareIn() 进行冷流转热流
val stateFlow = MutableStateFlow(initialValue)
//还可以通过stateIn() 进行冷流转热流
总结:
- SharedFlow 适合分发事件,广播消息到多个订阅者
- StateFlow 更适合可观察的 UI 组件状态管理