Java

前言

什么是观察者模式,如何应用到工作实践中,以及如何抽取一个观察者模板。

  1. 观察者模式定义
  2. 观察者模式的应用场景
  3. 如何实现一个简单的观察者模式
  4. 工作中,如何使用观察者模式的
  5. Spring观察者模式原理
  6. 基于Spring观察者模式,抽取一个通用模板
  7. 唠叨几句,总结一下

    1、观察者模式定义

    观察者模式,也可以称之为发布订阅模式,它在GoF 的《设计模式》中,是这么定义的:

    Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically。

翻译过来就是:观察者模式定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被完成业务的更新
观察者模式属于行为模式,一个对象(被观察者)的状态发生改变,所有的依赖对象(观察者对象)都将得到通知,进行广播通知。它的主要成员就是观察者和被观察者

  • 被观察者(Observerable):目标对象,状态发生变化时,将通知所有的观察者。
  • 观察者(observer):接受被观察者的状态变化通知,执行预先定义的业务。

    2、观察者模式的应用场景

    哪些场景可以考虑使用观察者模式呢?
    日常生活中,其实就有观察者模式类似的例子。比如,订阅了报社一年的报纸。每天报社印刷好报纸,就送到用户手中。用户就是观察者,报社就是被观察者。
    而日常开发中,观察者模式的使用场景主要表现在:完成一件事情后,通知处理某个逻辑。如,登陆成功发个IM消息支付成功发个邮件消息或者发个抽奖消息用户评论成功给他发个积分等等。
    举个详细点的例子吧,登陆注册应该是最常见的业务场景了,就拿注册来说事,大家经常会遇到类似的场景,就是用户注册成功后,给用户发一条IM消息,又或者发个邮件等等,因此经常有如下的代码:

    1. void register(User user){
    2. insertRegisterUseruser);
    3. sendIMMessage();
    4. sendEmail();
    5. }

    这块代码会有什么问题呢?如果产品又加需求:现在注册成功的用户,再给用户发一条短信通知。于是又得改register方法的代码了。。。这是不是违反了开闭原则了。

    1. void register(User user){
    2. insertRegisterUseruser);
    3. sendIMMessage();
    4. sendMobileMessage();
    5. sendEmail();
    6. }

    并且,如果调发短信的接口失败了,是不是又影响到用户注册了?!这时候,是不是得加个异步方法,异步发通知消息才好??其实这种场景,可以使用异步非阻塞的观察者模式优化的。

    3、如何实现一个简单的观察者模式

    我们先来看下,简单的观察者模式如何实现。可以这么定义

  • 一个主题接口Subject(声明添加、删除、通知观察者方法)

  • 一个Observer观察者接口
  • 一个创建主题的类**ObserverableImpl**(即被观察者),实现了Subject接口
  • 各个观察者的差异化实现

为了通俗易懂,可以这样理解观察者模式:就是被观察者(ObserverableImpl)做了一件事情,或者说发布了一个主题(Subject),然后这件事情通知到各个相关的不同的人(不同的观察者,Observer的差异化实现者)。
后端思维之抽取观察者模板 - 图1
一个主题接口

  1. public interface Subject {
  2. /**
  3. * 添加观察者
  4. * @param observer
  5. */
  6. void addServer(Observer observer);
  7. /**
  8. * 移除观察者
  9. * @param observer
  10. */
  11. void removeServer(Observer observer);
  12. /**
  13. * 通知观察者
  14. * @param msg
  15. */
  16. void notifyAllObservers(String msg);
  17. }

一个Observer接口

  1. /**
  2. * 观察者
  3. *
  4. */
  5. public interface Observer {
  6. /**
  7. * 更新消息
  8. * @param msg
  9. */
  10. void update(String msg);
  11. }

一个创建主题的类ObserverableImpl(即被观察者),同时有观察者列表的属性(其实就是说观察者要事先注册到被观察者)

  1. public class ObserverableImpl implements Subject {
  2. /**
  3. * 存储被观察者
  4. */
  5. private List<Observer> observers = new ArrayList<Observer>();
  6. @Override
  7. public void addServer(Observer observer) {
  8. observers.add(observer);
  9. }
  10. @Override
  11. public void removeServer(Observer observer) {
  12. observers.remove(observer);
  13. }
  14. @Override
  15. public void notifyAllObservers(String msg) {
  16. for (Observer observer : observers) {
  17. observer.update(msg);
  18. }
  19. }
  20. }

