0

EventBus源码研读

摘要

转载:http://kymjs.com/code/2015/12/12/01
本文总共分三部分,从源码角度分析了 EventBus 库。以及介绍了其内部实现注册、发送、响应、取消注册的原理。

EventBus 是一款针对Android优化的发布/订阅事件总线。主要功能是替代Intent, Handler, BroadCast 在 Fragment,Activity,Service,线程之间传递消息.优点是开销小,使用方便,可以很大程度上降低它们之间的耦合,使得我们的代码更加简洁,耦合性更低,提升我们的代码质量。
类似的库还有 Otto ,今天就带大家一起研读 EventBus 的源码.

在写这篇文章之前,我已经将本文相关的中文注释代码上传到了GitHub:https://github.com/kymjs/EventBus

基础用法

在读代码之前,首先你得了解它的基本用法.如果你已经能够很熟练的使用EventBus等事件总线库了,那么你可以跳过本节.
首先引入依赖包,查看GitHub主页的说明: https://github.com/greenrobot/EventBus
在Gradle文件加入
compile 'de.greenrobot:eventbus:2.4.0'

用法与广播相同,且比广播更简单:

注册订阅者

首先你需要注册一个事件订阅者,为了方便理解你可以把他当成广播的广播接收者 你可以在任何一个类中使用如下代码注册以及解除注册

//把当前类注册为订阅者(接收者)
EventBus.getDefault().register(this);
//解除注册当前类(同广播一样,一定要调用,否则会内存泄露)
EventBus.getDefault().unregister(this);

注册了订阅者以后,我们需要创建一个回调方法onEvent,当我们订阅的事件发送的时候就会回调它

//其实命名不一定必须是onEvent(),但那属于高级用法了,这里我们只说最简单的
public void onEvent(Object event) {}

事件发送

当有了订阅者以后,我们的代码已经可以工作了.但是此时的代码是没有意义的,我们订阅的事件还没有发生. 就像广播需要一个sendBroadcast(),EventBus需要post(event)
你可以在任何一个类中使用如下代码发送事件:

/**
 * 这里的event类型必须和上面我们onEvent()方法的参数类型一致
 * (子父类关系也不行,必须是相同类型,原因我们下面看源码)
 */
EventBus.getDefault().post(event);

至此,EventBus就可以正常工作了.

进入源码世界

入口类EventBus类

我们从使用的流程来,首先看EventBus#getDefault()

public static EventBus getDefault() {
    if (defaultInstance == null) {
        synchronized (EventBus.class) {
            if (defaultInstance == null) {
                defaultInstance = new EventBus();
            }
        }
    }
    return defaultInstance;
}

只是简单的维护单例,调用构造方法,再看构造方法,调用重载的构造方法,重载的构造方法又需要一个EventBusBuilder对象

public EventBus() {
    this(DEFAULT_BUILDER);
}
EventBus(EventBusBuilder builder) {
}

EventBusBuilder类

看名字就知道,这个类是用来创建EventBus对象的.
开源实验室:图1

Builder类提供了这么多个可选的配置属性,这里变量含义大家直接看我的注释,就不多作解释了
我们主要来看最终的建造方法

/**
 * 根据参数创建对象,并赋值给EventBus.defaultInstance, 必须在默认的eventbus对象使用以前调用
 *
 * @throws EventBusException if there's already a default EventBus instance in place
 */
public EventBus installDefaultEventBus() {
    synchronized (EventBus.class) {
        if (EventBus.defaultInstance != null) {
            throw new EventBusException("Default instance already exists." +
                    " It may be only set once before it's used the first time to ensure " +
                    "consistent behavior.");
        }
        EventBus.defaultInstance = build();
        return EventBus.defaultInstance;
    }
}
/**
 * 根据参数创建对象
 */
public EventBus build() {
    return new EventBus(this);
}

EventBusBuilder类提供了两种建造方法,还记得之前的getDefault()方法吗,维护了一个单例对象,installDefaultEventBus() 方法建造的EventBus对象最终会赋值给那个单例对象,但是有一个前提就是我们之前并没有创建过那个单例对象.
这里大家思考一下,为什么如果EventBus.defaultInstance不为null以后程序要抛出异常?咱们之后说答案。
第二个方法就是默认的建造者方法了.

再回到我们的EventBus构造方法,根据提供的建造者初始化了一大堆属性
图3

我们继续看这些初始化的字段.

三个Poster类

