先贴下Subscriber
public abstract class Subscriber<T extends Event> {
/**
* 事件执行的落地
*/
public abstract void onEvent(T event);
/**
* 获取当前订阅者订阅的事件的类型
*/
public abstract Class<? extends Event> subscribeType();
/**
* 其子类可以来决定是同步还是异步
*/
public Executor executor() {
return null;
}
/**
* 是否忽略过期事件。
*/
public boolean ignoreExpireEvent() {
return false;
}
}
SmartSubscriber
Subscriber有一个**SmartSubscriber**
的扩展子类,是用来针对多事件订阅的**List<Class<? extends Event>> subscribeTypes()**
该方法返回的是订阅者所需要订阅的事件列表
----->代码片段1
public abstract class SmartSubscriber extends Subscriber {
/**
* 实现该方法,用来返回多订阅的指定集合
*/
public abstract List<Class<? extends Event>> subscribeTypes();
// ...
}
----->代码片段2
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
@Override
public List<Class<? extends Event>> subscribeTypes() {
List<Class<? extends Event>> result = new LinkedList<>();
result.add(ClientOperationEvent.ClientRegisterServiceEvent.class);
result.add(ClientOperationEvent.ClientDeregisterServiceEvent.class);
result.add(ClientOperationEvent.ClientSubscribeServiceEvent.class);
result.add(ClientOperationEvent.ClientUnsubscribeServiceEvent.class);
result.add(ClientEvent.ClientDisconnectEvent.class);
return result;
}
}
----->代码片段3
public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {
if (consumer instanceof SmartSubscriber) {
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
} else {
// For case, producer: defaultPublisher -> consumer: subscriber.
addSubscriber(consumer, subscribeType, factory);
}
}
return;
}
// ........
}
这里贴了三段代码
- 代码片段一,就是
**SmartSubscriber**
的抽象**List<Class<? extends Event>> subscribeTypes()**
- 代码片段二,拿某一个该接口的实现举例子,返回了一个
**List**
,里面包含了该订阅者订阅的事件类型集合,那么我就可以这样理解,这个接口的返回值所对应的事件,就是该订阅者订阅的事件,通过代码片段三NotifyCenter#registerSubscriber
注册 - 代码片段三,又回到了NotifyCenter,订阅者注册的时候,首先是先判定了当前订阅者是监听的是否是多事件的,其事件注册的具体细节可以参考NotifyCenter的源码分析
subscribeType
这个接口和上面说的**SmartSubscriber**
的**subscribeTypes()**
接口没有多大的区别,只是这个返回的是一个事件,上面的返回的是多个事件
@Override
public Class<? extends Event> subscribeType() {
return Xxxxxxxx.class;
}
executor
**executor()**
上一篇的EventPublisher在选择通过异步方式还是同步方式的时候就是调用的这里来判定的,我们可以看到这个的默认实现是null
@Override
public void notifySubscriber(Subscriber subscriber, Event event) {
final Runnable job = () -> subscriber.onEvent(event);
// 判断当前事件是异步还是同步
final Executor executor = subscriber.executor();
if (executor != null) {
// 异步
executor.execute(job);
} else {
try {
// 同步
job.run();
} catch (Throwable e) {
// ..
}
}
}
- 所以订阅者如果没有实际的实现executor这个接口,那么事件发布者就是走的同步方式,这里简单的回顾了一下上一篇的事件发布者
ignoreExpireEvent
这个接口的话,可以看下上游的调用处
@Override
public void receiveEvent(Event event) {
// ..
for (Subscriber subscriber : subscribers) {
// 是否忽略过期事件
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
continue;
}
// Notify single subscriber for slow event.
notifySubscriber(subscriber, event);
}
}
- 这段代码是事件发布者里的,是在准备发布广播给订阅者(notifySubscriber)之前,判断的是否需要忽略过期的事件的
- 订阅者们可以自己去判断是否订阅过期事件,当然这个接口给的默认实现是
**false**
onEvent
最重要的接口最后说
其实也没啥好说的,就随便拿一个实现lou一眼就行
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
@Override
public void onEvent(Event event) {
if (event instanceof ClientEvent.ClientDisconnectEvent) {
// 处理客户端断开
handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
} else if (event instanceof ClientOperationEvent) {
// 处理客户端操作
handleClientOperation((ClientOperationEvent) event);
}
}
}
- 这里拿的
**ClientServiceIndexesManager**
这个类的**onEvent**
方法举例子,上面也贴出过这个类了(SmartSubscriber接口的描述) 这里接收的Event参数通过判断其实际的类型,来走这个事件所对应的不同的逻辑 - Event是事件的抽象类,每个不同的事件都往其自身塞入了其事件所携带的信息,这个抽象类在下一篇描述