Kotlin Flow

2022-05-25/2022-06-11

Basis

为什么需要flow:

挂起函数只可以返回单个值,如果我们需要返回多个值,就要用到flow

比如这种情况

fun simple(): List<Int> = listOf(1, 2, 3)
 
fun main() {
    simple().forEach { value -> println(value) } 
}

//-------------------------------------------

fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}


fun main() {
    simple().forEach { value -> println(value) } 
}

//-------------------------------------------

suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}


fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) } 
}

如果是一次返回所有值我们可以返回List,如果返回的数据需要一些时间才能返回,我们可以使用sequence,但如果我们需要写异步代码,就可以使用协程,但此时只能返回一个整体值,我们用flow来解决

fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) } 
}
  • 冷流:当执行collect的时候(也就是有消费者的时候),生产者才开始发射数据流。生产者与消费者是一对一的关系。当生产者发送数据的时候,对应的消费者才可以收到数据。
  • 热流:不管有没有执行collect(也就是不管有没有消费者),生产者都会发射数据流到内存中。生产者与消费者是一对多的关系。当生产者发送数据的时候,多个消费者都可以收到数据

build

flow{...}

Collection.asFlow()

// Convert an integer range to a flow
(1..3).asFlow().collect { value -> println(value) }

cancel

当协程被取消时flow也会被取消

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms 
        simple().collect { value -> println(value) } 
    }
    println("Done")
}
Emitting 1
1
Emitting 2
2
Done

Completion

当Flow执行完后,我们可能要执行某些操作,有俩种方式:

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}
simple()
    .onCompletion { println("Done") }
    .collect { value -> println(value) }

onCompletion API 的好处是可以判断Flow是否正常结束,如果是非正常结束lambda表达式的参数会是一个Throwable参数且不为空,如果正常结束则是为空的Throwable

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}
1
Flow completed exceptionally
Caught exception

onCompletion并不会处理异常,它依然会将异常传递给下游

Intermediate operators

和Collection和sequence能使用的Intermediate operators相同,最常见的有map和filter

区别在于flow的Intermediate operators代码块中可以调用挂起函数

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
response 1
response 2
response 3

Transform operator

通过使用Transform操作符,我们可以发送任意值任意次数

(1..3).asFlow() // a flow of requests
    .transform { request ->
        emit("Making request $request") 
        emit(performRequest(request)) 
    }
    .collect { response -> println(response) }

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

Size-limiting operators

这种操作符可以根据特定的条件提前终止flow

比如take

fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // take only the first two
        .collect { value -> println(value) }
}
1
2
Finally in numbers

Terminal operators

Terminal operator都是挂起函数,它们会开启一个flow的数据收集

collect是最基础的,常用的还有

toList toSet

first single

reduce fold

val sum = (1..5).asFlow()
    .map { it * it } // squares of numbers from 1 to 5                           
    .reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)
55

Context

Flow的collect发生在调用它的协程的上下文中

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") } 
}
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

此时不管flow中具体代码是如何实现的,它的上下文都是被指定了,流的这种属性称为上下文保存

所以说默认情况下,flow{...}运行在收集者协程的上下文中

Wrong emission withContext

如果需要在flow中切换Dispatcher,这样做是错误的



fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) } 
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
		Please refer to 'flow' documentation or use 'flowOn' instead
	at ...

我们可以使用flowOn

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value") 
    } 
}
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

flow{...}运行在后台线程而collect运行在主线程

buffer

如果生产者和消费者速率不同会降低效率

考虑这种情况,flow每100s发送一次数据而收集者每300s收集一次,这样总体要耗时1200s

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple().collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}
1
2
3
Collected in 1220 ms

我们可以使用buffer