先是一大堆Map,看不懂,跳过去,我们先来看这三个Poster,需要说明的一点就是:Poster只负责处理粘滞事件,原因我们之后看代码。

private final HandlerPoster mainThreadPoster; //前台发送者
private final BackgroundPoster backgroundPoster; //后台发送者
private final AsyncPoster asyncPoster;   //后台发送者(只让队列第一个待订阅者去响应)

其实从类名我们就能看出个大概了,就是三个发送事件的方法。
我们来看看他们的内部实现.
这几个Poster的设计可以说是整个EventBus的一个经典部分,越看越想继续多看几遍.

每个Poster中都有一个发送任务队列,PendingPostQueue queue;

进到队列里面再看 定义了两个节点,从字面上理解就是队列的头节点和尾节点

private PendingPost head; //待发送对象队列头节点
private PendingPost tail;//待发送对象队列尾节点

再看这个PendingPost类的实现:

//单例池,复用对象
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event; //事件类型
Subscription subscription; //订阅者
PendingPost next; //队列下一个待发送对象

首先是提供了一个的设计,类似于我们的线程池,目的是为了减少对象创建的开销,当一个对象不用了,我们可以留着它,下次再需要的时候返回这个保留的而不是再去创建。
再看最后的变量,PendingPost next 非常典型的队列设计,队列中每个节点都有一个指向下一个节点的指针(sorry,数据结构用C学的)。

/**
 * 首先检查复用池中是否有可用,如果有则返回复用,否则返回一个新的
 *
 * @param subscription 订阅者
 * @param event        订阅事件
 * @return 待发送对象
 */
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
    synchronized (pendingPostPool) {
        int size = pendingPostPool.size();
        if (size > 0) {
            PendingPost pendingPost = pendingPostPool.remove(size - 1);
            pendingPost.event = event;
            pendingPost.subscription = subscription;
            pendingPost.next = null;
            return pendingPost;
        }
    }
    return new PendingPost(event, subscription);
}
/**
 * 回收一个待发送对象,并加入复用池
 *
 * @param pendingPost 待回收的待发送对象
 */
static void releasePendingPost(PendingPost pendingPost) {
    pendingPost.event = null;
    pendingPost.subscription = null;
    pendingPost.next = null;
    synchronized (pendingPostPool) {
        // 防止池无限增长
        if (pendingPostPool.size() < 10000) {
            pendingPostPool.add(pendingPost);
        }
    }
}

obtainPendingPost(),对池复用的实现,每次新创建的节点尾指针都为 null 。
releasePendingPost(),回收pendingPost对象,既然有从池中取,当然需要有存。这里,原作非常细心的加了一次判断,if (pendingPostPool.size() < 10000) 其实我觉得10000都很大了,1000就够了,我们一次只可能创建一个pendingPost,如果ArrayList里面存了上千条都没有取走,那么肯定是使用出错了。

PendingPost的代码我们就看完了,再回到上一级,队列的设计:

接着是PendingPostQueue的入队方法

synchronized void enqueue(PendingPost pendingPost) {
	...
    if (tail != null) {
        tail.next = pendingPost;
        tail = pendingPost;
    } else if (head == null) {
        head = tail = pendingPost;
    }
    ...
}

首先将当前节点的上一个节点(入队前整个队列的最后一个节点)的尾指针指向当期正在入队的节点(传入的参数pendingPost),并将队列的尾指针指向自己(自己变成队列的最后一个节点),这样就完成了入队。
如果是队列的第一个元素(队列之前是空的),那么直接将队列的头尾两个指针都指向自身就行了。
出队也是类似的队列指针操作

synchronized PendingPost poll() {
    PendingPost pendingPost = head;
    if (head != null) {
        head = head.next;
        if (head == null) {
            tail = null;
        }
    }
    return pendingPost;
}

首先将出队前的头节点保留一个临时变量(它就是要出队的节点),拿到这个将要出队的临时变量的下一个节点指针,将出队前的第二个元素(出队后的第一个元素)的赋值为现在队列的头节点,出队完成。
值得提一点的就是,PendingPostQueue的所有方法都声明了synchronized,这意味着在多线程下它依旧可以正常工作,细想想这也是必须的,对吗?

再回到上一级,接着是HandlerPoster的入队方法enqueue(),

/**
 * @param subscription 订阅者
 * @param event        订阅事件
 */
