Kotlin Flow
Basis
为什么需要flow:
挂起函数只可以返回单个值,如果我们需要返回多个值,就要用到flow
比如这种情况
fun simple(): List<Int> = listOf(1, 2, 3)
fun main() {
simple().forEach { value -> println(value) }
}
//-------------------------------------------
fun simple(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i) // yield next value
}
}
fun main() {
simple().forEach { value -> println(value) }
}
//-------------------------------------------
suspend fun simple(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}
如果是一次返回所有值我们可以返回List,如果返回的数据需要一些时间才能返回,我们可以使用sequence,但如果我们需要写异步代码,就可以使用协程,但此时只能返回一个整体值,我们用flow来解决
fun simple(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Launch a concurrent coroutine to check if the main thread is blocked
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
simple().collect { value -> println(value) }
}
冷流:
当执行collect的时候(也就是有消费者的时候),生产者才开始发射数据流。生产者与消费者是一对一的关系。当生产者发送数据的时候,对应的消费者才可以收到数据。热流:
不管有没有执行collect(也就是不管有没有消费者),生产者都会发射数据流到内存中。生产者与消费者是一对多的关系。当生产者发送数据的时候,多个消费者都可以收到数据
build
flow{...}
Collection.asFlow()
// Convert an integer range to a flow
(1..3).asFlow().collect { value -> println(value) }
cancel
当协程被取消时flow也会被取消
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // Timeout after 250ms
simple().collect { value -> println(value) }
}
println("Done")
}
Emitting 1
1
Emitting 2
2
Done
Completion
当Flow执行完后,我们可能要执行某些操作,有俩种方式:
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} finally {
println("Done")
}
}
simple()
.onCompletion { println("Done") }
.collect { value -> println(value) }
onCompletion API 的好处是可以判断Flow是否正常结束,如果是非正常结束lambda表达式的参数会是一个Throwable参数且不为空,如果正常结束则是为空的Throwable
fun simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
1
Flow completed exceptionally
Caught exception
onCompletion并不会处理异常,它依然会将异常传递给下游
Intermediate operators
和Collection和sequence能使用的Intermediate operators相同,最常见的有map和filter
区别在于flow的Intermediate operators代码块中可以调用挂起函数
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
response 1
response 2
response 3
Transform operator
通过使用Transform操作符,我们可以发送任意值任意次数
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
Size-limiting operators
这种操作符可以根据特定的条件提前终止flow
比如take
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
1
2
Finally in numbers
Terminal operators
Terminal operator都是挂起函数,它们会开启一个flow的数据收集
collect是最基础的,常用的还有
toList toSet
first single
reduce fold
val sum = (1..5).asFlow()
.map { it * it } // squares of numbers from 1 to 5
.reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)
55
Context
Flow的collect发生在调用它的协程的上下文中
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
此时不管flow中具体代码是如何实现的,它的上下文都是被指定了,流的这种属性称为上下文保存
所以说默认情况下,flow{...}运行在收集者协程的上下文中
Wrong emission withContext
如果需要在flow中切换Dispatcher,这样做是错误的
fun simple(): Flow<Int> = flow {
// The WRONG way to change context for CPU-consuming code in flow builder
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
emit(i) // emit next value
}
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
我们可以使用flowOn
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
flow{...}运行在后台线程而collect运行在主线程
buffer
如果生产者和消费者速率不同会降低效率
考虑这种情况,flow每100s发送一次数据而收集者每300s收集一次,这样总体要耗时1200s
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
1
2
3
Collected in 1220 ms
我们可以使用buffer
val time = measureTimeMillis {
simple()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
1
2
3
Collected in 1071 ms
Conflation
如果发送的速率过快我们也可以不选择buffer而是选择处理最新的数据
val time = measureTimeMillis {
simple()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
1
3
Collected in 758 ms
Processing the latest value
collectLatest可以做到每当一个新的值来临时就取消旧的收集者并开启一个新的
collectLatest只是一个代表,还有xxxLatest系列API
val time = measureTimeMillis {
simple()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
println("Collected in $time ms")
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
Exception handle
Collector try and catch
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
上面这种处理异常的方式也会处理emitter中出现的异常
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
This exception is still caught and collection is stopped
emitter catch
但如果emitter想要自己封装自己的处理异常的行为呢?
可以使用catch操作符,并且捕获到异常后可以有多种操作:
1.重新抛出
2.将异常作为值emit出去
3.忽视异常,打日志或者是其它的处理方式
simple()
.catch { e -> emit("Caught $e") } // emit on exception
.collect { value -> println(value) }
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
catch操作符只能catch到上游出现的异常,如果在collect中出现了异常将无法被捕获
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> println("Caught $e") } // does not catch downstream exceptions
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
实战
用Kotlin Flow无缝衔接Retrofit
APIService
interface WeatherApiService {
@GET("weather/now.json")
fun getWeatherInfoNow(@Query("location") location: String): Flow<WeatherInfo>
}
为Retrofit添加FlowCallAdapter
private val retrofit = Retrofit.Builder()
// ...
.addCallAdapterFactory(FlowCallAdapterFactory.create())
// ...
.build()
可为FlowCallAdapterFactory.create()指定async参数控制是同步调用Call还是异步调用Call,默认为false,即由协程上下文决定网络请求的执行线程。
invoke
MainScope().launch {
ApiServiceManager.weatherApiService
.getWeatherInfoNow(location = "北京")
//通过一系列操作符处理数据,如map,如果有必要的话
.map {
// ...
}
//在Dispatcher.IO上下文中产生订阅数据
.flowOn(Dispatchers.IO)
//捕获异常
.catch { ex ->
//处理异常
println("error occurs:$ex")
}
//订阅数据
.collect {
//处理数据
println("weather info:$it")
}
}
其中FlowCallAdapter需要我们自己去实现,目的是为了使Api接口方法的返回值可声明为Flow
自定义CallAdapter需实现接口retrofit2.CallAdapter,看下它的相关方法:
//泛型参数R表示Call<R>对象的泛型参数,默认为Response<ResponseBody>或ResponseBody,
//如果运用了GsonConverter,有可能是Response<[Bean]>或[Bean]
public interface CallAdapter<R, T> {
//将响应体ResponseBody解析为何种类型,如Call<User>或Call<Response<User>>的responseType为User
Type responseType();
//将Call<R>转化成T类型的对象,如Rxjava中,将Call<R>转化成Observable<R>
T adapt(Call<R> call);
}
BodyFlowCallAdapter
BodyFlowCallAdapter负责将Call
//R表示response body的类型,默认为okhttp3.ResponseBody,
//有可能被Converter自动解析为其他类型如[Bean]
class BodyFlowCallAdapter<R>(private val responseBodyType: R) : CallAdapter<R, Flow<R>> {
//由于我们只是想将Call转为Flow,无意插足ResponseBody的解析
//所以直接原样返回responseBodyType即可
override fun responseType(): Type = responseBodyType as Type
//直接调用bodyFlow(call)返回Flow<R>对象。
override fun adapt(call: Call<R>): Flow<R> = bodyFlow(call)
}
fun <R> bodyFlow(call: Call<R>): Flow<R> = flow {
suspendCancellableCoroutine<R> { continuation ->
//协程取消时,调用Call.cancel()取消call
continuation.invokeOnCancellation {
call.cancel()
}
try {
//执行call.execute()
val response = call.execute()
if (response.isSuccessful) {
//http响应[200..300),恢复执行,并返回响应体
continuation.resume(response.body()!!)
} else {
//其他http响应,恢复执行,并抛出http异常
continuation.resumeWithException(HttpException(response))
}
} catch (e: Exception) {
//捕获的其他异常,恢复执行,并抛出该异常
continuation.resumeWithException(e)
}
}.let { responseBody ->
//通过flow发射响应体
emit(responseBody)
}
}
ResponseFlowCallAdapter
ResponseFlowCallAdapter负责将Call<Response
//R表示response body的类型,默认为okhttp3.ResponseBody,
//有可能被Converter自动解析为其他类型如[Bean]
class ResponseFlowCallAdapter<R>(private val responseBodyType: R) :
CallAdapter<R, Flow<Response<R>>> {
//由于我们只是想将Call转为Flow,无意插足ResponseBody的解析
//所以直接原样返回responseBodyType即可
override fun responseType() = responseBodyType as Type
//直接调用responseFlow(call)返回Flow<Response<R>>对象。
override fun adapt(call: Call<R>): Flow<Response<R>> = responseFlow(call)
}
fun <T> responseFlow(call: Call<T>): Flow<Response<T>> = flow {
suspendCancellableCoroutine<Response<T>> { continuation ->
//协程取消时,调用call.cancel()取消call
continuation.invokeOnCancellation {
call.cancel()
}
try {
//执行call.execute()
val response = call.execute()
//恢复执行,并返回Response
continuation.resume(response)
} catch (e: Exception) {
//捕获异常,恢复执行,并返回异常
continuation.resumeWithException(e)
}
}.let { response ->
//通过flow发射Response
emit(response)
}
}
AsyncBodyFlowCallAdapter&&AsyncResponseFlowCallAdapter
AsyncBodyFlowCallAdapter 和 AsyncResponseFlowCallAdapter是异步版的FlowCallAdapter,这里的异步指的是在实际调用Call时,调用的是call.enqueue方法,以AsyncBodyFlowCallAdapter为例与同步版做下对比:
fun <R> asyncBodyFlow(call: Call<R>): Flow<R> = flow {
try {
suspendCancellableCoroutine<R> { continuation ->
//协程取消时,调用Call.cancel()取消call
continuation.invokeOnCancellation {
call.cancel()
}
//调用call.enqueue(),在callback里恢复执行并返回结果
call.enqueue(object : Callback<R> {
override fun onResponse(call: Call<R>, response: Response<R>) {
if (response.isSuccessful) {
//http响应[200..300),恢复执行并返回响应体
continuation.resume(response.body()!!)
} else {
//其他http响应,恢复执行并抛出http异常
continuation.resumeWithException(HttpException(response))
}
}
override fun onFailure(call: Call<R>, t: Throwable) {
//其他捕获的异常,恢复执行并抛出该异常
continuation.resumeWithException(t)
}
})
}.let { responseBody->
//通过flow发射响应体
emit(responseBody)
}
} catch (e: Exception) {
suspendCoroutineUninterceptedOrReturn<Nothing> { continuation ->
Dispatchers.Default.dispatch(continuation.context) {
//特殊case处理,确保抛出异常前挂起,感兴趣可参考https://github.com/Kotlin/kotlinx.coroutines/pull/1667#issuecomment-556106349
continuation.intercepted().resumeWithException(e)
}
COROUTINE_SUSPENDED
}
}
}
最后写一个工厂类
class FlowCallAdapterFactory private constructor(private val async: Boolean) :
CallAdapter.Factory() {
//returnType,代表api接口方法的返回值类型,annotations为该接口方法的注解
//根据参数返回该接口方法的CallAdapter
override fun get(
returnType: Type,
annotations: Array<out Annotation>,
retrofit: Retrofit,
): CallAdapter<*, *>? {
//如果返回值原始类型不是Flow类型,直接返回null,表示不做处理
if (getRawType(returnType) != Flow::class.java) return null
//强制返回值类型为Flow<R>,而不是Flow
if (returnType !is ParameterizedType) {
throw IllegalStateException("the flow type must be parameterized as Flow<Foo>!")
}
//获取Flow的泛型参数
val flowableType = getParameterUpperBound(0, returnType)
//获取Flow的泛型参数的原始类型
val rawFlowableType = getRawType(flowableType)
return if (rawFlowableType == Response::class.java) {
//Flow<T>中的T为retrofit2.Response,但不是泛型Response<R>模式
if (flowableType !is ParameterizedType) {
throw IllegalStateException("the response type must be parameterized as Response<Foo>!")
}
//选取Response的泛型参数作为ResponseBody,创建Flow<Response<R>>模式的FlowCallAdapter
val responseBodyType = getParameterUpperBound(0, flowableType)
createResponseFlowCallAdapter(async, responseBodyType)
} else {
//直接将Flow的泛型参数作为ResponseBody,创建Flow<R>模式的FlowCallAdapter
createBodyFlowCallAdapter(async, flowableType)
}
}
companion object {
//获取工厂实例的方法
//async表示是异步调用Call还是同步调用Call,默认false,即同步调用,
//同步调用则由协程上下文决定Call.execute()的执行线程
//若为true,则协程只调用Call.enqueue()方法,网络请求在okhttp的线程池里执行
@JvmStatic
fun create(async: Boolean = false) = FlowCallAdapterFactory(async)
}
}
几个场景
简单列表数据的加载状态
简单的列表显示场景,可以使用onStart
,onEmpty
,catch
,onCompletion
等回调操作符,监听数据流的状态,显示相应的加载状态UI。
onStart
:在数据发射之前触发,onStart所在的线程,是数据产生的线程onCompletion
:在数据流结束时触发,onCompletion所在的线程,是数据产生的线程onEmpty
:当数据流结束了,缺没有发出任何元素的时候触发。catch
:数据流发生错误的时候触发flowOn
:指定上游数据流的CoroutineContext,下游数据流不会受到影响
private fun coldFlowDemo() {
//创建一个冷流,在3秒后发射一个数据
val coldFlow = flow<Int> {
delay(3000)
emit(1)
}
lifecycleScope.launch(Dispatchers.IO) {
coldFlow.onStart {
Log.d(TAG, "coldFlow onStart, thread:${Thread.currentThread().name}")
mBinding.progressBar.isVisible = true
mBinding.tvLoadingStatus.text = "加载中"
}.onEmpty {
Log.d(TAG, "coldFlow onEmpty, thread:${Thread.currentThread().name}")
mBinding.progressBar.isVisible = false
mBinding.tvLoadingStatus.text = "数据加载为空"
}.catch {
Log.d(TAG, "coldFlow catch, thread:${Thread.currentThread().name}")
mBinding.progressBar.isVisible = false
mBinding.tvLoadingStatus.text = "数据加载错误:$it"
}.onCompletion {
Log.d(TAG, "coldFlow onCompletion, thread:${Thread.currentThread().name}")
mBinding.progressBar.isVisible = false
mBinding.tvLoadingStatus.text = "加载完成"
}
//指定上游数据流的CoroutineContext,下游数据流不会受到影响
.flowOn(Dispatchers.Main)
.collect {
Log.d(TAG, "coldFlow collect:$it, thread:${Thread.currentThread().name}")
}
}
}
coldFlow onStart, thread:main
coldFlow onCompletion, thread:main
coldFlow collect:1, thread:DefaultDispatcher-worker-1
同一种数据,需要加载本地数据和网络数据
在实际的开发场景中,经常会将一些网络数据保存到本地,下次加载数据的时候,优先使用本地数据,再使用网络数据。
但是本地数据和网络数据的加载完成时机不一样,所以可能会有下面几种场景。
- 本地数据比网络数据先加载完成:那先使用本地数据,再使用网络数据
- 网络数据比本地数据先加载完成:
-
- 网络数据加载成功,那只使用网络数据即可,不需要再使用本地数据了。
- 网络数据加载失败,可以继续尝试使用本地数据进行兜底。
- 本地数据和网络数据都加载失败:通知上层数据加载失败
abstract class CacheRepositity<T> {
private val TAG = "CacheRepositity"
fun getData() = channelFlow<CResult<T>> {
supervisorScope {
val dataFromLocalDeffer = async {
fetchDataFromLocal().also {
Log.d(TAG,"fetchDataFromLocal result:$it , thread:${Thread.currentThread().name}")
//本地数据加载成功
if (it is CResult.Success) {
send(it)
}
}
}
val dataFromNetDeffer = async {
fetchDataFromNetWork().also {
Log.d(TAG,"fetchDataFromNetWork result:$it , thread:${Thread.currentThread().name}")
//网络数据加载成功
if (it is CResult.Success) {
send(it)
//如果网络数据已加载,可以直接取消任务,就不需要处理本地数据了
dataFromLocalDeffer.cancel()
}
}
}
//本地数据和网络数据,都加载失败的情况
val localData = dataFromLocalDeffer.await()
val networkData = dataFromNetDeffer.await()
if (localData is CResult.Error && networkData is CResult.Error) {
send(CResult.Error(Throwable("load data error")))
}
}
}
protected abstract suspend fun fetchDataFromLocal(): CResult<T>
protected abstract suspend fun fetchDataFromNetWork(): CResult<T>
}
sealed class CResult<out R> {
data class Success<out T>(val data: T) : CResult<T>()
data class Error(val throwable: Throwable) : CResult<Nothing>()
}
测试案例:
private fun cacheRepositityDemo(){
val repositity=TestRepositity()
lifecycleScope.launch {
repositity.getData().onStart {
Log.d(TAG, "TestRepositity: onStart")
}.onCompletion {
Log.d(TAG, "TestRepositity: onCompletion")
}.collect {
Log.d(TAG, "collect: $it")
}
}
}
本地数据比网络数据加载快
class TestRepositity : CacheRepositity<String>() {
override suspend fun fetchDataFromLocal(): CResult<String> {
delay(1000)
return CResult.Success("data from fetchDataFromLocal")
}
override suspend fun fetchDataFromNetWork(): CResult<String> {
delay(2000)
return CResult.Success("data from fetchDataFromNetWork")
}
}
onStart
fetchDataFromLocal result:Success(data=data from fetchDataFromLocal) , thread:main
collect: Success(data=data from fetchDataFromLocal)
fetchDataFromNetWork result:Success(data=data from fetchDataFromNetWork) , thread:main
collect: Success(data=data from fetchDataFromNetWork)
onCompletion
网络数据比本地数据加载快
class TestRepositity : CacheRepositity<String>() {
override suspend fun fetchDataFromLocal(): CResult<String> {
delay(2000)
return CResult.Success("data from fetchDataFromLocal")
}
override suspend fun fetchDataFromNetWork(): CResult<String> {
delay(1000)
return CResult.Success("data from fetchDataFromNetWork")
}
}
onStart
fetchDataFromNetWork result:Success(data=data from fetchDataFromNetWork) , thread:main
collect: Success(data=data from fetchDataFromNetWork)
onCompletion
网络数据加载失败,使用本地数据
class TestRepositity : CacheRepositity<String>() {
override suspend fun fetchDataFromLocal(): CResult<String> {
delay(2000)
return CResult.Success("data from fetchDataFromLocal")
}
override suspend fun fetchDataFromNetWork(): CResult<String> {
delay(1000)
return CResult.Error(Throwable("fetchDataFromNetWork Error"))
}
}
onStart
fetchDataFromNetWork result:Error(throwable=java.lang.Throwable: fetchDataFromNetWork Error) , thread:main
fetchDataFromLocal result:Success(data=data from fetchDataFromLocal) , thread:main
collect: Success(data=data from fetchDataFromLocal)
onCompletion
网络数据和本地数据都加载失败
class TestRepositity : CacheRepositity<String>() {
override suspend fun fetchDataFromLocal(): CResult<String> {
delay(2000)
return CResult.Error(Throwable("fetchDataFromLocal Error"))
}
override suspend fun fetchDataFromNetWork(): CResult<String> {
delay(1000)
return CResult.Error(Throwable("fetchDataFromNetWork Error"))
}
}
onStart
fetchDataFromNetWork result:Error(throwable=java.lang.Throwable: fetchDataFromNetWork Error) , thread:main
fetchDataFromLocal result:Error(throwable=java.lang.Throwable: fetchDataFromLocal Error) , thread:main
collect: Error(throwable=java.lang.Throwable: load data error)
onCompletion
多种数据源,合并后展示
在实际的开发场景中,经常一个页面的数据,是需要发起多个网络请求之后,组合数据之后再进行显示。
实现目标:
- 接口间不需要互相等待,哪些数据先回来,就先展示哪部分
- 控制数据的显示顺序
可以借助combine合并flow
可以合并多个不同的 Flow 数据流,生成一个新的流。只要其中某个子 Flow 数据流有产生新数据的时候,就会触发 combine 操作,进行重新计算,生成一个新的数据。
class HomeViewModel : ViewModel() {
//暴露给View层的列表数据
val list = MutableLiveData<List<String?>>()
//多个子Flow,这里简单都返回String,实际场景根据需要,返回相应的数据类型即可
private val bannerFlow = MutableStateFlow<String?>(null)
private val channelFlow = MutableStateFlow<String?>(null)
private val listFlow = MutableStateFlow<String?>(null)
init {
//使用combine操作符
viewModelScope.launch {
combine(bannerFlow, channelFlow, listFlow) { bannerData, channelData, listData ->
Log.d("HomeViewModel", "combine bannerData:$bannerData,channelData:$channelData,listData:$listData")
//只要子flow里面的数据不为空,就放到resultList里面
val resultList = mutableListOf<String?>()
if (bannerData != null) {
resultList.add(bannerData)
}
if (channelData != null) {
resultList.add(channelData)
}
if (listData != null) {
resultList.add(listData)
}
resultList
}.collect {
//收集combine之后的数据,修改liveData的值,通知UI层刷新列表
Log.d("HomeViewModel", "collect: ${it.size}")
list.postValue(it)
}
}
}
fun loadData() {
viewModelScope.launch(Dispatchers.IO) {
//模拟耗时操作
async {
delay(1000)
Log.d("HomeViewModel", "getBannerData success")
bannerFlow.emit("Banner")
}
async {
delay(2000)
Log.d("HomeViewModel", "getChannelData success")
channelFlow.emit("Channel")
}
async {
delay(3000)
Log.d("HomeViewModel", "getListData success")
listFlow.emit("List")
}
}
}
}
HomeViewModel
- 提供一个 LiveData 的列表数据给View层使用
- 内部有3个子 flow ,分别负责相应数据的生产。(这里简单都返回String,实际场景根据需要,返回相应的数据类型即可)。
- 通过 combine 操作符,组合这3个子flow的数据。
- collect 接收生成的新数据,并修改liveData的数据,通知刷新UI
View层使用
private fun flowCombineDemo() {
val homeViewModel by viewModels<HomeViewModel>()
homeViewModel.list.observe(this) {
Log.d("HomeViewModel", "observe size:${it.size}")
}
homeViewModel.loadData()
}
简单的创建一个 ViewModel
,observe
列表数据对应的LiveData
。
通过输出的日志发现,触发数据加载之后,每次子 Flow 流生产数据的时候,都会触发一次 combine 操作,生成新的数据。
日志输出:
combine bannerData:null,channelData:null,listData:null
collect: 0
observe size:0
getBannerData success
combine bannerData:Banner,channelData:null,listData:null
collect: 1
observe size:1
getChannelData success
combine bannerData:Banner,channelData:Channel,listData:null
collect: 2
observe size:2
getListData success
combine bannerData:Banner,channelData:Channel,listData:List
collect: 3
observe size:3