承上启下,从NotifyCenter**publisher.publish**_**(**_**event**_**)**_开始,进入事件发布者的源码分析
当然,以下均为个人的理解哈~!

先看看接口

  1. public interface EventPublisher extends Closeable {
  2. /**
  3. * 初始化事件发布者/Initializes the event publisher.
  4. */
  5. void init(Class<? extends Event> type, int bufferSize);
  6. /**
  7. * 事件队列长度/The number of currently staged events.
  8. */
  9. long currentEventSize();
  10. /**
  11. * 添加订阅者/Add listener.
  12. */
  13. void addSubscriber(Subscriber subscriber);
  14. /**
  15. * 移除订阅者/Remove listener.
  16. */
  17. void removeSubscriber(Subscriber subscriber);
  18. /**
  19. * 发布事件/publish event.
  20. */
  21. boolean publish(Event event);
  22. /**
  23. * 通知订阅者/Notify listener.
  24. */
  25. void notifySubscriber(Subscriber subscriber, Event event);
  26. }
  • 继承自Closeable,事件发布者通过实现shutdown方法,来停止事件的发布(NotifyCenter进行shutdown的时候会进行调用)
  • 接着看看这个接口抽象了哪些行为,就能有个大致的理解


接着细化其源码

先看看其具体有哪些属性吧

  1. public class NamingEventPublisher extends Thread implements ShardedEventPublisher {
  2. private static final String THREAD_NAME = "naming.publisher-";
  3. private static final int DEFAULT_WAIT_TIME = 60;
  4. private final Map<Class<? extends Event>, Set<Subscriber<? extends Event>>> subscribes = new ConcurrentHashMap<>();
  5. private volatile boolean initialized = false;
  6. private volatile boolean shutdown = false;
  7. private int queueMaxSize = -1;
  8. private BlockingQueue<Event> queue;
  9. private String publisherName;
  10. }
  • 继承了**Thread**接口,那我可以理解成事件发布者本质是一个线程
  • **Map<Class<? extends Event>, Set<Subscriber<? extends Event>>> subscribes**是一个ConcurrentHashMap,其key是一个class,也就是说是一类事件为key,其value是一个订阅者集合,Subscriber接口会在下一篇中分析,这里就当做事件的订阅者就行,所以这个map就是事件发布者的一个核心,在这里,保存了事件的订阅者的集合
  • **queue**是一个阻塞队列,泛型是一个事件,可以暂时理解成消息暂存队列(like local mq),下文会具体阐述

init

我们先看看init方法是在哪里调用的,init顾名思义就是初始化,咱从源头开始

  1. @Override
  2. public EventPublisher apply(final Class<? extends Event> eventType, final Integer maxQueueSize) {
  3. Class<? extends Event> cachedEventType =
  4. eventType.isMemberClass() ? (Class<? extends Event>) eventType.getEnclosingClass() : eventType;
  5. publisher.computeIfAbsent(cachedEventType, eventClass -> {
  6. NamingEventPublisher result = new NamingEventPublisher();
  7. // here
  8. result.init(eventClass, maxQueueSize);
  9. return result;
  10. });
  11. return publisher.get(cachedEventType);
  12. }
  • 眼熟的话(因为这块代码在NotifyCenter源码分析中有贴过),这块代码是在publishmap添加订阅者的时候(如果事件发布者未初始化),就会调用EventPublisherFactoryapply实现,创建事件发布者,此时就会执行它的init()方法

接下来看看init的源码

  1. @Override
  2. public void init(Class<? extends Event> type, int bufferSize) {
  3. this.queueMaxSize = bufferSize;
  4. this.queue = new ArrayBlockingQueue<>(bufferSize);
  5. this.publisherName = type.getSimpleName();
  6. super.setName(THREAD_NAME + this.publisherName);
  7. super.setDaemon(true);
  8. super.start();
  9. initialized = true;
  10. }
  • 其实没啥好看的,初始化了一下阻塞队列**queue**的大小,由于其本质又是一个Thread,于是在初始化的时候调用了其start方法,并给当前线程命了个名,由此,事件发布者在其执行init方法的时候,就开始了它的线程之旅

接着,看看其启动事件发布者后都做了些什么?

启动事件发布者后又做了啥

  1. private void handleEvents() {
  2. while (!shutdown) {
  3. try {
  4. // 取出事件
  5. final Event event = queue.take();
  6. // 事件处理
  7. handleEvent(event);
  8. } catch (InterruptedException e) {
  9. // ..
  10. }
  11. }
  12. }
  13. private void handleEvent(Event event) {
  14. Class<? extends Event> eventType = event.getClass();
  15. Set<Subscriber<? extends Event>> subscribers = subscribes.get(eventType);
  16. if (null == subscribers) {
  17. // ..
  18. }
  19. // 给每个订阅者发送通知
  20. for (Subscriber subscriber : subscribers) {
  21. notifySubscriber(subscriber, event);
  22. }
  23. }
  • 在其run方法里面,执行的核心方法是handleEvents,于是直接跳到这里来,这里通过一个while循环来保持发布者这个线程的持续在线,终止条件是其shutdown状态
  • 通过阻塞队列获取事件,然后再发布事件,这样当队列中无事件的时候,线程就会阻塞在这里,不会造成资源的浪费
  • 接着开始**handleEvent**,也就是处理我们的事件,逻辑很清晰明了,拿到当前事件的Clz,然后从subscribes(保存订阅者的map)中获取到订阅者的集合,接着一个for循环给每个订阅者们发送通知,**notifySubscriber**的具体实现会在后面贴出