val time = measureTimeMillis {
    simple()
        .buffer() // buffer emissions, don't wait
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")
1
2
3
Collected in 1071 ms

Conflation

如果发送的速率过快我们也可以不选择buffer而是选择处理最新的数据

val time = measureTimeMillis {
    simple()
        .conflate() // conflate emissions, don't process each one
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")
1
3
Collected in 758 ms

Processing the latest value

collectLatest可以做到每当一个新的值来临时就取消旧的收集者并开启一个新的

collectLatest只是一个代表,还有xxxLatest系列API

val time = measureTimeMillis {
    simple()
        .collectLatest { value -> // cancel & restart on the latest value
            println("Collecting $value") 
            delay(300) // pretend we are processing it for 300 ms
            println("Done $value") 
        } 
}   
println("Collected in $time ms")
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms

Exception handle

Collector try and catch

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

上面这种处理异常的方式也会处理emitter中出现的异常

fun simple(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

This exception is still caught and collection is stopped

emitter catch

但如果emitter想要自己封装自己的处理异常的行为呢?

可以使用catch操作符,并且捕获到异常后可以有多种操作:

1.重新抛出

2.将异常作为值emit出去

3.忽视异常,打日志或者是其它的处理方式

simple()
    .catch { e -> emit("Caught $e") } // emit on exception
    .collect { value -> println(value) }
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

catch操作符只能catch到上游出现的异常,如果在collect中出现了异常将无法被捕获

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}

实战

用Kotlin Flow无缝衔接Retrofit

APIService

interface WeatherApiService {
 @GET("weather/now.json")
 fun getWeatherInfoNow(@Query("location") location: String): Flow<WeatherInfo>
 }

为Retrofit添加FlowCallAdapter

private val retrofit = Retrofit.Builder()
     // ...
     .addCallAdapterFactory(FlowCallAdapterFactory.create())
     // ...
     .build()

可为FlowCallAdapterFactory.create()指定async参数控制是同步调用Call还是异步调用Call,默认为false,即由协程上下文决定网络请求的执行线程。

invoke

MainScope().launch {
         ApiServiceManager.weatherApiService
             .getWeatherInfoNow(location = "北京")
             //通过一系列操作符处理数据,如map,如果有必要的话
             .map {
                 // ...
             }
             //在Dispatcher.IO上下文中产生订阅数据
             .flowOn(Dispatchers.IO)
             //捕获异常
             .catch { ex ->
                 //处理异常
                 println("error occurs:$ex")
             }
             //订阅数据
             .collect {
                 //处理数据
                 println("weather info:$it")
             }
     }

其中FlowCallAdapter需要我们自己去实现,目的是为了使Api接口方法的返回值可声明为Flow

自定义CallAdapter需实现接口retrofit2.CallAdapter,看下它的相关方法:

//泛型参数R表示Call<R>对象的泛型参数,默认为Response<ResponseBody>或ResponseBody,
//如果运用了GsonConverter,有可能是Response<[Bean]>或[Bean]
public interface CallAdapter<R, T> {
  //将响应体ResponseBody解析为何种类型,如Call<User>或Call<Response<User>>的responseType为User
  Type responseType();

  //将Call<R>转化成T类型的对象,如Rxjava中,将Call<R>转化成Observable<R>
  T adapt(Call<R> call);
}

BodyFlowCallAdapter

BodyFlowCallAdapter负责将Call、Call<[Bean]>转化为Flow和Flow<[Bean]>。

//R表示response body的类型,默认为okhttp3.ResponseBody,
//有可能被Converter自动解析为其他类型如[Bean]
class BodyFlowCallAdapter<R>(private val responseBodyType: R) : CallAdapter<R, Flow<R>> {
    //由于我们只是想将Call转为Flow,无意插足ResponseBody的解析
    //所以直接原样返回responseBodyType即可
    override fun responseType(): Type = responseBodyType as Type
    //直接调用bodyFlow(call)返回Flow<R>对象。
    override fun adapt(call: Call<R>): Flow<R> = bodyFlow(call)
}




  fun <R> bodyFlow(call: Call<R>): Flow<R> = flow {
    suspendCancellableCoroutine<R> { continuation ->
        //协程取消时,调用Call.cancel()取消call
        continuation.invokeOnCancellation {
            call.cancel()
        }
        try {
            //执行call.execute()
            val response = call.execute()
            if (response.isSuccessful) {
                //http响应[200..300),恢复执行,并返回响应体
                continuation.resume(response.body()!!)
            } else {
                //其他http响应,恢复执行,并抛出http异常
                continuation.resumeWithException(HttpException(response))
            }
        } catch (e: Exception) {
            //捕获的其他异常,恢复执行,并抛出该异常
            continuation.resumeWithException(e)
        }
    }.let { responseBody ->
        //通过flow发射响应体
        emit(responseBody)
    }
}

ResponseFlowCallAdapter

ResponseFlowCallAdapter负责将Call<Response>、Call<Response<[Bean]>>转化为Flow<Response>和Flow<Response<[Bean]>>。

//R表示response body的类型,默认为okhttp3.ResponseBody,
//有可能被Converter自动解析为其他类型如[Bean]
class ResponseFlowCallAdapter<R>(private val responseBodyType: R) :
    CallAdapter<R, Flow<Response<R>>> {
    //由于我们只是想将Call转为Flow,无意插足ResponseBody的解析
    //所以直接原样返回responseBodyType即可
    override fun responseType() = responseBodyType as Type
    //直接调用responseFlow(call)返回Flow<Response<R>>对象。
    override fun adapt(call: Call<R>): Flow<Response<R>> = responseFlow(call)
}



fun <T> responseFlow(call: Call<T>): Flow<Response<T>> = flow {
    suspendCancellableCoroutine<Response<T>> { continuation ->
        //协程取消时,调用call.cancel()取消call
        continuation.invokeOnCancellation {
            call.cancel()
        }
        try {
            //执行call.execute()
            val response = call.execute()
            //恢复执行,并返回Response
            continuation.resume(response)
        } catch (e: Exception) {
            //捕获异常,恢复执行,并返回异常
            continuation.resumeWithException(e)
        }
    }.let { response ->
        //通过flow发射Response
        emit(response)
    }
}

AsyncBodyFlowCallAdapter&&AsyncResponseFlowCallAdapter

AsyncBodyFlowCallAdapter 和 AsyncResponseFlowCallAdapter是异步版的FlowCallAdapter,这里的异步指的是在实际调用Call时,调用的是call.enqueue方法,以AsyncBodyFlowCallAdapter为例与同步版做下对比:

fun <R> asyncBodyFlow(call: Call<R>): Flow<R> = flow {
    try {
        suspendCancellableCoroutine<R> { continuation ->
            //协程取消时,调用Call.cancel()取消call
            continuation.invokeOnCancellation {
                call.cancel()
            }
            //调用call.enqueue(),在callback里恢复执行并返回结果
            call.enqueue(object : Callback<R> {
                override fun onResponse(call: Call<R>, response: Response<R>) {
                    if (response.isSuccessful) {
                        //http响应[200..300),恢复执行并返回响应体
                        continuation.resume(response.body()!!)
                    } else {
                        //其他http响应,恢复执行并抛出http异常
                        continuation.resumeWithException(HttpException(response))
                    }
                }

                override fun onFailure(call: Call<R>, t: Throwable) {
                    //其他捕获的异常,恢复执行并抛出该异常
                    continuation.resumeWithException(t)
                }
            })
        }.let { responseBody->
            //通过flow发射响应体
            emit(responseBody)
        }
    } catch (e: Exception) {
        suspendCoroutineUninterceptedOrReturn<Nothing> { continuation ->
            Dispatchers.Default.dispatch(continuation.context) {
                //特殊case处理,确保抛出异常前挂起,感兴趣可参考https://github.com/Kotlin/kotlinx.coroutines/pull/1667#issuecomment-556106349
                continuation.intercepted().resumeWithException(e)
            }
            COROUTINE_SUSPENDED
        }
    }
}

最后写一个工厂类

class FlowCallAdapterFactory private constructor(private val async: Boolean) :
    CallAdapter.Factory() {
    //returnType,代表api接口方法的返回值类型,annotations为该接口方法的注解
    //根据参数返回该接口方法的CallAdapter
    override fun get(
        returnType: Type,
        annotations: Array<out Annotation>,
        retrofit: Retrofit,
    ): CallAdapter<*, *>? {
        //如果返回值原始类型不是Flow类型,直接返回null,表示不做处理
        if (getRawType(returnType) != Flow::class.java) return null
        //强制返回值类型为Flow<R>,而不是Flow
        if (returnType !is ParameterizedType) {
            throw IllegalStateException("the flow type must be parameterized as Flow<Foo>!")
        }
        //获取Flow的泛型参数
        val flowableType = getParameterUpperBound(0, returnType)
        //获取Flow的泛型参数的原始类型
        val rawFlowableType = getRawType(flowableType)

        return if (rawFlowableType == Response::class.java) {
            //Flow<T>中的T为retrofit2.Response,但不是泛型Response<R>模式
            if (flowableType !is ParameterizedType) {
                throw IllegalStateException("the response type must be parameterized as Response<Foo>!")
            }
            //选取Response的泛型参数作为ResponseBody,创建Flow<Response<R>>模式的FlowCallAdapter
            val responseBodyType = getParameterUpperBound(0, flowableType)
            createResponseFlowCallAdapter(async, responseBodyType)
        } else {
            //直接将Flow的泛型参数作为ResponseBody,创建Flow<R>模式的FlowCallAdapter
            createBodyFlowCallAdapter(async, flowableType)
        }
    }

    companion object {
        //获取工厂实例的方法
        //async表示是异步调用Call还是同步调用Call,默认false,即同步调用,
        //同步调用则由协程上下文决定Call.execute()的执行线程
        //若为true,则协程只调用Call.enqueue()方法,网络请求在okhttp的线程池里执行
        @JvmStatic
        fun create(async: Boolean = false) = FlowCallAdapterFactory(async)
    }
}

几个场景

简单列表数据的加载状态

简单的列表显示场景,可以使用onStartonEmptycatchonCompletion等回调操作符,监听数据流的状态,显示相应的加载状态UI。

  • onStart:在数据发射之前触发,onStart所在的线程,是数据产生的线程
  • onCompletion:在数据流结束时触发,onCompletion所在的线程,是数据产生的线程
  • onEmpty:当数据流结束了,缺没有发出任何元素的时候触发。
  • catch:数据流发生错误的时候触发
  • flowOn:指定上游数据流的CoroutineContext,下游数据流不会受到影响
private fun coldFlowDemo() {
    //创建一个冷流,在3秒后发射一个数据
    val coldFlow = flow<Int> {
        delay(3000)
        emit(1)
    }
    lifecycleScope.launch(Dispatchers.IO) {
        coldFlow.onStart {
            Log.d(TAG, "coldFlow onStart, thread:${Thread.currentThread().name}")
            mBinding.progressBar.isVisible = true
            mBinding.tvLoadingStatus.text = "加载中"
        }.onEmpty {
            Log.d(TAG, "coldFlow onEmpty, thread:${Thread.currentThread().name}")
            mBinding.progressBar.isVisible = false
            mBinding.tvLoadingStatus.text = "数据加载为空"
        }.catch {
            Log.d(TAG, "coldFlow catch, thread:${Thread.currentThread().name}")
            mBinding.progressBar.isVisible = false
            mBinding.tvLoadingStatus.text = "数据加载错误:$it"
        }.onCompletion {
            Log.d(TAG, "coldFlow onCompletion, thread:${Thread.currentThread().name}")
            mBinding.progressBar.isVisible = false
            mBinding.tvLoadingStatus.text = "加载完成"
        }
            //指定上游数据流的CoroutineContext,下游数据流不会受到影响
            .flowOn(Dispatchers.Main)
            .collect {
                Log.d(TAG, "coldFlow collect:$it, thread:${Thread.currentThread().name}")
            }
    }
}
coldFlow onStart, thread:main
coldFlow onCompletion, thread:main
coldFlow collect:1, thread:DefaultDispatcher-worker-1

同一种数据,需要加载本地数据和网络数据

在实际的开发场景中,经常会将一些网络数据保存到本地,下次加载数据的时候,优先使用本地数据,再使用网络数据。

但是本地数据和网络数据的加载完成时机不一样,所以可能会有下面几种场景。

  • 本地数据比网络数据先加载完成:那先使用本地数据,再使用网络数据
  • 网络数据比本地数据先加载完成:
    • 网络数据加载成功,那只使用网络数据即可,不需要再使用本地数据了。
    • 网络数据加载失败,可以继续尝试使用本地数据进行兜底。
  • 本地数据和网络数据都加载失败:通知上层数据加载失败
abstract class CacheRepositity<T> {
    private val TAG = "CacheRepositity"

    fun getData() = channelFlow<CResult<T>> {
        supervisorScope {
            val dataFromLocalDeffer = async {
                fetchDataFromLocal().also {
                    Log.d(TAG,"fetchDataFromLocal result:$it , thread:${Thread.currentThread().name}")
                    //本地数据加载成功  
                    if (it is CResult.Success) {
                        send(it)
                    }
                }
            }

            val dataFromNetDeffer = async {
                fetchDataFromNetWork().also {
                    Log.d(TAG,"fetchDataFromNetWork result:$it , thread:${Thread.currentThread().name}")
                    //网络数据加载成功  
                    if (it is CResult.Success) {
                        send(it)
                        //如果网络数据已加载,可以直接取消任务,就不需要处理本地数据了
                        dataFromLocalDeffer.cancel()
                    }
                }
            }

            //本地数据和网络数据,都加载失败的情况
            val localData = dataFromLocalDeffer.await()
            val networkData = dataFromNetDeffer.await()
            if (localData is CResult.Error && networkData is CResult.Error) {
                send(CResult.Error(Throwable("load data error")))
            }
        }
    }

    protected abstract suspend fun fetchDataFromLocal(): CResult<T>

    protected abstract suspend fun fetchDataFromNetWork(): CResult<T>

}

sealed class CResult<out R> {
    data class Success<out T>(val data: T) : CResult<T>()
    data class Error(val throwable: Throwable) : CResult<Nothing>()
}

测试案例:

private fun cacheRepositityDemo(){
    val repositity=TestRepositity()
    lifecycleScope.launch {
        repositity.getData().onStart {
            Log.d(TAG, "TestRepositity: onStart")
        }.onCompletion {
            Log.d(TAG, "TestRepositity: onCompletion")
        }.collect {
            Log.d(TAG, "collect: $it")
        }
    }
}

本地数据比网络数据加载快

class TestRepositity : CacheRepositity<String>() {
    override suspend fun fetchDataFromLocal(): CResult<String> {
        delay(1000)
        return CResult.Success("data from fetchDataFromLocal")
    }

    override suspend fun fetchDataFromNetWork(): CResult<String> {
        delay(2000)
        return CResult.Success("data from fetchDataFromNetWork")
    }
}

onStart
fetchDataFromLocal result:Success(data=data from fetchDataFromLocal) , thread:main
collect: Success(data=data from fetchDataFromLocal)
fetchDataFromNetWork result:Success(data=data from fetchDataFromNetWork) , thread:main
collect: Success(data=data from fetchDataFromNetWork)
onCompletion

网络数据比本地数据加载快

class TestRepositity : CacheRepositity<String>() {
    override suspend fun fetchDataFromLocal(): CResult<String> {
        delay(2000)
        return CResult.Success("data from fetchDataFromLocal")
    }

    override suspend fun fetchDataFromNetWork(): CResult<String> {
        delay(1000)
        return CResult.Success("data from fetchDataFromNetWork")
    }
}


onStart
fetchDataFromNetWork result:Success(data=data from fetchDataFromNetWork) , thread:main
collect: Success(data=data from fetchDataFromNetWork)
onCompletion

网络数据加载失败,使用本地数据

class TestRepositity : CacheRepositity<String>() {
    override suspend fun fetchDataFromLocal(): CResult<String> {
        delay(2000)
        return CResult.Success("data from fetchDataFromLocal")
    }

    override suspend fun fetchDataFromNetWork(): CResult<String> {
        delay(1000)
        return CResult.Error(Throwable("fetchDataFromNetWork Error"))
    }
}

onStart
fetchDataFromNetWork result:Error(throwable=java.lang.Throwable: fetchDataFromNetWork Error) , thread:main
fetchDataFromLocal result:Success(data=data from fetchDataFromLocal) , thread:main
collect: Success(data=data from fetchDataFromLocal)
onCompletion

网络数据和本地数据都加载失败

class TestRepositity : CacheRepositity<String>() {
    override suspend fun fetchDataFromLocal(): CResult<String> {
        delay(2000)
        return CResult.Error(Throwable("fetchDataFromLocal Error"))
    }

    override suspend fun fetchDataFromNetWork(): CResult<String> {
        delay(1000)
        return CResult.Error(Throwable("fetchDataFromNetWork Error"))
    }
}

onStart
fetchDataFromNetWork result:Error(throwable=java.lang.Throwable: fetchDataFromNetWork Error) , thread:main
fetchDataFromLocal result:Error(throwable=java.lang.Throwable: fetchDataFromLocal Error) , thread:main
collect: Error(throwable=java.lang.Throwable: load data error)
onCompletion

多种数据源,合并后展示

在实际的开发场景中,经常一个页面的数据,是需要发起多个网络请求之后,组合数据之后再进行显示。

实现目标:

  • 接口间不需要互相等待,哪些数据先回来,就先展示哪部分
  • 控制数据的显示顺序

可以借助combine合并flow

可以合并多个不同的 Flow 数据流,生成一个新的流。只要其中某个子 Flow 数据流有产生新数据的时候,就会触发 combine 操作,进行重新计算,生成一个新的数据。

class HomeViewModel : ViewModel() {

    //暴露给View层的列表数据
    val list = MutableLiveData<List<String?>>()

    //多个子Flow,这里简单都返回String,实际场景根据需要,返回相应的数据类型即可
    private val bannerFlow = MutableStateFlow<String?>(null)
    private val channelFlow = MutableStateFlow<String?>(null)
    private val listFlow = MutableStateFlow<String?>(null)


    init {
        //使用combine操作符
        viewModelScope.launch {
            combine(bannerFlow, channelFlow, listFlow) { bannerData, channelData, listData ->
                Log.d("HomeViewModel", "combine  bannerData:$bannerData,channelData:$channelData,listData:$listData")
                //只要子flow里面的数据不为空,就放到resultList里面
                val resultList = mutableListOf<String?>()
                if (bannerData != null) {
                    resultList.add(bannerData)
                }
                if (channelData != null) {
                    resultList.add(channelData)
                }
                if (listData != null) {
                    resultList.add(listData)
                }
                resultList
            }.collect {
                //收集combine之后的数据,修改liveData的值,通知UI层刷新列表
                Log.d("HomeViewModel", "collect: ${it.size}")
                list.postValue(it)
            }
        }
    }

    fun loadData() {
        viewModelScope.launch(Dispatchers.IO) {
            //模拟耗时操作
            async {
                delay(1000)
                Log.d("HomeViewModel", "getBannerData success")
                bannerFlow.emit("Banner")
            }
            async {
                delay(2000)
                Log.d("HomeViewModel", "getChannelData success")
                channelFlow.emit("Channel")
            }
            async {
                delay(3000)
                Log.d("HomeViewModel", "getListData success")
                listFlow.emit("List")
            }
        }
    }
}

HomeViewModel

  • 提供一个 LiveData 的列表数据给View层使用
  • 内部有3个子 flow ,分别负责相应数据的生产。(这里简单都返回String,实际场景根据需要,返回相应的数据类型即可)。
  • 通过 combine 操作符,组合这3个子flow的数据。
  • collect 接收生成的新数据,并修改liveData的数据,通知刷新UI

View层使用

private fun flowCombineDemo() {
    val homeViewModel by viewModels<HomeViewModel>()
    homeViewModel.list.observe(this) {
        Log.d("HomeViewModel", "observe size:${it.size}")
    }
    homeViewModel.loadData()
}

简单的创建一个 ViewModelobserve 列表数据对应的LiveData

通过输出的日志发现,触发数据加载之后,每次子 Flow 流生产数据的时候,都会触发一次 combine 操作,生成新的数据。

日志输出:

combine  bannerData:null,channelData:null,listData:null
collect: 0
observe size:0

getBannerData success
combine  bannerData:Banner,channelData:null,listData:null
collect: 1
observe size:1

getChannelData success
combine  bannerData:Banner,channelData:Channel,listData:null
collect: 2
observe size:2

getListData success
combine  bannerData:Banner,channelData:Channel,listData:List
collect: 3
observe size:3

Ref

用Kotlin Flow无缝衔接Retrofit (qq.com)

Kotlin flow实践总结! (qq.com)

Asynchronous Flow | Kotlin (kotlinlang.org)

评论
发表评论
       
       
取消