void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);
        if (!handlerActive) {
            handlerActive = true;
            if (!sendMessage(obtainMessage())) {
                throw new EventBusException("Could not send handler message");
            }
        }
    }
}

入队方法会根据参数创建 待发送对象 pendingPost 并加入队列,如果此时 handleMessage() 没有在运行中,则发送一条空消息让 handleMessage 响应
接着是handleMessage()方法

@Override
public void handleMessage(Message msg) {
    boolean rescheduled = false;
    try {
        long started = SystemClock.uptimeMillis();
        while (true) {
            PendingPost pendingPost = queue.poll();
            if (pendingPost == null) {
                synchronized (this) {
                    // 双重校验,类似单例中的实现
                    pendingPost = queue.poll();
                    if (pendingPost == null) {
                        handlerActive = false;
                        return;
                    }
                }
            }
            //如果订阅者没有取消注册,则分发消息
            eventBus.invokeSubscriber(pendingPost);
            //如果在一定时间内仍然没有发完队列中所有的待发送者,则退出
            long timeInMethod = SystemClock.uptimeMillis() - started;
            if (timeInMethod >= maxMillisInsideHandleMessage) {
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
                rescheduled = true;
                return;
            }
        }
    } finally {
        handlerActive = rescheduled;
    }
}

handleMessage()不停的在待发送队列queue中去取消息。 需要说明的是在循环之外有个临时boolean变量rescheduled,最后是通过这个值去修改了handlerActive。而 handlerActive 是用来判断当前queue中是否有正在发送对象的任务,看到上面的入队方法enqueue(),如果已经有任务在跑着了,就不需要再去sendMessage()唤起我们的handleMessage()

最终通过eventBus对象的invokeSubscriber()最终发送出去,并回收这个pendingPost,让注册了的订阅者去响应(相当于回调),至于这个发送方法,我们之后再看。

看完了HandlePoster类,另外两个异步的发送者实现代码也差不多,唯一的区别就是另外两个是工作在异步,实现的Runnable接口,大家自己类比,这里就不帖代码了.

Poster工作原理

最后我们再来回顾一下PosterPendingPostQueuePendingPost这三个类,再看看下面这张图,是不是有种似曾相识的感觉。

开源实验室:图4

啊哈,那是HandleMessageLooper的工作原理,再看看Poster的
开源实验室:图5

至此,整个EventBus源码的发送接收核心部分已经分析完了。
还记得上面我们留下的那几个问题吗:
1、为什么如果EventBus.defaultInstance不为null以后程序要抛出异常?
2、Poster只对粘滞事件有效的说明代码在哪。
3、invokeSubscriber()最终的发送怎么实现的。
接下来我们继续分析它的注册流程以及粘滞事件的设计(那又是一个经典的地方)。

 

Subscribe流程

我们继续来看EventBus类,分析完了包含的属性,接下来我们看入口方法register()

通过查看源码我们发现,所有的register()方法,最后都会直接或者间接的调用register()方法

/**
 * @param subscriber 订阅者对象
 * @param sticky     是否粘滞
 * @param priority   优先级
 */
private synchronized void register(Object subscriber, boolean sticky, int priority) {
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods
            (subscriber.getClass());
    for (SubscriberMethod subscriberMethod : subscriberMethods) {
        subscribe(subscriber, subscriberMethod, sticky, priority);
    }
}

SubscriberMethod类

出现了一个SubscriberMethod类,看看它是干嘛的:

看字面意思是订阅者方法,看看类中的内容,除了复写的equals()和hashCode()就只有这些了。

final Method method; //方法名
final ThreadMode threadMode; //工作在哪个线程
final Class<?> eventType; //参数类型
/** Used for efficient comparison */
String methodString;
private synchronized void checkMethodString() {
    if (methodString == null) {
        // Method.toString has more overhead, just take relevant parts of the method
        StringBuilder builder = new StringBuilder(64);
        builder.append(method.getDeclaringClass().getName());
        builder.append('#').append(method.getName());
        builder.append('(').append(eventType.getName());
        methodString = builder.toString();
    }
}

ThreadMode是一个枚举类,是不是应该换成 int 更好呢。 checkMethodString()方法就是为了设置变量 methodString 的值,这里new了一个StringBuilder,然后又调用了toString()返回,是不是应该改成直接new String(format...)更好呢?
OK,不管那些细节,看到这里就知道,其实这个类也就是一个封装了的方法名而已。

回到EventBus#register()咱们继续. 噢,又遇到了SubscriberMethodFinder这又是啥,继续去看。