观察者的差异化实现,以及使用

  1. public class ObserverOneImpl implements Observer {
  2. @Override
  3. public void update(String msg) {
  4. System.out.println("ObserverOne is notified,"+msg);
  5. }
  6. }
  7. public class ObserverTwoImpl implements Observer {
  8. @Override
  9. public void update(String msg) {
  10. System.out.println("ObserverTwo is notified,"+msg);
  11. }
  12. }
  13. public class ObserverDemoTest {
  14. public static void main(String[] args) {
  15. Subject subject = new ObserverableImpl();
  16. //添加观察者
  17. subject.addObserver(new ObserverOneImpl());
  18. subject.addObserver(new ObserverTwoImpl());
  19. //通知
  20. subject.notifyAllObservers("Hello");
  21. }
  22. }
  23. //输出
  24. ObserverOne is notified,Hello
  25. ObserverTwo is notified,Hello

就这样,我们实现了观察者模式啦,是不是很简单?不过上面的代码,只能算是观察者模式的模板代码,只能反映大体的设计思路。接下来,看下在工作中,是如何使用观察者模式的。

4、工作中,如何使用观察者模式的

观察者模式的实现有两种方式,同步阻塞方式和异步非阻塞方式。第3小节就是一个同步阻塞方式的观察者模式。来看下日常工作的例子:用户注册成功发消息的例子,如何实现。本小节分同步阻塞、异步阻塞、Spring观察者模式三个方向探讨。

  • 同步阻塞方式的观察模式
  • 异步非阻塞方式的观察者模式
  • Spring观察者模式应用

    4.1 同步阻塞方式的观察模式

    可以把用户注册,当做被观察者实现的逻辑,然后发消息就是观察者的实现逻辑
    假设有两个观察者,分 别是发QQ消息和手机消息,于是有以下代码:

    1. public interface RegisterObserver {
    2. void sendMsg(String msg);
    3. }
    4. @Service
    5. public class ObserverMobileImpl implements RegisterObserver {
    6. @Override
    7. public void sendMsg(String msg) {
    8. System.out.println("发送手机短信消息"+msg);
    9. }
    10. }
    11. @Service
    12. public class ObserverQQImpl implements RegisterObserver {
    13. @Override
    14. public void sendMsg(String msg) {
    15. System.out.println("发送QQ消息"+msg);
    16. }
    17. }

    直接可以通过SpringApplicationContextAware,初始化观察者列表,然后用户注册成功,通知观察者即可。代码如下:

    1. @RestController
    2. public class UserController implements ApplicationContextAware{
    3. @Autowired
    4. private UserService userService;
    5. //观察者列表
    6. private Collection<RegisterObserver> regObservers;
    7. @RequestMapping("register")
    8. public String register(UserParam userParam) {
    9. //注册成功过(类似于被观察者,做了某件事)
    10. userService.addUser(userParam);
    11. //然后就开始通知各个观察者。
    12. for(RegisterObserver temp:regObservers){
    13. temp.sendMsg("注冊成功");
    14. }
    15. return "SUCCESS";
    16. }
    17. //利用spring的ApplicationContextAware,初始化所有观察者
    18. @Override
    19. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    20. regObservers = new ArrayList<>(applicationContext.getBeansOfType(RegisterObserver.class).values());
    21. }
    22. }

    可以发现,观察者模式,就是将不同的行为代码解耦,也就是说将观察者和被观察者代码解耦。但是这里大家会发现,这是同步阻塞式的观察者模式,是有缺点的,比如发QQ消息异常,就会影响用户注册,或者发消息因为某些原因耗时,就影响了用户注册,所以可以考虑异步非阻塞的观察者模式。

    4.2 异步非阻塞方式的观察者模式

    如何实现异步非阻塞,最简单就是另开个线程嘛,即新开个线程或者线程池异步跑观察者通知。代码如下:

    1. @RestController
    2. public class UserController implements ApplicationContextAware{
    3. @Autowired
    4. private UserService userService;
    5. private Collection<RegisterObserver> regObservers;
    6. private Executor executor = Executors.newFixedThreadPool(10);
    7. @RequestMapping("register")
    8. public String register(UserParam userParam) {
    9. userService.addUser(userParam);
    10. //异步通知每个观察者
    11. for (RegisterObserver temp : regObservers) {
    12. executor.execute(() -> {
    13. temp.sendMsg("注冊成功");
    14. });
    15. }
    16. return "SUCCESS";
    17. }
    18. @Override
    19. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    20. regObservers = new ArrayList<>(applicationContext.getBeansOfType(RegisterObserver.class).values());
    21. }
    22. }

    线程池实现的异步非阻塞方式,还是可以的,但是异步执行逻辑都耦合在了register()函数中,不是很优雅,也增加了这部分业务代码的维护成本。一般日常工作中,会用Spring那一套观察者模式等

    4.3 Spring观察者模式应用

    Spring的观察者模式使用也是比较简单的,就是先定义个事件,继承于ApplicationEvent

    1. public class MessageEvent extends ApplicationEvent {
    2. public MessageEvent(Object source) {
    3. super(source);
    4. }
    5. }

    然后定义一个事件监听器MessageListener,类似于观察者,它实现ApplicationListener接口

    1. @Component
    2. public class MessageListener implements ApplicationListener<MessageEvent> {
    3. @Override
    4. public void onApplicationEvent(MessageEvent messageEvent) {
    5. System.out.println("用户注册成功,执行监听事件"+messageEvent.getSource());
    6. }
    7. }

    用户注册成功后,applicationEventPublisher类似于被观察者)发布事件即可,代码如下:

    1. @RestController
    2. public class UserController implements ApplicationContextAware{
    3. @Autowired
    4. private UserService userService;
    5. @Autowired
    6. private ApplicationEventPublisher applicationEventPublisher;
    7. @RequestMapping("springListenRegister")
    8. public String springListenRegister(UserParam userParam) {
    9. System.out.println("开始注册");
    10. userService.addUser(userParam);
    11. //用户注册成功,发布事件
    12. applicationEventPublisher.publishEvent(new MessageEvent("666"));
    13. return "SUCCESS";
    14. }
    15. }

    运行结果:

    1. 开始注册
    2. 用户注册成功,执行监听事件666

    这个也是同步阻塞的方式实现的,等下下个小节先介绍完Spring观察者模式的原理,再来看如何抽取一个通用的异步非阻塞观察者模式。

    5、Spring观察者模式原理

    Spring 中实现的观察者模式包含三部分:分别是Event事件(相当于消息)、Listener监听者(相当于观察者)、Publisher发送者(相当于被观察者)。用个图表示就是这样:
    后端思维之抽取观察者模板 - 图2
    这个ApplicationEvent是放到哪里的,监听者AppliactionListener是如何监听到的。接下来看下Spring框架的观察者原理是怎样~
    先来看下ApplicationEventPublisher源代码(被观察者/发布者) ```java @FunctionalInterface public interface ApplicationEventPublisher {

    default void publishEvent(ApplicationEvent event) {

    1. publishEvent((Object) event);

    }

    void publishEvent(Object event);

}

  1. `ApplicationEventPublisher`它只是一个函数式接口,再看下它接口方法的实现。它的具体实现类是`AbstractApplicationContext`,这个类代码有点多,把关键部分代码贴出来了:
  2. ```java
  3. public abstract class AbstractApplicationContext extends ... {
  4. //监听者(观察者列表)
  5. private final Set<ApplicationListener<?>> applicationListeners;
  6. //构造器,初始化观察者列表
  7. public AbstractApplicationContext() {
  8. this.applicationListeners = new LinkedHashSet();
  9. //...
  10. }
  11. //发布事件
  12. public void publishEvent(ApplicationEvent event) {
  13. this.publishEvent(event, (ResolvableType)null);
  14. }
  15. public void publishEvent(Object event) {
  16. this.publishEvent(event, (ResolvableType)null);
  17. }
  18. //发布事件接口实现
  19. protected void publishEvent(Object event, ResolvableType eventType) {
  20. //...
  21. Object applicationEvent;
  22. if (event instanceof ApplicationEvent) {
  23. //如果event是ApplicationEvent对象,或者是它的子类
  24. applicationEvent = (ApplicationEvent)event;
  25. } else {
  26. // 如果不是ApplicationEvent对象或者它的子类,则将其包装成PayloadApplicationEvent事件,并获取对应的事件类型
  27. applicationEvent = new PayloadApplicationEvent(this, event);
  28. if (eventType == null) {
  29. eventType = ((PayloadApplicationEvent)applicationEvent).getResolvableType();
  30. }
  31. }
  32. if (this.earlyApplicationEvents != null) {
  33. this.earlyApplicationEvents.add(applicationEvent);
  34. } else {
  35. //真正的消息发送,是通过它。获取ApplicationEventMulticaster,调用multicastEvent方法广播事件
  36. this.getApplicationEventMulticaster().multicastEvent(
  37. (ApplicationEvent)applicationEvent, eventType);
  38. }
  39. //如果当前命名空间还有父亲节点,也需要给父亲推送该消息
  40. if (this.parent != null) {
  41. if (this.parent instanceof AbstractApplicationContext) {
  42. ((AbstractApplicationContext)this.parent).publishEvent(event, eventType);
  43. } else {
  44. this.parent.publishEvent(event);
  45. }
  46. }
  47. }
  48. //添加观察者(监听者)
  49. public void addApplicationListener(ApplicationListener<?> listener) {
  50. Assert.notNull(listener, "ApplicationListener must not be null");
  51. if (this.applicationEventMulticaster != null) {
  52. this.applicationEventMulticaster.addApplicationListener(listener);
  53. } else {
  54. this.applicationListeners.add(listener);
  55. }
  56. }
  57. //观察者列表
  58. public Collection<ApplicationListener<?>> getApplicationListeners() {
  59. return this.applicationListeners;
  60. }
  61. // 注册监听器
  62. protected void registerListeners() {
  63. //把提前存储好的监听器添加到监听器容器中到ApplicationEventMulticaster
  64. for (ApplicationListener<?> listener : getApplicationListeners()) {
  65. getApplicationEventMulticaster().addApplicationListener(listener);
  66. }
  67. //获取类型是ApplicationListener的beanName集合,此处不会去实例化bean
  68. String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
  69. for (String listenerBeanName : listenerBeanNames) {
  70. getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
  71. }
  72. Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
  73. this.earlyApplicationEvents = null;
  74. //如果存在earlyEventsToProcess,提前处理这些事件
  75. if (!CollectionUtils.isEmpty(earlyEventsToProcess)) {
  76. for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
  77. getApplicationEventMulticaster().multicastEvent(earlyEvent);
  78. }
  79. }
  80. }
  81. }

