本文主要根据参考文继续简单整理的,感谢原作者分享。
正文
Flow是kotlin提供的一个工具,使用协程封装成生产者-消费者模式,上流来负责生产,下流来接收消耗。
使用flow构建函数构建一个Flow类型返回值的函数
flow{}构建体中可以调用挂起函数,即上流
上流使用emit函数发射值
下流使用collect函数收集值(没有调用这个函数,flow{}中代码是不会执行的)
emit()用于放射值,collect()用于收集值。
创建Flow
写法1
fun firstFlow() = flow { for (i in 1..4) { delay(500) emit(i) } } //或者 fun firstFlow(): Flow<Int> = flow { for (i in 1..4) { delay(500) emit(i) } }
需要运行在协程或suspend方法中。
runBlocking { firstFlow().collect { value -> Log.d(TAG, "runBlocking 1 value : $value") } }
写法2
runBlocking { flow { for (i in 1..4) { delay(500) emit(i) } }.collect { value -> Log.d(TAG, "runBlocking 2 value : $value") } }
写法3
前面2中都是类似 flow{...},下面介绍其他的方式。
下面使用onEach
runBlocking { flowOf(1,2,3).onEach { delay(500) }.collect { value -> Log.d(TAG, "runBlocking 3 value : $value") } }
flowOf() 构建器定义了一个发射固定值集的流, 使用 flowOf 构建 Flow 不需要显示调用 emit() 发射数据。
写法4
runBlocking { listOf(1, 2, 3).asFlow().onEach { delay(500) }.collect { value -> Log.d(TAG, "runBlocking 4 value : $value") } }
使用 asFlow() 扩展函数,可以将各种集合与序列转换为流,也不需要显示调用 emit() 发射数据
取消Flow
流采用了与协程同样的协助取消。流的收集可以在当流在一个可取消的挂起函数(例如 delay)中挂起的时候取消。取消Flow 只需要取消它所在的协程即可。
| 方法类别 | 方法名称 | 核心机制/操作符 | 主要适用场景 |
|---|---|---|---|
| 基础取消 | 协程Job取消 | job.cancel() | 最常用的方法,取消收集Flow的协程。 |
| 结构化并发取消 | scope.cancel() | 取消整个作用域,自动取消其所有子协程(如ViewModel的onCleared)。 | |
| 超时控制 | 超时取消 | withTimeout(timeMillis) | 为Flow收集设置时间上限,超时后自动取消并抛出TimeoutCancellationException。 |
| 无异常超时取消 | withTimeoutOrNull(timeMillis) | 超时后不抛异常,而是返回null,适用于需要安静退出的场景。 | |
| 操作符限制 | 条件限制 | takeWhile { condition } | 当条件不再满足时(如it < 5),立即停止收集。 |
| 数量限制 | take(n) | 在收集到第n个值后自动取消上游 |
下面简单验证一下。
方式1
通过withTimeoutOrNull()进行超时取消。
超时550ms后,终止Flow调用
runBlocking { Log.d(TAG, "runBlocking end : ") // 在 550 毫秒后超时 withTimeoutOrNull(550) { firstFlow().collect { value -> Log.d(TAG, "runBlocking value : $value") } } Log.d(TAG, "runBlocking end : ") }
日志输出
runBlocking end : runBlocking value : 1 runBlocking end :
也就执行了一次之后,终止了。
方式2
使用cancel函数。
runBlocking { Log.d(TAG, "runBlocking start : ") firstFlow().collect { value -> Log.d(TAG, "runBlocking value : $value") if (value % 2 == 0) { cancel() } } Log.d(TAG, "runBlocking end : ") }
运行后崩溃了,日志打印如下(部分):
runBlocking start :
runBlocking value : 1
runBlocking value : 2
//略
Caused by: kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@fdfb15d
显然,这种取消时不可靠的。
方式3
使用takeWhile(条件)或take(第几个)。
takeWhile()
当takeWhile(条件)中的条件满足时返回为true,会执行collect()中的打印。
runBlocking {
firstFlow().takeWhile {
it < 3
}.collect { value ->
Log.d(TAG, "runBlocking value : $value")
}
}
输出日志
runBlocking value : 1 runBlocking value : 2
take()
take(第几个),传入的是数值n。
如果n大于发射的个数,都会打印;小于的话就打印n个。
runBlocking {
firstFlow().take(3).collect { value ->
Log.d(TAG, "runBlocking value : $value")
}
}
上面三个,就打印发射的3个值,最后后面的丢弃。
runBlocking value : 1 runBlocking value : 2 runBlocking value : 3
方式4
取消协程也可以取消Flow的打印。
//创建一个协成
val myCoroutineScope = CoroutineScope(Dispatchers.IO).launch {
firstFlow().collect { value ->
Log.d(TAG, "runBlocking value : $value")
}
}
//为了看效果,延迟1.6s
runBlocking {
delay(1600)
//取消协程
myCoroutineScope.cancel()
}
日志打印
runBlocking value : 1 runBlocking value : 2 runBlocking value : 3
方式5
抛出异常中断Flow
val flow = flow {
for (i in 1..4) {
//大于3时抛出异常
if (i >3) throw Exception("Flow Stopped")
emit(i)
}
}
runBlocking {
flow.catch { e ->
Log.d(TAG, "runBlocking e : $e")
}.collect { value ->
Log.d(TAG, "runBlocking value : $value")
}
}
日志输出
runBlocking value : 1 runBlocking value : 2 runBlocking value : 3 runBlocking e : java.lang.Exception: Flow Stopped
参考文章
《》
《》
《》