SubscriberMethodFinder类

从字面理解,就是订阅者方法发现者。
回想一下,我们之前用 EventBus 的时候,需要在注册方法传的那个 this 对象里面写一个 onEvent() 方法。没错,SubscriberMethodFinder类就是查看传进去的那个 this 对象里面有没有onEvent()方法的。怎么做到的?当然是反射。而且这个类用了大量的反射去查找类中方法名。

先看他的变量声明

private static final String ON_EVENT_METHOD_NAME = "onEvent";
/**
 * 在较新的类文件,编译器可能会添加方法。那些被称为BRIDGE或SYNTHETIC方法。
 * EventBus必须忽略两者。有修饰符没有公开,但在Java类文件中有格式定义
 */
private static final int BRIDGE = 0x40;
private static final int SYNTHETIC = 0x1000;
//需要忽略的修饰符
private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC | BRIDGE |
        SYNTHETIC;
//key:类名,value:该类中需要相应的方法集合
private static final Map<String, List<SubscriberMethod>> methodCache = new HashMap<String,
        List<SubscriberMethod>>();
//跳过校验方法的类(即通过构造函数传入的集合)
private final Map<Class<?>, Class<?>> skipMethodVerificationForClasses;

有一句注释

In newer class files, compilers may add methods. Those are called bridge or synthetic methods. EventBus must ignore both. There modifiers are not public but defined in the Java class file format: http://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.6-200-A.1

翻译过来大概就是说java编译器在编译的时候,会额外添加一些修饰符,然后这些修饰符为了效率应该是被忽略的。

还有一个skipMethodVerificationForClasses,看到注释是需要跳过被校验方法的类,校验方法是什么?看看他是干什么的。findSubscriberMethods()方法有点长,咱们抽一点看。 跳过上面的那些临时变量,从while循环里开始看:

Method[] methods = clazz.getDeclaredMethods();
for (Method method : methods) {
    String methodName = method.getName();
    if (methodName.startsWith(ON_EVENT_METHOD_NAME)) {
        int modifiers = method.getModifiers();//方法的修饰符
        //如果是public,且 不是之前定义要忽略的类型
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            //。。。先不看
        }
    }
}
clazz = clazz.getSuperclass();

首先是反射获取到 clazz 的全部方法 methods。
通过对全部的方法遍历,为了效率首先做一次筛选,只关注我们的以 “onEvent” 开头的方法。(现在知道之前在基础用法中我说:其实命名不一定必须是onEvent()的原因了吧,因为只要是onEvent开头的就可以了。)
忽略private类型的,最后如果是公有,并且不是 java编译器 生成的方法名,那么就是我们要的了。

再来看拿到要的方法后是怎么处理的

Class<?>[] parameterTypes = method.getParameterTypes();
//如果只有一个参数
if (parameterTypes.length == 1) {
    String modifierString = methodName.substring(ON_EVENT_METHOD_NAME
            .length());
    ThreadMode threadMode;
    if (modifierString.length() == 0) {
        threadMode = ThreadMode.PostThread;
    } else if (modifierString.equals("MainThread")) {
        threadMode = ThreadMode.MainThread;
    } else if (modifierString.equals("BackgroundThread")) {
        threadMode = ThreadMode.BackgroundThread;
    } else if (modifierString.equals("Async")) {
        threadMode = ThreadMode.Async;
    } else {
        if (skipMethodVerificationForClasses.containsKey(clazz)) {
            continue;
        } else {
            throw new EventBusException("Illegal onEvent method, check " +
                    "for typos: " + method);
        }
    }
    Class<?> eventType = parameterTypes[0];
    methodKeyBuilder.setLength(0);
    methodKeyBuilder.append(methodName);
    methodKeyBuilder.append('>').append(eventType.getName());
    String methodKey = methodKeyBuilder.toString();
    if (eventTypesFound.add(methodKey)) {
        // 方法名,工作在哪个线程,事件类型
        subscriberMethods.add(new SubscriberMethod(method, threadMode,
                eventType));
    }
}

还是反射,拿到这个方法的全部参数集合,如果是只有一个参数,再去根据不同的方法名赋予不同的线程模式(其实也就是最后响应的方法是工作在哪个线程)。
这里我们看到,其实EventBus不仅仅支持onEvent()的回调,它还支持onEventMainThread()onEventBackgroundThread()onEventAsync()这三个方法的回调。
一直到最后,我们看到这个方法把所有的方法名集合作为value,类名作为key存入了 methodCache 这个全局静态变量中。意味着,整个库在运行期间所有遍历的方法都会存在这个 map 中,而不必每次都去做耗时的反射取方法了。

