继上一篇文章,协程里有一个很大的特性就是 Flow。经过一阵尝试、学习,可能有一点点的收获,还是记录一下吧。
「流」这个词,相信现在越来越多的开发者都接触并喜欢这个概念了。当 RxJava 风靡全球时,就有了「流」这个概念。基于 Kotlin,我们可以流式的写并发代码;基于 Flow,我们也可以流式的处理数据。面对复杂的数据逻辑,写出来的代码也可以像流淌的溪水一样干净,漂亮。
依赖
使用 Flow,需要添加依赖:
1 | implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:版本号' |
我们使用 lifecycle-runtime 或者 viewmodel 的 kotlin 扩展时,本身就会依赖此库。如果使用的 api 依赖,便可以直接使用 Flow。
简单使用
创建数据流:
1 | private fun count(): Flow<Int> = flow { |
监听数据流:
1 | GlobalScope.launch { |
- flow 构建器函数会创建数据流;emit 函数发送新值至数据流;map 函数修改数据流;collect 函数收集数据流;catch 函数捕获异常。
- map 等属于中间运算符,可在应用于数据流时,设置一系列暂不执行的链式运算,留待将来使用值时执行。仅将一个中间运算符应用于数据流不会启动数据流收集。
- collect 等终端运算符可触发数据流开始监听值。由于 collect 是挂起函数,因此需要在协程中执行。
- catch 函数只能捕获上游的异常,无法捕获下游的异常。
- catch 函数捕获到异常后,collect 函数无法执行。可以考虑通过 catch 函数执行 emit 操作处理后续逻辑。
callbackFlow
callbackFlow 可以将基于回调的 API 转换为数据流。以文本框输入监听为例,结合上面的网络请求示例。
1 | private fun TextView.textWatcherFlow(): Flow<String> = callbackFlow<String> { |
采集数据流:
1 | lifecycleScope.launchWhenStarted { |
这个写法看起来就很舒服了,可以将 listener 的注册与注销,与生命周期进行绑定,写一个拓展方法即可。同时数据转成 Flow 发出来,可以做后续的各种处理,非常奈斯~
冷流、热流
冷流是按需创建的,并且会在它们被观察时发送数据;热流则总是活跃,无论是否被观察,它们都能发送数据。
直接通过 flow{} 构造出来的流,是冷流。StateFlow、SharedFlow 则是热流(1.4.0 版本才引入)。
1 | class LatestNewsViewModel( |
采集数据流:
1 | class LatestNewsActivity : AppCompatActivity() { |
这些看起来和 LiveData 类似:
- StateFlow 需要将初始状态传递给构造函数,而 LiveData 不需要。
- 当 View 进入 STOPPED 状态时,LiveData.observe() 会自动取消注册使用方,而从 StateFlow 或任何其他数据流收集数据的操作并不会自动停止。如需实现相同的行为,您需要从 Lifecycle.repeatOnLifecycle 块收集数据流。
说白了就是数据一直在产生,LiveData observe 的时候本身就是在可见时,才进行回调。而 collect 会一直回调,需要结合 repeatOnLifecycle 一块使用。
关于冷流、热流的概念,现在还很模糊,只能后面边用边摸索了~
总的来说,Flow 非常强大,能做的事情也很多。所以网上也有说法:出了 Flow 就可以废弃 LiveData 了。这个可以看一下凯哥的视频LiveData:还没普及就让我去世?我去你的 Kotlin 协程。简而言之,我们可以有很多种技术来实现某一些特定的场景,并不一定就得是 A 技术替换 B 技术。萝卜白菜,各有所爱。当然我们还是得依据自身场景,尽量使用主流的技术。
续一把
正好就用到 Flow 来做了一把倒计时的需求:
1 | private fun exitCountDown() { |
看起来可太简单了,整个流程就在这一个方法里。如果不用 Flow,大概率就是 handler.postDelay,或者 Timer 了,逻辑就会分散在各处,不方便查看。香,真香!
另外这里需要注意一下,假设可以中断这个倒计时,那么就会将这个 launch 返回的 job 保存起来,然后调用 job.cancel(),这个时候也会执行 onCompletion 方法。即调用了 job.cancel(),却仍然执行了 exit() 方法,这是不符合预期的。所以需要修改一下代码,就像这样:
1 | private fun exitCountDown() { |
当没有调用 cancel 时,cause 会是 null,代表正常结束。如果调用了 cancel,cause 则会是个 JobCancellationException。
1 | onCompletion:kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@db2855a |
嗯,然后又做了一个网络监听的需求,封装了一个类:
1 | object GlobalNetWorkMonitor { |
信心满满的跑了一把,结果网络一发生变化就崩溃了:ClosedSendChannelException: Channel was closed。
后面找到原因:Reason is that callbackFlow block closes the (hidden under the hood) channel, as soon, as everything within
1 | callbackFlow { |
也就是当括号里的代码执行完了之后, callbackFlow 自动就 close 了,这个时候还去 offer 就会报错,需要添加 awaitClose。于是改成:
1 | private val connect = callbackFlow { |
参考
Kotlin Flow场景化学习
[正确]的使用Kotlin Flow进行搜索优化
Android 上的 Kotlin 数据流
ClosedSendChannelException for callbackFlow