Livedata源码解析
Observer添加
使用 LiveData
时,首先要做的,就是添加一个 Observer<T>
viewModel.activeMsg.observe(viewLifecycleOwner) {
data = it as ArrayList<ActiveMsgBean>
adapter.refreshAllData(data)
}
我们来看看livedata的observe方法
/**
* Adds the given observer to the observers list within the lifespan of the given
* owner. The events are dispatched on the main thread. If LiveData already has data
* set, it will be delivered to the observer.
* <p>
* The observer will only receive events if the owner is in {@link Lifecycle.State#STARTED}
* or {@link Lifecycle.State#RESUMED} state (active).
* <p>
* If the owner moves to the {@link Lifecycle.State#DESTROYED} state, the observer will
* automatically be removed.
* <p>
* When data changes while the {@code owner} is not active, it will not receive any updates.
* If it becomes active again, it will receive the last available data automatically.
* <p>
* LiveData keeps a strong reference to the observer and the owner as long as the
* given LifecycleOwner is not destroyed. When it is destroyed, LiveData removes references to
* the observer & the owner.
* <p>
* If the given owner is already in {@link Lifecycle.State#DESTROYED} state, LiveData
* ignores the call.
* <p>
* If the given owner, observer tuple is already in the list, the call is ignored.
* If the observer is already in the list with another owner, LiveData throws an
* {@link IllegalArgumentException}.
*
* @param owner The LifecycleOwner which controls the observer
* @param observer The observer that will receive the events
*/
@MainThread
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
assertMainThread("observe");
if (owner.getLifecycle().getCurrentState() == DESTROYED) {
// ignore
return;
}
LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
if (existing != null && !existing.isAttachedTo(owner)) {
throw new IllegalArgumentException("Cannot add the same observer"
+ " with different lifecycles");
}
if (existing != null) {
return;
}
owner.getLifecycle().addObserver(wrapper);
}
通常我们是通过这种方式添加observer,这种方式添加的observer只有 onStart 后,对数据的修改才会触发 observer.onChanged()
还有一个方法observeForever
/**
* Adds the given observer to the observers list. This call is similar to
* {@link LiveData#observe(LifecycleOwner, Observer)} with a LifecycleOwner, which
* is always active. This means that the given observer will receive all events and will never
* be automatically removed. You should manually call {@link #removeObserver(Observer)} to stop
* observing this LiveData.
* While LiveData has one of such observers, it will be considered
* as active.
* <p>
* If the observer was already added with an owner to this LiveData, LiveData throws an
* {@link IllegalArgumentException}.
*
* @param observer The observer that will receive the events
*/
@MainThread
public void observeForever(@NonNull Observer<? super T> observer) {
assertMainThread("observeForever");
AlwaysActiveObserver wrapper = new AlwaysActiveObserver(observer);
ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
//不能重复添加
if (existing instanceof LiveData.LifecycleBoundObserver) {
throw new IllegalArgumentException("Cannot add the same observer"
+ " with different lifecycles");
}
if (existing != null) {
return;
}
wrapper.activeStateChanged(true);
}
无论何时,只要数据发生改变,就会触发 observer.onChanged(),并且观察者不会被自动移除,这有可能导致内存泄漏,我们一定要自己注意手动管理
我们来看看observe里做了什么
首先如果activity 已经 destroy,就会直接返回,然后新建了一个LifecycleBoundObserver把我们的observer和lifecycleowner给保存了起来,其实很有意思的是,这里的livedata它不仅是owner的observer,同时也是我们自定义的observer的observable
看看LifecycleBoundObserver的实现
public abstract class LiveData<T> {
// 空实现,如果在 LiveData 变为 inactive 状态后想停止更新数据,可以
// override 这两个方法
protected void onActive() {}
protected void onInactive() {}
private abstract class ObserverWrapper {
final Observer<T> mObserver;
boolean mActive;
int mLastVersion = START_VERSION;
ObserverWrapper(Observer<T> observer) {
mObserver = observer;
}
// 如果 observer 处于 active 状态,则返回 true
abstract boolean shouldBeActive();
boolean isAttachedTo(LifecycleOwner owner) {
return false;
}
void detachObserver() {
}
void activeStateChanged(boolean newActive) {
if (newActive == mActive) {
return;
}
// immediately set active state, so we'd never dispatch anything to inactive
// owner
mActive = newActive;
// LiveData.this.mActiveCount 表示处于 active 状态的 observer 的数量
// 当 mActiveCount 大于 0 时,`LiveData` 处于 active 状态
// 注意区分 observer 的 active 状态和 LiveData 的 active 状态
boolean wasInactive = LiveData.this.mActiveCount == 0;
LiveData.this.mActiveCount += mActive ? 1 : -1;
if (wasInactive && mActive) {
// inactive -> active
onActive();
}
// 这里用 else if 比较好,因为只有一个会执行。else if 更易读
if (LiveData.this.mActiveCount == 0 && !mActive) {
// mActiveCount 在我们修改前等于 1,也就是说,`LiveData` 从 active
// 状态变到了 inactive
onInactive();
}
if (mActive) {
dispatchingValue(this);
}
}
}
class LifecycleBoundObserver extends ObserverWrapper implements GenericLifecycleObserver {
@NonNull final LifecycleOwner mOwner;
LifecycleBoundObserver(@NonNull LifecycleOwner owner, Observer<T> observer) {
super(observer);
mOwner = owner;
}
@Override
boolean shouldBeActive() {
// onStart 到 onStop 之间则认为是 active 状态
return mOwner.getLifecycle().getCurrentState().isAtLeast(STARTED);
}
// 这个是 lifecycle 的回调函数
@Override
public void onStateChanged(LifecycleOwner source, Lifecycle.Event event) {
if (mOwner.getLifecycle().getCurrentState() == DESTROYED) {
removeObserver(mObserver);
return;
}
activeStateChanged(shouldBeActive());
}
@Override
boolean isAttachedTo(LifecycleOwner owner) {
return mOwner == owner;
}
@Override
void detachObserver() {
mOwner.getLifecycle().removeObserver(this);
}
}
}
数据发布
无非俩种,同步的和异步的
public abstract class LiveData<T> {
// 同步修改数据
protected void setValue(T value);
// 会用 Handler post 一个 runnable,然后在 runnable 里面 setValue
protected void postValue(T value);
}
我们先看setValue
public abstract class LiveData<T> {
@MainThread
protected void setValue(T value) {
assertMainThread("setValue");
// 每次更新 value,都会使 mVersion + 1
// ObserverWrapper 也有一个字段,叫 mLastVersion
// 通过比较这两个字段大小来判断是否要给观察者分发数据
mVersion++;
mData = value;
dispatchingValue(null);
}
// 如果 initiator == null,表示要通知所有的 observer
// 不等于 null 则指定分发给单个数据观察者
private void dispatchingValue(@Nullable ObserverWrapper initiator) {
if (mDispatchingValue) {
// 在 observer 的回调里面又触发了数据的修改
// 设置 mDispatchInvalidated 为 true 后,可以让下面的循环知道
// 数据被修改了,从而开始一轮新的迭代。
//
// 比方说,dispatchingValue -> observer.onChanged -> setValue
// -> dispatchingValue
// 这里 return 的是后面那个 dispatchingValue,然后在第一个
// dispatchingValue 会重新遍历所有的 observer,并调用他们的
// onChanged。
//
// 如果想避免这种情况,可以在回调里面使用 postValue 来更新数据
mDispatchInvalidated = true;
return;
}
mDispatchingValue = true;
do {
mDispatchInvalidated = false;
if (initiator != null) {
// 调用 observer.onChanged()
considerNotify(initiator);
initiator = null;
} else {
for (Iterator<Map.Entry<Observer<T>, ObserverWrapper>> iterator =
mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
considerNotify(iterator.next().getValue());
if (mDispatchInvalidated) {
// 某个客户在回调里面更新了数据,break 后,这个 for 循环会
// 重新开始
break;
}
}
}
// 当某个客户在回调里面更新了数据,mDispatchInvalidated == true
} while (mDispatchInvalidated);
mDispatchingValue = false;
}
}
下面是 considerNotify
:
public abstract class LiveData<T> {
private void considerNotify(ObserverWrapper observer) {
if (!observer.mActive) {
return;
}
// Check latest state b4 dispatch. Maybe it changed state but we didn't get the event yet.
//
// we still first check observer.active to keep it as the entrance for events. So even if
// the observer moved to an active state, if we've not received that event, we better not
// notify for a more predictable notification order.
if (!observer.shouldBeActive()) {
observer.activeStateChanged(false);
return;
}
// 对于 LifecycleBoundObserver 来说,即使 `LiveData` 的数据没有变化,只要 activity 的生命
// 周期发生了改变,还是可能会调用 considerNotify 多次
// 通过比较 observer.mLastVersion 和 mVersion,就能够知道 observer 是否已经拥有了最新的数据
if (observer.mLastVersion >= mVersion) {
return;
}
observer.mLastVersion = mVersion;
//noinspection unchecked
observer.mObserver.onChanged((T) mData);
}
}
分发值有两种情况:“分发给单个观察者”和“分发给所有观察者”。当 LiveData 值更新时,需分发给所有观察者。
所有的观察者被存在一个 Map 结构中,分发的方式是通过遍历 Map 并逐个调用considerNotify()。在这个方法中需要跨过三道坎,才能真正地将值分发给数据观察者,分别是:
- 数据观察者是否活跃。
- 数据观察者绑定的生命周期组件是否活跃。
- 数据观察者的版本号是否是最新的。
跨过三道坎后,会将最新的版本号存储在观察者的 mLastVersion 字段中,即版本号除了保存在LiveData.mVersion,还会在每个观察者中保存一个副本mLastVersion,最后才将之前暂存的mData的值分发给数据观察者。
每个数据观察者都和一个组件的生命周期对象绑定,当组件生命周期发生变化时,会尝试将最新值分发给该数据观察者。而与观察者绑定组件的生命周期发生变化时,将最新的值分发给指定观察者。
也就是说
总结一下,LiveData 有两次机会通知观察者,与之对应的有两种分发值的方式:
- 当值更新时,遍历所有观察者将最新值分发给它们。
- 当与观察者绑定组件的生命周期发生变化时,将最新的值分发给指定观察者。
每一个数据观察者都会被包装(见第一节),包装类型为ObserverWrapper,观察者的包装类型通过组合的方式持有了一个原始观察者,并在此基础上为其扩展了活跃状态和版本号的概念,而其是否活跃由子类定义
// 原始数据观察者
public interface Observer<T> {
void onChanged(T t);
}
// 观察者包装类型
private abstract class ObserverWrapper {
// 持有原始数据观察者
final Observer<? super T> mObserver;
// 当前观察者是否活跃
boolean mActive;
// 当前观察者最新值版本号,初始值为 -1
int mLastVersion = START_VERSION;
// 注入原始观察者
ObserverWrapper(Observer<? super T> observer) {mObserver = observer;}
// 当数据观察者绑定的组件生命周期变化时,尝试将最新值分发给当前观察者
void activeStateChanged(boolean newActive) {
// 若观察者活跃状态未变,则不分发值
if (newActive == mActive) {
return;
}
// 更新活跃状态
mActive = newActive;
// 若活跃,则将最新值分发给当前观察者
if (mActive) {
dispatchingValue(this);
}
}
// 是否活跃,供子类重写
abstract boolean shouldBeActive();
}
class LifecycleBoundObserver extends ObserverWrapper implements LifecycleEventObserver {
final LifecycleOwner mOwner;
LifecycleBoundObserver(LifecycleOwner owner, Observer<? super T> observer) {
super(observer);
mOwner = owner;
}
// 当与观察者绑定的生命周期组件至少为STARTED时,表示观察者活跃
@Override
boolean shouldBeActive() {
return mOwner.getLifecycle().getCurrentState().isAtLeast(STARTED);
}
@Override
public void onStateChanged( LifecycleOwner source, Lifecycle.Event event) {
Lifecycle.State currentState = mOwner.getLifecycle().getCurrentState();
// 当生命周期状态发生变化,则尝试将最新值分发给数据观察者
while (prevState != currentState) {
prevState = currentState;
// 调用父类方法,进行分发
activeStateChanged(shouldBeActive());
currentState = mOwner.getLifecycle().getCurrentState();
}
}
}
再来看看postvalue
public abstract class LiveData<T> {
// 注意,他是 volatile。因为 postValue 可以从后台线程调用,
private volatile Object mPendingData = NOT_SET;
private final Runnable mPostValueRunnable = new Runnable() {
@Override
public void run() {
Object newValue;
synchronized (mDataLock) {
// 同步地获取暂存值
newValue = mPendingData;
mPendingData = NOT_SET;
}
//noinspection unchecked
setValue((T) newValue);
}
};
protected void postValue(T value) {
boolean postTask;
synchronized (mDataLock) {
postTask = mPendingData == NOT_SET;
//暂存值
mPendingData = value;
}
if (!postTask) {
// 已经有一个 post 后还没有执行的 runnable,所以就不需要再 post 了,
// 前面 post 的 runnable 执行时,会拿到这个新设置的 value
return;
}
// 最终执行的就是 handler.post()
ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable);
}
}
其中postValue频数据更新的场景下可能会丢失数据,因为“设值”和“分发值”是分开执行的,之间存在延迟。值先被缓存在变量中,再向主线程抛一个分发值的任务。若在这延迟之间再一次调用 postValue(),则变量中缓存的值被更新,之前的值在没有被分发之前就被擦除了。
关于LiveData的stiky事件
众所周知,Livedata是粘性的,在livedata中会用一个mData字段保存最新一个setValue或者postValue的值,在有新的观察者注册时就会也会触发一次生命周期变化
owner.getLifecycle().addObserver(wrapper);
// androidx.lifecycle.LifecycleRegistry
public void addObserver(@NonNull LifecycleObserver observer) {
State initialState = mState == DESTROYED ? DESTROYED : INITIALIZED;
ObserverWithState statefulObserver = new ObserverWithState(observer, initialState);
...
// 将生命周期事件分发给新进的观察者
statefulObserver.dispatchEvent(lifecycleOwner, upEvent(statefulObserver.mState));
...
}
// LifecycleBoundObserver 又被包了一层
static class ObserverWithState {
State mState;
GenericLifecycleObserver mLifecycleObserver;
ObserverWithState(LifecycleObserver observer, State initialState) {
mLifecycleObserver = Lifecycling.getCallback(observer);
mState = initialState;
}
void dispatchEvent(LifecycleOwner owner, Event event) {
State newState = getStateAfter(event);
mState = min(mState, newState);
// 分发生命周期事件给 LifecycleBoundObserver
mLifecycleObserver.onStateChanged(owner, event);
mState = newState;
}
}
这种“新观察者”会被“老值”通知的现象称为粘性。
问题
而粘性事件可能导致某些问题:
购物车-结算场景:假设有一个购物车界面,点击结算后跳转到结算界面,结算界面可以回退到购物车界面。这两个界面都是 Fragment,这俩个界面都是通过共享ViewModel的方式共享商品列表
想象这样一种操作,如果我们的livedata的注册观察者的行为写在onViewCreated,当我们从购物车界面进入到结算界面后再返回购物车界面,这样就会导致livedata被重复注册,当购物车页面重新展示时,onViewCreated()会再次执行
解决方案一:带消费记录的值
// 一次性值
open class OneShotValue<out T>(private val value: T) {
// 值是否被消费
private var handled = false
// 获取值,如果值未被处理则返回,否则返回空
fun getValue(): T? {
return if (handled) {
null
} else {
handled = true
value
}
}
// 获取上次被处理的值
fun peekValue(): T = value
}
class MyViewModel:ViewModel() {
// 已选物品列表
val selectsListLiveData = MutableLiveData<OneShotValue<List<String>>>()
// 更新已选物品
fun setSelectsList(goods:List<String>){
selectsListLiveData.value = OneShotValue(goods)
}
}
在值的外面套一层,新增一个标记位标识是否被处理过。
重复弹 toast 的问题是解决了,但引出了一个新的问题:当购物车满弹出 toast 时,购物车列表已经被消费掉了,导致结算界面就无法再消费了。
这时候只能用OneShotValue的peekValue()来获取已经被消费的值
使用该方案得甄别出哪些观察者需要粘性值,哪些观察者需要非粘性事件。当观察者很多的时候,就很难招架了。若把需要粘性处理和非粘性处理的逻辑写在一个观察者中,就 GG,还得新建观察者将它们分开。
解决方案二:带有最新版本号的观察者
通知观察者前需要跨过三道坎(详见第三节),其中有一道坎是版本号的比对。若新建的观察者版本号小于最新版本号,则表示观察者落后了,需要将最新值分发给它。
LiveData 源码中,新建观察者的版本号总是 -1。
若能够让新建观察者的版本号被最新版本号赋值,那版本号对比的那道坎就过不了,新值就无法分发到新建观察者。
所以得通过反射修改 mLastVersion 字段。
该方案除了倾入性强之外,把 LiveData 粘性彻底破坏了。但有的时候,我们还是想利用粘性的。
解决方案三:SingleLiveEvent
这是谷歌给出的一个解决方案,源码可以点击这里。
public class SingleLiveEvent<T> extends MutableLiveData<T> {
// 标志位,用于表达值是否被消费
private final AtomicBoolean mPending = new AtomicBoolean(false);
public void observe(LifecycleOwner owner, final Observer<T> observer) {
// 中间观察者
super.observe(owner, new Observer<T>() {
@Override
public void onChanged(@Nullable T t) {
// 只有当值未被消费过时,才通知下游观察者
if (mPending.compareAndSet(true, false)) {
observer.onChanged(t);
}
}
});
}
public void setValue(@Nullable T t) {
// 当值更新时,置标志位为 true
mPending.set(true);
super.setValue(t);
}
public void call() {
setValue(null);
}
}
专门设立一个 LiveData,它不具备粘性。它通过新增的“中间观察者”,拦截上游数据变化,然后在转发给下游。拦截之后通常可以做一点手脚,比如增加一个标记位mPending是否消费过的判断,若消费过则不转发给下游。
在数据驱动的 App 界面下,存在两种值:1. 非暂态数据 2. 暂态数据。
demo 中用于提示“购物车已满”的数据就是“暂态数据”,这种数据是一次性的,转瞬即逝的,可以消费一次就扔掉。
demo 中购物车中的商品列表就是“非暂态数据”,它的生命周期要比暂态数据长一点,在购物车界面和结算界面存活的期间都应该能被重复消费。
SingleLiveEvent 的设计正是基于对数据的这种分类方法,即暂态数据使用 SingleLiveEvent,非暂态数据使用常规的 LiveData。
这样尘归尘土归土的解决方案是符合现实情况的
class MyViewModel : ViewModel() {
// 非暂态购物车列表 LiveData
val selectsListLiveData = MutableLiveData<List<String>>()
// 暂态购物车列表 LiveData
val singleListLiveData = SingleLiveEvent<List<String>>()
// 更新购物车列表,同时更新暂态和非暂态
fun setSelectsList(goods: List<String>) {
selectsListLiveData.value = goods
singleListLiveData.value = goods
}
}
但该方案有局限性,若为 SingleLiveEvent 添加多个观察者,则当第一个观察者消费了数据后,其他观察者就没机会消费了。因为mPending是所有观察者共享的。
解决方案也很简单,为每个中间观察者都持有是否消费过数据的标记位:
open class LiveEvent<T> : MediatorLiveData<T>() {
// 持有多个中间观察者
private val observers = ArraySet<ObserverWrapper<in T>>()
@MainThread
override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
observers.find { it.observer === observer }?.let { _ ->
return
}
// 构建中间观察者
val wrapper = ObserverWrapper(observer)
observers.add(wrapper)
super.observe(owner, wrapper)
}
@MainThread
override fun observeForever(observer: Observer<in T>) {
observers.find { it.observer === observer }?.let { _ ->
return
}
val wrapper = ObserverWrapper(observer)
observers.add(wrapper)
super.observeForever(wrapper)
}
@MainThread
override fun removeObserver(observer: Observer<in T>) {
if (observer is ObserverWrapper && observers.remove(observer)) {
super.removeObserver(observer)
return
}
val iterator = observers.iterator()
while (iterator.hasNext()) {
val wrapper = iterator.next()
if (wrapper.observer == observer) {
iterator.remove()
super.removeObserver(wrapper)
break
}
}
}
@MainThread
override fun setValue(t: T?) {
// 通知所有中间观察者,有新数据
observers.forEach { it.newValue() }
super.setValue(t)
}
// 中间观察者
private class ObserverWrapper<T>(val observer: Observer<T>) : Observer<T> {
// 标记当前观察者是否消费了数据
private var pending = false
override fun onChanged(t: T?) {
// 保证只向下游观察者分发一次数据
if (pending) {
pending = false
observer.onChanged(t)
}
}
fun newValue() {
pending = true
}
}
}
Questions
LiveData 是如何避免内存泄漏的?
LiveData 的数据观察者通常是匿名内部类,它持有界面的引用,可能造成内存泄漏。
LiveData 内部会将数据观察者进行封装,使其具备生命周期感知能力。当生命周期状态为 DESTROYED 时,自动移除观察者。
LiveData 是粘性的吗?若是,它是怎么做到的?
- LiveData 的值被存储在内部的字段中,直到有更新的值覆盖,所以值是持久的。
- 两种场景下 LiveData 会将存储的值分发给观察者。一是值被更新,此时会遍历所有观察者并分发之。二是新增观察者或观察者生命周期发生变化(至少为 STARTED),此时只会给单个观察者分发值。
- LiveData 的观察者会维护一个“值的版本号”,用于判断上次分发的值是否是最新值。该值的初始值是-1,每次更新 LiveData 值都会让版本号自增。
- LiveData 并不会无条件地将值分发给观察者,在分发之前会经历三道坎:1. 数据观察者是否活跃。2. 数据观察者绑定的生命周期组件是否活跃。3. 数据观察者的版本号是否是最新的。
- “新观察者”被“老值”通知的现象叫“粘性”。因为新观察者的版本号总是小于最新版号,且添加观察者时会触发一次老值的分发。
粘性的 LiveData 会造成什么问题?怎么解决?
造成重复订阅的问题
可以使用SingleLiveEvent
专门设立一个 LiveData,它不具备粘性。它通过新增的“中间观察者”,拦截上游数据变化,然后在转发给下游。拦截之后通常可以做一点手脚,比如增加一个标记位mPending是否消费过的判断,若消费过则不转发给下游。
在数据驱动的 App 界面下,存在两种值:1. 非暂态数据 2. 暂态数据。
什么情况下 LiveData 会丢失数据?
在高频数据更新的场景下使用 LiveData.postValue() 时,会造成数据丢失。因为“设值”和“分发值”是分开执行的,之间存在延迟。值先被缓存在变量中,再向主线程抛一个分发值的任务。若在这延迟之间再一次调用 postValue(),则变量中缓存的值被更新,之前的值在没有被分发之前就被擦除了。
在 Fragment 中使用 LiveData 需注意些什么?
在 Fragment 中观察 LiveData 时使用viewLifecycleOwner而不是this。因为 Fragment 和 其中的 View 生命周期不完全一致。LiveData 内部判定生命周期为 DESTROYED 时,才会移除数据观察者。存在一种情况,当 Fragment 之间切换时,被替换的 Fragment 不执行 onDestroy(),而只会执行到onDestoryView,当它再次展示时会再次订阅 LiveData,于是乎就多出一个订阅者。
Ref
Android arch components 源码分析(3)—— LiveData | Jekton