先上一些个人阅读的时候觉得的核心代码,至于NotifyCenter已经在上文当中简要阐述了一下
当然,以下均为个人的理解哈~!
先看看具体属性
// Notify状态private static final AtomicBoolean CLOSED = new AtomicBoolean(false);// 单例申明private static final NotifyCenter INSTANCE = new NotifyCenter();// 上文讲过,存储事件的发布者们private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);
目前只放了这三个属性,可以看出来几个点:
- 通过JUC的原子Boolan来判定是否启用
- NotifyCenter 是单例的
- publisherMap 来保存对应事件的发布者
它的静态代码块
static {// ignore ..INSTANCE.sharePublisher = new DefaultSharePublisher();ThreadUtils.addShutdownHook(NotifyCenter::shutdown);}
debug的时候主要就这两步,其余的暂时忽略了
- 设置了默认的事件发布者
- 添加了一个shutdown的钩子,也就是shutdown的时候NotifyCenter的停用
接着看看他的核心方法
shutdown
终止NotifyCenter
/*** 调用所有的事件发布者的shutdown ---》Closeable 该接口定义的*/public static void shutdown() {if (!CLOSED.compareAndSet(false, true)) {return;}for (Map.Entry<String, EventPublisher> entry : INSTANCE.publisherMap.entrySet()) {try {EventPublisher eventPublisher = entry.getValue();eventPublisher.shutdown();} catch (Throwable e) {// ..}}try {INSTANCE.sharePublisher.shutdown();} catch (Throwable e) {// ..}}
- 通过CAS来校验NotifyCenter的状态,保证并发安全
- 调用每个 EventPublisher 的 shutdown() 方法 ,该方法实现自
**Closeable**接口,该方法的具体实现细节会在对 EventPublisher 的源码分析篇细化
所以 对于shutdown方法而言,主要是调用了每个发布者的shutdown方法
publishEvent
发布事件通知
/*** 发布事件*/private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);}// com.alibaba.nacos.config.server.model.event.LocalDataChangeEvent 本地数据变动// com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent.ClientRegisterServiceEvent 这个事件应该是服务注册的事件// com.alibaba.nacos.naming.core.v2.event.client.ClientEvent.ClientChangedEvent 客户端修改的相关事件// com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent.ClientSubscribeServiceEvent 这个事件好像每几秒都会被调用final String topic = ClassUtils.getCanonicalName(eventType);EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {return publisher.publish(event);}return false;}
- 如果事件是SlowEvent,则使用默认的事件发布者进行事件发布
- 拿到全限定名,从publishMap中拿到当前事件的事件发布者,该方法的具体实现细节会在EventPublisher 的源码分析篇细化
- 通过事件发布者的publish方法发布事件
addSubscriber
添加事件订阅者的同时注册事件发布者
/*** 向publishmap添加事件发布者,同时添加订阅者*/private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,EventPublisherFactory factory) {final String topic = ClassUtils.getCanonicalName(subscribeType);synchronized (NotifyCenter.class) {// MapUtils.computeIfAbsent is a unsafe method.// 向map里面添加了事件的发布者MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);}EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher instanceof ShardedEventPublisher) {((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);} else {// 将事件订阅者添加进刚刚的事件发布者里面publisher.addSubscriber(consumer);}}
- 主要的入参是一个订阅者,以及事件发布工厂(通过它来创建事件发布者,因为可能事件添加到订阅者的时候,该订阅者尚未初始化,即publish的value还未有值)
- 加synchronized锁,不过本身publisherMap已经是ConcurrentHashMap,不太明白为什么这里需要加锁?注释上写的是
computeIfAbsent该方法不安全,可能是为了保证并发状态下同时读取,存在读到为null,导致publish同时创建,所以加上的锁 computeIfAbsent通过EventPublisherFactory(实现了函数式接口BiFunction的apply方法)以及传入的订阅者的类型,来构造一个事件的发布者,然后丢到publishMap里面,具体apply的代码可以看下面贴出来的,@Overridepublic EventPublisher apply(final Class<? extends Event> eventType, final Integer maxQueueSize) {// Like ClientEvent$ClientChangeEvent cache by ClientEventClass<? extends Event> cachedEventType =eventType.isMemberClass() ? (Class<? extends Event>) eventType.getEnclosingClass() : eventType;publisher.computeIfAbsent(cachedEventType, eventClass -> {NamingEventPublisher result = new NamingEventPublisher();result.init(eventClass, maxQueueSize);return result;});return publisher.get(cachedEventType);}
接着之前的代码向publishmap添加了事件发布者之后,
publisher.addSubscriber(consumer)接着将事件订阅者添加给事件发布者
总的来说,就是添加事件订阅者的同时,如果publishmap没有事件的发布者,则向通过工厂创建一个事件发布者,并且给这个事件发布者添加事件,利用发布者来进行广播
registerToPublisher
向事件订阅者注册事件
/*** 直接向事件发布者注册订阅者** @param eventType class Instances type of the event type.* @param factory publisher factory.* @param queueMaxSize the publisher's queue max size.*/public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,final EventPublisherFactory factory, final int queueMaxSize) {if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher;}final String topic = ClassUtils.getCanonicalName(eventType);synchronized (NotifyCenter.class) {// MapUtils.computeIfAbsent is a unsafe method.MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);}return INSTANCE.publisherMap.get(topic);}}
- 其实阅读过上面的addSubscriber方法之后,就能够很好的理解这一块的内容了,事件发布者注册事件,同理
deregisterPublisher
有添加自然有删除
/*** Deregister publisher.** @param eventType class Instances type of the event type.*/public static void deregisterPublisher(final Class<? extends Event> eventType) {final String topic = ClassUtils.getCanonicalName(eventType);EventPublisher publisher = INSTANCE.publisherMap.remove(topic);try {publisher.shutdown();} catch (Throwable ex) {// ignore ..}}
- 逻辑很清晰,先从publishmap移除掉事件的发布者
- 再调用事件发布者的shutdown方法,当然,事件发布者publish的源码分析会在下一章节阐述
至此,NotifyCenter的源码剖析到此差不多了
