先贴下Subscriber
public abstract class Subscriber<T extends Event> {/*** 事件执行的落地*/public abstract void onEvent(T event);/*** 获取当前订阅者订阅的事件的类型*/public abstract Class<? extends Event> subscribeType();/*** 其子类可以来决定是同步还是异步*/public Executor executor() {return null;}/*** 是否忽略过期事件。*/public boolean ignoreExpireEvent() {return false;}}
SmartSubscriber
Subscriber有一个**SmartSubscriber**的扩展子类,是用来针对多事件订阅的**List<Class<? extends Event>> subscribeTypes()** 该方法返回的是订阅者所需要订阅的事件列表
----->代码片段1public abstract class SmartSubscriber extends Subscriber {/*** 实现该方法,用来返回多订阅的指定集合*/public abstract List<Class<? extends Event>> subscribeTypes();// ...}----->代码片段2@Componentpublic class ClientServiceIndexesManager extends SmartSubscriber {@Overridepublic List<Class<? extends Event>> subscribeTypes() {List<Class<? extends Event>> result = new LinkedList<>();result.add(ClientOperationEvent.ClientRegisterServiceEvent.class);result.add(ClientOperationEvent.ClientDeregisterServiceEvent.class);result.add(ClientOperationEvent.ClientSubscribeServiceEvent.class);result.add(ClientOperationEvent.ClientUnsubscribeServiceEvent.class);result.add(ClientEvent.ClientDisconnectEvent.class);return result;}}----->代码片段3public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {if (consumer instanceof SmartSubscriber) {for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);} else {// For case, producer: defaultPublisher -> consumer: subscriber.addSubscriber(consumer, subscribeType, factory);}}return;}// ........}
这里贴了三段代码
- 代码片段一,就是
**SmartSubscriber**的抽象**List<Class<? extends Event>> subscribeTypes()** - 代码片段二,拿某一个该接口的实现举例子,返回了一个
**List**,里面包含了该订阅者订阅的事件类型集合,那么我就可以这样理解,这个接口的返回值所对应的事件,就是该订阅者订阅的事件,通过代码片段三NotifyCenter#registerSubscriber注册 - 代码片段三,又回到了NotifyCenter,订阅者注册的时候,首先是先判定了当前订阅者是监听的是否是多事件的,其事件注册的具体细节可以参考NotifyCenter的源码分析
subscribeType
这个接口和上面说的**SmartSubscriber**的**subscribeTypes()**接口没有多大的区别,只是这个返回的是一个事件,上面的返回的是多个事件
@Overridepublic Class<? extends Event> subscribeType() {return Xxxxxxxx.class;}
executor
**executor()**上一篇的EventPublisher在选择通过异步方式还是同步方式的时候就是调用的这里来判定的,我们可以看到这个的默认实现是null
@Overridepublic void notifySubscriber(Subscriber subscriber, Event event) {final Runnable job = () -> subscriber.onEvent(event);// 判断当前事件是异步还是同步final Executor executor = subscriber.executor();if (executor != null) {// 异步executor.execute(job);} else {try {// 同步job.run();} catch (Throwable e) {// ..}}}
- 所以订阅者如果没有实际的实现executor这个接口,那么事件发布者就是走的同步方式,这里简单的回顾了一下上一篇的事件发布者
ignoreExpireEvent
这个接口的话,可以看下上游的调用处
@Overridepublic void receiveEvent(Event event) {// ..for (Subscriber subscriber : subscribers) {// 是否忽略过期事件if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {continue;}// Notify single subscriber for slow event.notifySubscriber(subscriber, event);}}
- 这段代码是事件发布者里的,是在准备发布广播给订阅者(notifySubscriber)之前,判断的是否需要忽略过期的事件的
- 订阅者们可以自己去判断是否订阅过期事件,当然这个接口给的默认实现是
**false**
onEvent
最重要的接口最后说
其实也没啥好说的,就随便拿一个实现lou一眼就行
@Componentpublic class ClientServiceIndexesManager extends SmartSubscriber {@Overridepublic void onEvent(Event event) {if (event instanceof ClientEvent.ClientDisconnectEvent) {// 处理客户端断开handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);} else if (event instanceof ClientOperationEvent) {// 处理客户端操作handleClientOperation((ClientOperationEvent) event);}}}
- 这里拿的
**ClientServiceIndexesManager**这个类的**onEvent**方法举例子,上面也贴出过这个类了(SmartSubscriber接口的描述) 这里接收的Event参数通过判断其实际的类型,来走这个事件所对应的不同的逻辑 - Event是事件的抽象类,每个不同的事件都往其自身塞入了其事件所携带的信息,这个抽象类在下一篇描述
