先上一些个人阅读的时候觉得的核心代码,至于NotifyCenter已经在上文当中简要阐述了一下
当然,以下均为个人的理解哈~!

先看看具体属性

  1. // Notify状态
  2. private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
  3. // 单例申明
  4. private static final NotifyCenter INSTANCE = new NotifyCenter();
  5. // 上文讲过,存储事件的发布者们
  6. private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);

目前只放了这三个属性,可以看出来几个点:

  • 通过JUC的原子Boolan来判定是否启用
  • NotifyCenter 单例
  • publisherMap 来保存对应事件的发布者

它的静态代码块

  1. static {
  2. // ignore ..
  3. INSTANCE.sharePublisher = new DefaultSharePublisher();
  4. ThreadUtils.addShutdownHook(NotifyCenter::shutdown);
  5. }

debug的时候主要就这两步,其余的暂时忽略

  • 设置了默认的事件发布者
  • 添加了一个shutdown的钩子,也就是shutdown的时候NotifyCenter的停用

接着看看他的核心方法

shutdown

终止NotifyCenter

  1. /**
  2. * 调用所有的事件发布者的shutdown ---》Closeable 该接口定义的
  3. */
  4. public static void shutdown() {
  5. if (!CLOSED.compareAndSet(false, true)) {
  6. return;
  7. }
  8. for (Map.Entry<String, EventPublisher> entry : INSTANCE.publisherMap.entrySet()) {
  9. try {
  10. EventPublisher eventPublisher = entry.getValue();
  11. eventPublisher.shutdown();
  12. } catch (Throwable e) {
  13. // ..
  14. }
  15. }
  16. try {
  17. INSTANCE.sharePublisher.shutdown();
  18. } catch (Throwable e) {
  19. // ..
  20. }
  21. }
  • 通过CAS来校验NotifyCenter的状态保证并发安全
  • 调用每个 EventPublisher shutdown() 方法 ,该方法实现自 **Closeable**接口,该方法的具体实现细节会在对 EventPublisher 的源码分析篇细化

所以 对于shutdown方法而言,主要是调用了每个发布者的shutdown方法

publishEvent

发布事件通知

  1. /**
  2. * 发布事件
  3. */
  4. private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
  5. if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
  6. return INSTANCE.sharePublisher.publish(event);
  7. }
  8. // com.alibaba.nacos.config.server.model.event.LocalDataChangeEvent 本地数据变动
  9. // com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent.ClientRegisterServiceEvent 这个事件应该是服务注册的事件
  10. // com.alibaba.nacos.naming.core.v2.event.client.ClientEvent.ClientChangedEvent 客户端修改的相关事件
  11. // com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent.ClientSubscribeServiceEvent 这个事件好像每几秒都会被调用
  12. final String topic = ClassUtils.getCanonicalName(eventType);
  13. EventPublisher publisher = INSTANCE.publisherMap.get(topic);
  14. if (publisher != null) {
  15. return publisher.publish(event);
  16. }
  17. return false;
  18. }
  • 如果事件是SlowEvent,则使用默认的事件发布者进行事件发布
  • 拿到全限定名从publishMap中拿到当前事件的事件发布者,该方法的具体实现细节会在EventPublisher 的源码分析篇细化
  • 通过事件发布者的publish方法发布事件

addSubscriber

添加事件订阅者的同时注册事件发布者

  1. /**
  2. * 向publishmap添加事件发布者,同时添加订阅者
  3. */
  4. private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
  5. EventPublisherFactory factory) {
  6. final String topic = ClassUtils.getCanonicalName(subscribeType);
  7. synchronized (NotifyCenter.class) {
  8. // MapUtils.computeIfAbsent is a unsafe method.
  9. // 向map里面添加了事件的发布者
  10. MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
  11. }
  12. EventPublisher publisher = INSTANCE.publisherMap.get(topic);
  13. if (publisher instanceof ShardedEventPublisher) {
  14. ((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
  15. } else {
  16. // 将事件订阅者添加进刚刚的事件发布者里面
  17. publisher.addSubscriber(consumer);
  18. }
  19. }
  • 主要的入参是一个订阅者,以及事件发布工厂(通过它来创建事件发布者,因为可能事件添加到订阅者的时候,该订阅者尚未初始化,即publish的value还未有值)
  • synchronized锁,不过本身publisherMap已经是ConcurrentHashMap,不太明白为什么这里需要加锁?注释上写的是computeIfAbsent该方法不安全,可能是为了保证并发状态下同时读取,存在读到为null,导致publish同时创建,所以加上的锁
  • computeIfAbsent通过EventPublisherFactory(实现了函数式接口BiFunction的apply方法)以及传入的订阅者的类型,来构造一个事件的发布者,然后丢到publishMap里面,具体apply的代码可以看下面贴出来的,

    1. @Override
    2. public EventPublisher apply(final Class<? extends Event> eventType, final Integer maxQueueSize) {
    3. // Like ClientEvent$ClientChangeEvent cache by ClientEvent
    4. Class<? extends Event> cachedEventType =
    5. eventType.isMemberClass() ? (Class<? extends Event>) eventType.getEnclosingClass() : eventType;
    6. publisher.computeIfAbsent(cachedEventType, eventClass -> {
    7. NamingEventPublisher result = new NamingEventPublisher();
    8. result.init(eventClass, maxQueueSize);
    9. return result;
    10. });
    11. return publisher.get(cachedEventType);
    12. }
  • 接着之前的代码向publishmap添加了事件发布者之后,publisher.addSubscriber(consumer)接着将事件订阅者添加给事件发布者

总的来说,就是添加事件订阅者的同时,如果publishmap没有事件的发布者,则向通过工厂创建一个事件发布者,并且给这个事件发布者添加事件,利用发布者来进行广播

registerToPublisher

向事件订阅者注册事件

  1. /**
  2. * 直接向事件发布者注册订阅者
  3. *
  4. * @param eventType class Instances type of the event type.
  5. * @param factory publisher factory.
  6. * @param queueMaxSize the publisher's queue max size.
  7. */
  8. public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,
  9. final EventPublisherFactory factory, final int queueMaxSize) {
  10. if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
  11. return INSTANCE.sharePublisher;
  12. }
  13. final String topic = ClassUtils.getCanonicalName(eventType);
  14. synchronized (NotifyCenter.class) {
  15. // MapUtils.computeIfAbsent is a unsafe method.
  16. MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);
  17. }
  18. return INSTANCE.publisherMap.get(topic);
  19. }
  20. }
  • 其实阅读过上面的addSubscriber方法之后,就能够很好的理解这一块的内容了,事件发布者注册事件,同理

deregisterPublisher

有添加自然有删除

  1. /**
  2. * Deregister publisher.
  3. *
  4. * @param eventType class Instances type of the event type.
  5. */
  6. public static void deregisterPublisher(final Class<? extends Event> eventType) {
  7. final String topic = ClassUtils.getCanonicalName(eventType);
  8. EventPublisher publisher = INSTANCE.publisherMap.remove(topic);
  9. try {
  10. publisher.shutdown();
  11. } catch (Throwable ex) {
  12. // ignore ..
  13. }
  14. }
  • 逻辑很清晰,先从publishmap移除掉事件的发布者
  • 再调用事件发布者的shutdown方法,当然,事件发布者publish的源码分析会在下一章节阐述

至此,NotifyCenter的源码剖析到此差不多了