synchronized (methodCache) {
    methodCache.put(key, subscriberMethods);
}
return subscriberMethods;

看了这么久,我们再回到 EventBus#register() 方法。这回可以看懂了,就是拿到指定类名的全部订阅方法(以 onEvent 开头的方法),并对每一个方法调用subscribe()。那么再看subscribe()方法。

事件的处理与发送subscribe()

subscribe()方法接受四个参数,分别为:订阅者封装的对象、响应方法名封装的对象、是否为粘滞事件(可理解为广播)、这条事件的优先级。

//根据传入的响应方法名获取到响应事件(参数类型)
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod, priority);
//通过响应事件作为key,并取得这个事件类型将会响应的全部订阅者
//没个订阅者至少会订阅一个事件,多个订阅者可能订阅同一个事件(多对多)
//key:订阅的事件,value:订阅这个事件的所有订阅者集合
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
//根据优先级插入到订阅者集合中
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
    if (i == size || newSubscription.priority > subscriptions.get(i).priority) {
        subscriptions.add(i, newSubscription);
        break;
    }
}
//当前订阅者订阅了哪些事件
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
    subscribedEvents = new ArrayList<Class<?>>();
    typesBySubscriber.put(subscriber, subscribedEvents);
}
//key:订阅者对象,value:这个订阅者订阅的事件集合
subscribedEvents.add(eventType);

跳过一些初始化的局部变量(逻辑看注释就够了)
如果传入的事件是有优先级之分的,则会根据优先级,将事件插入所有订阅了事件eventType的类的集合subscriptions中去。看逻辑我们发现,这里并没有对优先级的大小做限制,默认的优先级是0,priority越大,优先级越高。
每个订阅者是可以有多个重载的onEvent()方法的,所以这里多做了一步,将所有订阅者的响应方法保存到subscribedEvents中。
至此,我们就知道了 EventBus 中那几个map的全部含义。同时也回答了上一篇中问的为什么如果EventBus.defaultInstance不为null以后程序要抛出异常,就是因为这几个 map 不同了。 map 变了以后,订阅的事件就全部变为另一个 EventBus 对象的了,就没办法响应之前那个 EventBus 对象的订阅方法了。

最后又是一个感叹:子事件也可以让响应父事件的 onEvent() 。这个有点绕,举个例子,订阅者的onEvent(CharSequence),如果传一个String类型的值进去,默认情况下是不会响应的,但如果我们在构建的时候设置了 eventInheritance 为 true ,那么它就会响应了。

if(sticky)
if (eventInheritance) {
    Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
    for (Map.Entry<Class<?>, Object> entry : entries) {
        Class<?> candidateEventType = entry.getKey();
        //如果eventtype是candidateEventType同一个类或是其子类
        if (eventType.isAssignableFrom(candidateEventType)) {
            Object stickyEvent = entry.getValue();
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
} else {
    Object stickyEvent = stickyEvents.get(eventType);
    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}

最后是调用checkPostStickyEventToSubscription()做一次安全判断,就调用postToSubscription()发送事件了。
这里就关联到了我们之前讲的Poster类的作用了。
回答之前的问题:Poster只负责粘滞事件的代码。这里可以回答一部分:如果不是 sticky 事件都直接不执行了,还怎么响应。

private void postToSubscription(...) {
    switch (threadMode) {
        case PostThread:
            //直接调用响应方法
            invokeSubscriber(subscription, event);
            break;
        case MainThread:
            //如果是主线程则直接调用响应事件,否则使用handle去在主线程响应事件
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
            //。。。
    }
}

最后,还记得我们之前没有讲的那个invokeSubscriber(subscription, event);方法吗? 之前我们不知道subscriberMethod是什么,现在我们能看懂了,就是通过反射调用订阅者类subscriber的订阅方法onEventXXX(),并将event作为参数传递进去

subscription.subscriberMethod.method.invoke(subscription.subscriber, event);

Register与Poster工作图

原理图

开源实验室:图6

流程图

完整的注册流程
开源实验室:图7

至此,整个EventBus从注册订阅到事件的处理到响应的过程我们都分析完了,最后就只剩下发送流程和取消注册了。

天边的星星