这是事件通知里处理事件的落脚点,也就是事件执行的落地

先贴下Subscriber

  1. public abstract class Subscriber<T extends Event> {
  2. /**
  3. * 事件执行的落地
  4. */
  5. public abstract void onEvent(T event);
  6. /**
  7. * 获取当前订阅者订阅的事件的类型
  8. */
  9. public abstract Class<? extends Event> subscribeType();
  10. /**
  11. * 其子类可以来决定是同步还是异步
  12. */
  13. public Executor executor() {
  14. return null;
  15. }
  16. /**
  17. * 是否忽略过期事件。
  18. */
  19. public boolean ignoreExpireEvent() {
  20. return false;
  21. }
  22. }

从接口描述上,就能清晰的了解到这个接口的主流程

SmartSubscriber

Subscriber有一个**SmartSubscriber**扩展子类,是用来针对多事件订阅的**List<Class<? extends Event>> subscribeTypes()** 该方法返回的是订阅者所需要订阅的事件列表

  1. ----->代码片段1
  2. public abstract class SmartSubscriber extends Subscriber {
  3. /**
  4. * 实现该方法,用来返回多订阅的指定集合
  5. */
  6. public abstract List<Class<? extends Event>> subscribeTypes();
  7. // ...
  8. }
  9. ----->代码片段2
  10. @Component
  11. public class ClientServiceIndexesManager extends SmartSubscriber {
  12. @Override
  13. public List<Class<? extends Event>> subscribeTypes() {
  14. List<Class<? extends Event>> result = new LinkedList<>();
  15. result.add(ClientOperationEvent.ClientRegisterServiceEvent.class);
  16. result.add(ClientOperationEvent.ClientDeregisterServiceEvent.class);
  17. result.add(ClientOperationEvent.ClientSubscribeServiceEvent.class);
  18. result.add(ClientOperationEvent.ClientUnsubscribeServiceEvent.class);
  19. result.add(ClientEvent.ClientDisconnectEvent.class);
  20. return result;
  21. }
  22. }
  23. ----->代码片段3
  24. public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {
  25. if (consumer instanceof SmartSubscriber) {
  26. for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
  27. if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
  28. INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
  29. } else {
  30. // For case, producer: defaultPublisher -> consumer: subscriber.
  31. addSubscriber(consumer, subscribeType, factory);
  32. }
  33. }
  34. return;
  35. }
  36. // ........
  37. }

这里贴了三段代码

  • 代码片段一,就是**SmartSubscriber**的抽象**List<Class<? extends Event>> subscribeTypes()**
  • 代码片段二,拿某一个该接口的实现举例子,返回了一个**List**,里面包含了该订阅者订阅的事件类型集合,那么我就可以这样理解,这个接口的返回值所对应的事件,就是该订阅者订阅的事件,通过代码片段三NotifyCenter#registerSubscriber注册
  • 代码片段三,又回到了NotifyCenter,订阅者注册的时候,首先是先判定了当前订阅者是监听的是否是多事件的,其事件注册的具体细节可以参考NotifyCenter的源码分析

subscribeType

这个接口和上面说的**SmartSubscriber****subscribeTypes()**接口没有多大的区别,只是这个返回的是一个事件,上面的返回的是多个事件

  1. @Override
  2. public Class<? extends Event> subscribeType() {
  3. return Xxxxxxxx.class;
  4. }

executor

**executor()**上一篇的EventPublisher在选择通过异步方式还是同步方式的时候就是调用的这里来判定的,我们可以看到这个的默认实现是null

  1. @Override
  2. public void notifySubscriber(Subscriber subscriber, Event event) {
  3. final Runnable job = () -> subscriber.onEvent(event);
  4. // 判断当前事件是异步还是同步
  5. final Executor executor = subscriber.executor();
  6. if (executor != null) {
  7. // 异步
  8. executor.execute(job);
  9. } else {
  10. try {
  11. // 同步
  12. job.run();
  13. } catch (Throwable e) {
  14. // ..
  15. }
  16. }
  17. }
  • 所以订阅者如果没有实际的实现executor这个接口,那么事件发布者就是走的同步方式,这里简单的回顾了一下上一篇的事件发布者

ignoreExpireEvent

这个接口的话,可以看下上游的调用处

  1. @Override
  2. public void receiveEvent(Event event) {
  3. // ..
  4. for (Subscriber subscriber : subscribers) {
  5. // 是否忽略过期事件
  6. if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
  7. continue;
  8. }
  9. // Notify single subscriber for slow event.
  10. notifySubscriber(subscriber, event);
  11. }
  12. }
  • 这段代码是事件发布者里的,是在准备发布广播给订阅者(notifySubscriber)之前,判断的是否需要忽略过期的事件的
  • 订阅者们可以自己去判断是否订阅过期事件,当然这个接口给的默认实现是**false**


onEvent

最重要的接口最后说
其实也没啥好说的,就随便拿一个实现lou一眼就行

  1. @Component
  2. public class ClientServiceIndexesManager extends SmartSubscriber {
  3. @Override
  4. public void onEvent(Event event) {
  5. if (event instanceof ClientEvent.ClientDisconnectEvent) {
  6. // 处理客户端断开
  7. handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
  8. } else if (event instanceof ClientOperationEvent) {
  9. // 处理客户端操作
  10. handleClientOperation((ClientOperationEvent) event);
  11. }
  12. }
  13. }
  • 这里拿的**ClientServiceIndexesManager**这个类的**onEvent**方法举例子,上面也贴出过这个类了(SmartSubscriber接口的描述) 这里接收的Event参数通过判断其实际的类型,来走这个事件所对应的不同的逻辑
  • Event事件的抽象类每个不同的事件都往其自身塞入了其事件所携带的信息,这个抽象类在下一篇描述