通过以上代码,可以发现,真正的消息发送,实际上是通过事件广播器ApplicationEventMulticaster这个接口来完成的。multicastEvent是主要方法,这个方法的实现在类SimpleApplicationEventMulticaster中,一起来看下源码:

  1. public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {
  2. ...
  3. //线程池
  4. @Nullable
  5. protected Executor getTaskExecutor() {
  6. return this.taskExecutor;
  7. }
  8. public void setTaskExecutor(@Nullable Executor taskExecutor) {
  9. this.taskExecutor = taskExecutor;
  10. }
  11. @Override
  12. public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
  13. ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
  14. // 根据event类型获取适合的监听器
  15. Executor executor = getTaskExecutor();
  16. for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
  17. if (executor != null) {
  18. //如果executor不为空,异步调用执行监听器中的方法
  19. executor.execute(() -> invokeListener(listener, event));
  20. }
  21. else {
  22. //调用监听器的方法
  23. invokeListener(listener, event);
  24. }
  25. }
  26. }
  27. protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
  28. ErrorHandler errorHandler = getErrorHandler();
  29. //如果存在ErrorHandler,调用监听器方法(会用try...catch包一下)
  30. if (errorHandler != null) {
  31. try {
  32. doInvokeListener(listener, event);
  33. }
  34. catch (Throwable err) {
  35. //如果抛出异常则调用ErrorHandler来处理异常。
  36. errorHandler.handleError(err);
  37. }
  38. }
  39. else {
  40. 否则直接调用监听器方法
  41. doInvokeListener(listener, event);
  42. }
  43. }
  44. ...
  45. }