所以,在事件发布者初始化的时候,就被当做一个线程,然后放在了NotifyCenter的publishmap里面,并且直接调用了其start方法启动了它,然后其内部保存了一个阻塞队列,用来暂存事件,当监听到里面的阻塞队列来了事件之后,就会消费它(挺像MQ的)

向发布者添加事件

上面描述了事件的保存位置,这里贴一下事件是怎么被添加订阅者的

  1. @Override
  2. public boolean publish(Event event) {
  3. checkIsStart();
  4. // 将事件放到阻塞队列
  5. boolean success = this.queue.offer(event);
  6. if (!success) {
  7. handleEvent(event);
  8. return true;
  9. }
  10. return true;
  11. }
  • 逻辑简单,就是往阻塞队列queue塞入了事件,如果成功就过,不成功的话打了个日志就直接处理这个事件,而不是放入阻塞队列了
  • 如果不记得事件添加的源头的话,这里贴一下代码**NotifyCenter#publishEvent{publisher.publish**_**(**_**event**_**)**_**}**

notifySubscriber

就是在这里,进行了事件的通知,也是NotifyCenter调用publish(event)方法的落地

  1. @Override
  2. public void notifySubscriber(Subscriber subscriber, Event event) {
  3. // ..
  4. // 执行订阅者们的onEvent方法,即每个订阅者对于该事件的处理逻辑
  5. final Runnable job = () -> subscriber.onEvent(event);
  6. // 判断当前事件是异步还是同步
  7. final Executor executor = subscriber.executor();
  8. if (executor != null) {
  9. // 异步
  10. executor.execute(job);
  11. } else {
  12. try {
  13. // 同步
  14. job.run();
  15. } catch (Throwable e) {
  16. // ..
  17. }
  18. }
  19. }
  • 这里的逻辑很清晰通过订阅者的具体实现,来决定这个消息通知是走的同步还是异步的方式
  • 提交的任务就是调用传入的订阅者onEvent方法的具体实现
  • executor.execute(job)如果订阅者有对 Executor的实现,那么就是走的异步
  • job.run()如果订阅者没实现Executor,那么就直接执行run,其本质就是执行了subscriber.onEvent(event)这行代码,并且是同步执行的

如果对这块线程操作理解模糊的话,下面打印一下这块位置的线程ID:

  1. @Override
  2. public void notifySubscriber(Subscriber subscriber, Event event) {
  3. System.out.println("notifySubscriber的ThreadId-1:" + + Thread.currentThread().getId());
  4. final Runnable job = () -> {
  5. System.out.println("notifySubscriber的ThreadId-2:" + + Thread.currentThread().getId());
  6. subscriber.onEvent(event);
  7. };
  8. final Executor executor = subscriber.executor();
  9. if (executor != null) {
  10. executor.execute(job);
  11. } else {
  12. try {
  13. job.run();
  14. } catch (Throwable e) {
  15. }
  16. }
  17. }
  18. ======================================================================================================================
  19. 日志贴在这里了,是两个事件,不过是同一类
  20. >>取出queue事件:com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent$ClientSubscribeServiceEvent@5dbbd6e
  21. notifySubscriberThreadId-1264
  22. notifySubscriberThreadId-2264
  23. >>取出queue事件:com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent$ClientSubscribeServiceEvent@13b131b1
  24. notifySubscriberThreadId-1264
  25. notifySubscriberThreadId-2264

如果你本地拉了代码能够调试的话,你就会发现:
这里就拿两次相同的事件来举例子(上面的日志可以看出,ClientOperationEvent@5dbbd6e/@13b131b1,是两个事件,但是事件类型是一样的~notifySubscriber的ThreadId-1的线程ID是一直是同一个,并且notifySubscriber的ThreadId-2的线程ID和notifySubscriber的ThreadId-1是相同的,这就说明这里是同步的,至于为什么大牛写这段代码的时候,同步操作依然使用Runnable来包装的话,可能是为了兼容Executor的写法吧

  • 当然这段代码侧面说明了 EventPublisher 是一个线程
  • 订阅者监听事件可以同步,也可以异步

这里的描述有点绕,不过尽量理解,毕竟这是事件发布的落地

简单来说:可以理解为每个事件的通知均是通过调用订阅者的onEvent方法,类似于一个listener的机制,当然订阅者也可以自己选择通知方式是同步的还是异步的

到此为止,一个事件从NotifyCenter发布事件,事件发布者接收事件,其内部对事件如何通知其订阅者的链路已经剖析完,下文会对订阅者接口Subscriber进行剖析~