先上一些个人阅读的时候觉得的核心代码,至于NotifyCenter已经在上文当中简要阐述了一下
当然,以下均为个人的理解哈~!
先看看具体属性
// Notify状态
private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
// 单例申明
private static final NotifyCenter INSTANCE = new NotifyCenter();
// 上文讲过,存储事件的发布者们
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);
目前只放了这三个属性,可以看出来几个点:
- 通过JUC的原子Boolan来判定是否启用
- NotifyCenter 是单例的
- publisherMap 来保存对应事件的发布者
它的静态代码块
static {
// ignore ..
INSTANCE.sharePublisher = new DefaultSharePublisher();
ThreadUtils.addShutdownHook(NotifyCenter::shutdown);
}
debug的时候主要就这两步,其余的暂时忽略了
- 设置了默认的事件发布者
- 添加了一个shutdown的钩子,也就是shutdown的时候NotifyCenter的停用
接着看看他的核心方法
shutdown
终止NotifyCenter
/**
* 调用所有的事件发布者的shutdown ---》Closeable 该接口定义的
*/
public static void shutdown() {
if (!CLOSED.compareAndSet(false, true)) {
return;
}
for (Map.Entry<String, EventPublisher> entry : INSTANCE.publisherMap.entrySet()) {
try {
EventPublisher eventPublisher = entry.getValue();
eventPublisher.shutdown();
} catch (Throwable e) {
// ..
}
}
try {
INSTANCE.sharePublisher.shutdown();
} catch (Throwable e) {
// ..
}
}
- 通过CAS来校验NotifyCenter的状态,保证并发安全
- 调用每个 EventPublisher 的 shutdown() 方法 ,该方法实现自
**Closeable**
接口,该方法的具体实现细节会在对 EventPublisher 的源码分析篇细化
所以 对于shutdown方法而言,主要是调用了每个发布者的shutdown方法
publishEvent
发布事件通知
/**
* 发布事件
*/
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
}
// com.alibaba.nacos.config.server.model.event.LocalDataChangeEvent 本地数据变动
// com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent.ClientRegisterServiceEvent 这个事件应该是服务注册的事件
// com.alibaba.nacos.naming.core.v2.event.client.ClientEvent.ClientChangedEvent 客户端修改的相关事件
// com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent.ClientSubscribeServiceEvent 这个事件好像每几秒都会被调用
final String topic = ClassUtils.getCanonicalName(eventType);
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
return publisher.publish(event);
}
return false;
}
- 如果事件是SlowEvent,则使用默认的事件发布者进行事件发布
- 拿到全限定名,从publishMap中拿到当前事件的事件发布者,该方法的具体实现细节会在EventPublisher 的源码分析篇细化
- 通过事件发布者的publish方法发布事件
addSubscriber
添加事件订阅者的同时注册事件发布者
/**
* 向publishmap添加事件发布者,同时添加订阅者
*/
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
EventPublisherFactory factory) {
final String topic = ClassUtils.getCanonicalName(subscribeType);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
// 向map里面添加了事件的发布者
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
}
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher instanceof ShardedEventPublisher) {
((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
} else {
// 将事件订阅者添加进刚刚的事件发布者里面
publisher.addSubscriber(consumer);
}
}
- 主要的入参是一个订阅者,以及事件发布工厂(通过它来创建事件发布者,因为可能事件添加到订阅者的时候,该订阅者尚未初始化,即publish的value还未有值)
- 加synchronized锁,不过本身publisherMap已经是ConcurrentHashMap,不太明白为什么这里需要加锁?注释上写的是
computeIfAbsent
该方法不安全,可能是为了保证并发状态下同时读取,存在读到为null,导致publish同时创建,所以加上的锁 computeIfAbsent
通过EventPublisherFactory(实现了函数式接口BiFunction的apply方法)以及传入的订阅者的类型,来构造一个事件的发布者,然后丢到publishMap里面,具体apply的代码可以看下面贴出来的,@Override
public EventPublisher apply(final Class<? extends Event> eventType, final Integer maxQueueSize) {
// Like ClientEvent$ClientChangeEvent cache by ClientEvent
Class<? extends Event> cachedEventType =
eventType.isMemberClass() ? (Class<? extends Event>) eventType.getEnclosingClass() : eventType;
publisher.computeIfAbsent(cachedEventType, eventClass -> {
NamingEventPublisher result = new NamingEventPublisher();
result.init(eventClass, maxQueueSize);
return result;
});
return publisher.get(cachedEventType);
}
接着之前的代码向publishmap添加了事件发布者之后,
publisher.addSubscriber(consumer)
接着将事件订阅者添加给事件发布者
总的来说,就是添加事件订阅者的同时,如果publishmap没有事件的发布者,则向通过工厂创建一个事件发布者,并且给这个事件发布者添加事件,利用发布者来进行广播
registerToPublisher
向事件订阅者注册事件
/**
* 直接向事件发布者注册订阅者
*
* @param eventType class Instances type of the event type.
* @param factory publisher factory.
* @param queueMaxSize the publisher's queue max size.
*/
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,
final EventPublisherFactory factory, final int queueMaxSize) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher;
}
final String topic = ClassUtils.getCanonicalName(eventType);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);
}
return INSTANCE.publisherMap.get(topic);
}
}
- 其实阅读过上面的addSubscriber方法之后,就能够很好的理解这一块的内容了,事件发布者注册事件,同理
deregisterPublisher
有添加自然有删除
/**
* Deregister publisher.
*
* @param eventType class Instances type of the event type.
*/
public static void deregisterPublisher(final Class<? extends Event> eventType) {
final String topic = ClassUtils.getCanonicalName(eventType);
EventPublisher publisher = INSTANCE.publisherMap.remove(topic);
try {
publisher.shutdown();
} catch (Throwable ex) {
// ignore ..
}
}
- 逻辑很清晰,先从publishmap移除掉事件的发布者
- 再调用事件发布者的shutdown方法,当然,事件发布者publish的源码分析会在下一章节阐述
至此,NotifyCenter的源码剖析到此差不多了