可以发现,默认情况下,Spring实现的观察者模式,同步阻塞的。如果想异步执行事件,可以自定义SimpleApplicationEventMulticaster,然后构造一下executor线程池就好了。代码如下:

  1. @Component
  2. public class ListenerConfig {
  3. //把线程池赋值进去
  4. @Bean
  5. public SimpleApplicationEventMulticaster applicationEventMulticaster() {
  6. SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
  7. simpleApplicationEventMulticaster.setTaskExecutor(simpleAsyncTaskExecutor());
  8. return simpleApplicationEventMulticaster;
  9. }
  10. @Bean
  11. public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
  12. return new SimpleAsyncTaskExecutor();
  13. }
  14. }

demo跑一下,运行结果:

  1. 注册开始
  2. 当前线程名称http-nio-8080-exec-1
  3. 注册结束
  4. 用户注册成功2,执行监听事件666Sat Jun 18 11:44:07 GMT+08:00 2022
  5. 当前线程名称:SimpleAsyncTaskExecutor-20
  6. 当前线程名称:SimpleAsyncTaskExecutor-19
  7. 用户注册成功,执行监听事件666Sat Jun 18 11:44:12 GMT+08:00 2022

如果手动新建SimpleApplicationEventMulticaster,并设置taskExecutor的话,所有的监听响应事件,都是异步执行的。而有些有些场景希望同步执行的,这时候这种实现方式就不好了。
其实Spring提供了@Async注解,可以用来实现异步。具体怎么实现呢?其实很简单,只需要在配置类加上@EnableAsync,接着在需要异步执行的监听实现方法。加上@Async即可。代码实现如下:

  1. @Component
  2. @EnableAsync //配置类加上```@EnableAsync

