文章目录

前言

简单记录一下KotlinFlow的使用,记录于此,方便自己查阅和回顾。

本文主要根据参考文继续简单整理的,感谢原作者分享。

正文

Flow是kotlin提供的一个工具,使用协程封装成生产者-消费者模式,上流来负责生产,下流来接收消耗。

  1. 使用flow构建函数构建一个Flow类型返回值的函数

  2. flow{}构建体中可以调用挂起函数,即上流

  3. 上流使用emit函数发射值

  4. 下流使用collect函数收集值(没有调用这个函数,flow{}中代码是不会执行的)

emit()用于放射值,collect()用于收集值。

创建Flow

写法1

fun firstFlow() = flow {
    for (i in 1..4) {
        delay(500)
        emit(i)
    }
}
//或者
fun firstFlow(): Flow<Int> = flow {
    for (i in 1..4) {
        delay(500)
        emit(i)
    }
}

需要运行在协程或suspend方法中。

runBlocking {
    firstFlow().collect { value ->
        Log.d(TAG, "runBlocking 1 value : $value")
    }
}

写法2

runBlocking {
    flow {
        for (i in 1..4) {
            delay(500)
            emit(i)
        }
    }.collect { value ->
        Log.d(TAG, "runBlocking 2 value : $value")
    }
}

写法3

前面2中都是类似 flow{...},下面介绍其他的方式。

下面使用onEach

runBlocking {
    flowOf(1,2,3).onEach {
        delay(500)
    }.collect { value ->
        Log.d(TAG, "runBlocking 3 value : $value")
    }
}

flowOf() 构建器定义了一个发射固定值集的流, 使用 flowOf 构建 Flow 不需要显示调用 emit() 发射数据。

写法4

runBlocking {
    listOf(1, 2, 3).asFlow().onEach {
        delay(500)
    }.collect { value ->
        Log.d(TAG, "runBlocking 4 value : $value")
    }
}

使用 asFlow() 扩展函数,可以将各种集合与序列转换为流,也不需要显示调用 emit() 发射数据

取消Flow

流采用了与协程同样的协助取消。流的收集可以在当流在一个可取消的挂起函数(例如 delay)中挂起的时候取消。取消Flow 只需要取消它所在的协程即可。

方法类别方法名称核心机制/操作符主要适用场景
基础取消协程Job取消job.cancel()最常用的方法,取消收集Flow的协程。
结构化并发取消scope.cancel()取消整个作用域,自动取消其所有子协程(如ViewModel的onCleared)。
超时控制超时取消withTimeout(timeMillis)为Flow收集设置时间上限,超时后自动取消并抛出TimeoutCancellationException
无异常超时取消withTimeoutOrNull(timeMillis)超时后不抛异常,而是返回null,适用于需要安静退出的场景。
操作符限制条件限制takeWhile { condition }当条件不再满足时(如it < 5),立即停止收集。
数量限制take(n)在收集到第n个值后自动取消上游

下面简单验证一下。

方式1

通过withTimeoutOrNull()进行超时取消。

超时550ms后,终止Flow调用

runBlocking {
    Log.d(TAG, "runBlocking end : ")
    // 在 550 毫秒后超时
    withTimeoutOrNull(550) {
        firstFlow().collect { value ->
            Log.d(TAG, "runBlocking value : $value")
        }
    }
    Log.d(TAG, "runBlocking end : ")
}

日志输出

runBlocking end :
runBlocking value : 1
runBlocking end :

也就执行了一次之后,终止了。

方式2

使用cancel函数。

runBlocking {
    Log.d(TAG, "runBlocking start : ")
    firstFlow().collect { value ->
        Log.d(TAG, "runBlocking value : $value")
        if (value % 2 == 0) {
            cancel()
        }
    }
    Log.d(TAG, "runBlocking end : ")
}

运行后崩溃了,日志打印如下(部分):

runBlocking start :
runBlocking value : 1
runBlocking value : 2
//略
Caused by: kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@fdfb15d

显然,这种取消时不可靠的。

方式3

使用takeWhile(条件)或take(第几个)。

takeWhile()

当takeWhile(条件)中的条件满足时返回为true,会执行collect()中的打印。

runBlocking {
    firstFlow().takeWhile {
        it < 3
    }.collect { value ->
        Log.d(TAG, "runBlocking value : $value")
    }
}

输出日志

runBlocking value : 1
runBlocking value : 2

take()

take(第几个),传入的是数值n。

如果n大于发射的个数,都会打印;小于的话就打印n个。

runBlocking {
    firstFlow().take(3).collect { value ->
        Log.d(TAG, "runBlocking value : $value")
    }
}

上面三个,就打印发射的3个值,最后后面的丢弃。

runBlocking value : 1
runBlocking value : 2
runBlocking value : 3

方式4

取消协程也可以取消Flow的打印。

//创建一个协成
val myCoroutineScope = CoroutineScope(Dispatchers.IO).launch {
    firstFlow().collect { value ->
        Log.d(TAG, "runBlocking value : $value")
    }
}
//为了看效果,延迟1.6s
runBlocking {
    delay(1600)
    //取消协程
    myCoroutineScope.cancel()
}

日志打印

runBlocking value : 1
runBlocking value : 2
runBlocking value : 3

方式5

抛出异常中断Flow

val flow = flow {
    for (i in 1..4) {
    	//大于3时抛出异常
        if (i >3) throw Exception("Flow Stopped")
        emit(i)
    }
}
runBlocking {
    flow.catch { e ->
        Log.d(TAG, "runBlocking e : $e")
    }.collect { value ->
        Log.d(TAG, "runBlocking value : $value")
    }
}

日志输出

runBlocking value : 1
runBlocking value : 2
runBlocking value : 3
runBlocking e : java.lang.Exception: Flow Stopped

参考文章

  1. kotlin--Flow的运用

  2. Kotlin 协程三 —— 数据流 Flow

  3. Kotlin中的冷流和热流以及如何让Flow停下来

  4. 《腾讯元宝》

相关文章

暂无评论

评论审核已启用。您的评论可能需要一段时间后才能被显示。

none
暂无评论...