public class ListenerConfig2 {

  1. @Bean
  2. public SimpleApplicationEventMulticaster applicationEventMulticaster() {
  3. SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
  4. return simpleApplicationEventMulticaster;
  5. }

}

@Component public class MessageAsyncListener3 implements ApplicationListener {

  1. @Async //方法异步注解
  2. @Override
  3. public void onApplicationEvent(MessageEvent messageEvent) {
  4. System.out.println("用户注册成功3,执行监听事件" + messageEvent.getSource() + new Date());
  5. System.out.println("当前线程名称:"+Thread.currentThread().getName());
  6. }

}

  1. 日常开发中,异步执行也可以自己手动通过线程池来开启。回到本文的**后端思维**主题,如果每个开发,都自己定义观察者模式的实现,这种代码会很混乱,**所以最好是实现一个可扩展,通用的观察者模板**。
  2. <a name="baAAY"></a>
  3. ## 6、基于**Spring**观察者模式,抽取一个模板
  4. 接下来的最后小节,基于**Spring**的观察者模式,一步一步实现并抽取个模板。<br />要基于**Spring**实现观察者模式的话,就包括这三步:
  5. 1. **定义**`**Event**`**事件(相当于消息),一般定义一个**`**Event**`**对象,继承**`**ApplicationEvent**`
  6. 1. **定义**`**Listener**`**监听者(相当于观察者),实现接口**`**ApplicationListener**`
  7. 1. `**Publisher**`**发送者(相当于被观察者),通过**`**ApplicationEventPublisher**`**发布。**
  8. <a name="ncGpH"></a>
  9. ### 6.1 定义Event事件对象
  10. 既然要抽取观察者模板,那肯定不是每个人自己写自己的`Event`,然后都去继承`ApplicationEvent`。<br />可以自己定义一个**项目相关的,通用**的`BaseEvent`类,然后一些相关通用的信息属性可以放进去,比如eventId或者流水号bizSeq什么的,都可以,看项目需要。以下代码,定义一个空空如也的`BaseEvent`
  11. ```java
  12. public class BaseEvent extends ApplicationEvent {
  13. public BaseEvent(Object source) {
  14. super(source);
  15. }
  16. public BaseEvent() {
  17. this("");
  18. }
  19. }

如果观察者模式,是注册成功之后,发个消息的,就可以声明一个消息类事件对象RegisterMessageEvent,继承通用的BaseEvent即可。然后属性可以自定义就好,比如messageId

  1. public class RegisterMessageEvent extends BaseEvent{
  2. private String msgId;
  3. public RegisterMessageEvent(String msgId) {
  4. super();
  5. this.msgId = msgId;
  6. }
  7. public String getMsgId() {
  8. return msgId;
  9. }
  10. public void setMsgId(String msgId) {
  11. this.msgId = msgId;
  12. }
  13. }

同理,如果想定义一个用户送礼物成功,然后发个广播,可以定义一个GiftSendEvent

  1. public class GiftSendEvent extends BaseEvent {
  2. private String giftId;
  3. public GiftSendEvent(String giftId) {
  4. this.giftId = giftId;
  5. }
  6. public String getGiftId() {
  7. return giftId;
  8. }
  9. public void setGiftId(String giftId) {
  10. this.giftId = giftId;
  11. }
  12. }

其他业务场景类似,只要想接入观察者模板,只需要自己定义事件对象,继承于BaseEvent即可。

6.2 定义Listener监听者(观察者)

定义完Event事件,就可以开始定义监听者了。定义的监听者,只需要实现接口ApplicationListener接口即可。如果每个人也是各写各的,这些就很乱,毫无模板规范可言了。
可以封装一下,如何封装呢?很简单,可以先声明一个IEventListener,让它继承ApplicationListener接口,都知道接口也是可以继承的。如下:

  1. public interface IEventListener extends ApplicationListener {
  2. }

监听者的实现,关键在于实现ApplicationListeneronApplicationEvent的接口方法即可。又因为未来别的业务场景接入观察者模式,都是按模板来,所以各个Event事件对象,都是继承于BaseEvent的,所以可以把<T extends BaseEvent>的泛型加进去,如下:

  1. public interface IEventListener<T extends BaseEvent> extends ApplicationListener <T>{
  2. void onApplicationEvent(T event);
  3. }

有些时候,可能会有这种场景,就是执行监听逻辑只对部分数据(或者说部分特殊用户才执行)。既然是抽取监听模板,考虑到可扩展性,可以优化下IEventListener的代码。可以声明一个support的方法,默认是执行的,子类可以覆盖重写(让子类去控制是否执行这个监听逻辑),如下:

  1. public interface IEventListener<T extends BaseEvent> extends ApplicationListener <T> {
  2. void onApplicationEvent(T event);
  3. //接口里面,加了default,就可以写方法实现
  4. default boolean support(T event) {
  5. return true;
  6. }
  7. }

然后呢,只有support方法返回true,才执行监听的逻辑,还可以定义一个handler方法,给子类去实现自己的业务逻辑,代码如下:

  1. public interface IEventListener<T extends BaseEvent> extends ApplicationListener <T> {
  2. default void onApplicationEvent(T event){
  3. if (support(event)) {
  4. handler(event);
  5. }
  6. }
  7. default boolean support(T event) {
  8. return true;
  9. }
  10. //真正实现业务逻辑的接口,给子类去实现。
  11. void handler(T event);
  12. }

对着以上的代码模板,小伙伴们是否还有一些优化的想法呢?
如果方法产生了异常,是不是可以注意一下异常处理呢?以上小节,SimpleApplicationEventMulticaster源码分析的时候,不知道大家有没有细看,其实它就用了一个很巧妙的异常处理,可以借鉴一下,哈哈哈,这就是看源码的一个小小的好处了。
可以给onApplicationEvent的实现try…catch…一下,如果catch住异常的话,可以定义一个handlerException异常处理方法,给子类自定义去实现,当然,异常可以默认不处理嘛,代码如下:

  1. public interface IEventListener<T extends BaseEvent> extends ApplicationListener <T> {
  2. /**
  3. * 观察者的业务逻辑处理
  4. * @param event
  5. */
  6. default void onApplicationEvent(T event){
  7. try {
  8. if (support(event)) {
  9. handler(event);
  10. }
  11. } catch (Throwable e) {
  12. /**
  13. *
  14. */
  15. handleException(e);
  16. }
  17. }
  18. /**
  19. * 默认执行观察者的逻辑的
  20. * @param event
  21. * @return
  22. */
  23. default boolean support(T event) {
  24. return true;
  25. }
  26. /**
  27. * 观察者的逻辑,交给不同子类自定义实现
  28. * @param event
  29. */
  30. void handler(T event);
  31. /**
  32. * 异常默认不处理
  33. * @param exception
  34. */
  35. default void handleException(Throwable exception) {
  36. }
  37. }

最后呢,不同业务不同的监听者(观察者),直接实现IEventListener就好了,比如注册成功那个,声明一个RegisterMessageListenerImpl类,如下:

  1. @Service
  2. public class RegisterMessageListenerImpl implements IEventListener<RegisterMessageEvent> {
  3. @Override
  4. public void handler(RegisterMessageEvent event) {
  5. System.out.println("用户注册成功register,执行监听事件" + event.getSource() + new Date());
  6. }
  7. }

6.3 定义Publisher发送者模板

观察者模板,最后一步就是定义发送者模板。最简单的发送,就是利用ApplicationContext直接发送就好。

  1. @Component
  2. public class EventPublish {
  3. @Autowired
  4. private ApplicationContext applicationContext;
  5. void publish(BaseEvent event) {
  6. applicationContext.publishEvent(event);
  7. }
  8. }

为什么可以直接使用applicationContext来发送,文章开始介绍,不是用ApplicationEventPublisher来发送?其实是因为applicationContext继承了ApplicationEventPublisher接口
这个只是同步阻塞方式的观察者模式,一般来说,一个通用的观察者模板。也需要提供异步非阻塞方式的观察者模板。本文第5小节,都知道了,在配置类加上@EnableAsync,在需要异步执行的监听加上@Async,即可实现异步。
为了方便管理,和API语义更明确,可以手动设置线程池,给模板发布类,提供异步发送的接口。先自定义一个线程池,一般不建议直接使用JDK的线程池。
如何自定义线程池呢?
在application.properties配置文件,定义线程池一些属性(核心线程数、最大线程数等等)

  1. executor.corePoolSize=50
  2. executor.maxPoolSize=100
  3. executor.queueCapacity=200
  4. executor.keepAliveSeconds=120
  5. executor.threadNamePrefix=threadPoolExecutor

然后声明一个线程配置类ProjectExecutorsConfig:

  1. @Configuration
  2. @ConfigurationProperties("executor")//读取配置文件的线程池属性
  3. public class ProjectExecutorsConfig {
  4. private int corePoolSize;
  5. private int maxPoolSize;
  6. private int queueCapacity;
  7. private int keepAliveSeconds;
  8. private String threadNamePrefix;
  9. //省略get和set的方法
  10. }

通过线程配置类ProjectExecutorsConfig,初始化线程池ProjectExecutorPool,代码如下:

  1. @Configuration
  2. public class ProjectExecutorPool {
  3. @Autowired
  4. private ProjectExecutorsConfig projectExecutorsConfig;
  5. @Bean
  6. public ThreadPoolTaskExecutor eventExecutor() {
  7. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  8. executor.setCorePoolSize(projectExecutorsConfig.getCorePoolSize());
  9. executor.setMaxPoolSize(projectExecutorsConfig.getMaxPoolSize());
  10. executor.setKeepAliveSeconds(projectExecutorsConfig.getKeepAliveSeconds());
  11. executor.setQueueCapacity(projectExecutorsConfig.getQueueCapacity());
  12. executor.setThreadNamePrefix(projectExecutorsConfig.getThreadNamePrefix());
  13. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  14. executor.initialize();
  15. return executor;
  16. }
  17. }

然后回到EventPublish,把异步发布的接口加上,如下:

  1. /**
  2. * 观察者模式,事件发布通用接口
  3. **/
  4. @Component
  5. public class EventPublish {
  6. @Autowired
  7. private ApplicationContext applicationContext;
  8. @Autowired
  9. private ProjectExecutorPool projectExecutorPool;
  10. //同步阻塞
  11. public void publish(BaseEvent event) {
  12. applicationContext.publishEvent(event);
  13. }
  14. //异步发布(异步非阻塞)
  15. public void asyncPublish(BaseEvent event) {
  16. projectExecutorPool.eventExecutor().execute(()->{
  17. publish(event);
  18. });
  19. }
  20. }