官方文档:标准和自定义事件,本章只介绍自定义事件,这里是 单机事件 官方文档笔记 该功能在 spring-framework core 包中,所以一般依赖一个 boot 包,就已经可以使用这个功能了 Spring Event 如果在 SpringCloud 环境中,还可以利用 SpringCloudBus 实现分布式事件驱动,可以看这个源码解析笔记 Spring Cloud Bus 将单机事件变成分布式事件机制(远程事件)例子

这个事件有什么作用?简单说他的功能是:模拟一个生产消息、订阅消息的功能,在程序内实现一些逻辑的解耦。

比如:创建了一个账户,要给这个账户邮箱发送通知,有时候发送邮件比较耗时,可以起一个线程异步发送,那么使用 Spring Event 就能更优雅的解决这个问题。

  1. 创建账户后,发布一个事件
  2. 监听这个事件,然后给这个发送邮件(可以同步或则异步,异步就类似使用一个新的线程来发送,同步则像调用方法一样)

1. 实现一个自定义事件 ApplicationEvent

实现 ApplicationEvent 接口,将消息内容封装到该类里面

  1. package cn.mrcode.event;
  2. import org.springframework.context.ApplicationEvent;
  3. import lombok.Getter;
  4. import lombok.ToString;
  5. /**
  6. * 审批单创建事件
  7. */
  8. @ToString
  9. @Getter
  10. public class ApprovalCreateEvent extends ApplicationEvent {
  11. private Long id;
  12. private ApprovalDataType dataType;
  13. private Integer dataId;
  14. // 第一个参数:是事件发布器程序,可以理解为,从事件中可以获取到该事件是从哪一个 发布器程序中发布的
  15. // 其他参数都是你的业务信息
  16. // 这里通过构造方式传入,是不想这个消息在外部被更改,通用这里只提供农了 @Getter,并没有提供 setter 方法
  17. public ApprovalCreateEvent(Object source, Long approvalId, ApprovalDataType dataType, Integer dataId) {
  18. super(source);
  19. this.id = approvalId;
  20. this.dataType = dataType;
  21. this.dataId = dataId;
  22. }
  23. // 还可以定义自定义的方法
  24. public boolean isSame(ApprovalDataType dataType) {
  25. return dataType == this.dataType;
  26. }
  27. }

2. 实现事件发布器 ApplicationEventPublisherAware

事件发布器,就类似与 MQ 的消息投递
实现 ApplicationEventPublisherAware 接口,获得 Spring 注入的事件发布对象 ApplicationEventPublisher,然后通过该对象将 事件发布出去

  1. package cn.mrcode..event;
  2. import org.springframework.context.ApplicationEventPublisher;
  3. import org.springframework.context.ApplicationEventPublisherAware;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @author mrcode
  7. */
  8. @Component
  9. public class ApprovalEventPublisher implements ApplicationEventPublisherAware {
  10. // 持有这个事件发布对象
  11. private ApplicationEventPublisher publisher;
  12. @Override
  13. public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
  14. this.publisher = applicationEventPublisher;
  15. }
  16. /**
  17. * 审批单创建事件
  18. *
  19. * @param approvalId
  20. * @param dataId
  21. */
  22. public void publishCreateEvent(Long approvalId, ApprovalDataType dataType, Integer dataId) {
  23. publisher.publishEvent(new ApprovalCreateEvent(this, approvalId, dataType, dataId));
  24. }
  25. }

在发布器里面编写了一个 publishCreateEvent 发布创建事件的方法,里面又委托 事件发布对象,将 ApplicationEvent 发布了出去

3. 监听事件 - EventListener

事件监听就类似与 MQ 的消息消费
这里会使用 @EventListener 注解来标识我们对哪一个事件感兴趣

  1. package cn.mrcode.service.lister;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.context.event.EventListener;
  5. import org.springframework.scheduling.annotation.Async;
  6. import org.springframework.stereotype.Service;
  7. import cn.hutool.core.util.StrUtil;
  8. import lombok.extern.slf4j.Slf4j;
  9. /**
  10. * @author mrcode
  11. */
  12. @Slf4j
  13. @Service
  14. public class PerformanceEventProcess {
  15. /**
  16. * 审批发起后,立马通知审批人审批
  17. *
  18. * @param event
  19. */
  20. @EventListener
  21. @Async
  22. public void initiateApprovalNotifyProcess(ApprovalCreateEvent event) {
  23. if (!event.isSame(ApprovalDataType.PERFORMANCE)) {
  24. return;
  25. }
  26. // 获取事件内容
  27. final Long id = event.getId();
  28. // 然后做自己的业务逻辑
  29. }
  30. }
  • @EventListener:标识的方法,方法中的入参就是 哪一个具体的事件对象,标识你对这个事件感兴趣
  • @Async:异步执行,利用 spring scheduling 功能,一个全局的线程池,将你的事件消费逻辑包裹在一个线程中执行,如果没有这个注解的话,就相当于是同步执行。 ```javascript package cn.mrcode.event;

import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.EnableAspectJAutoProxy; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication // @EnableScheduling @EnableAsync // 想要异步生效,还需要加该注解开启异步功能 public class CrmBelApplication {

  1. public static void main(String[] args) {
  2. SpringApplication.run(CrmBelApplication.class, args);
  3. }

}

  1. <a name="AAkq7"></a>
  2. ## 使用场景
  3. 在任何需要解耦的地方都可以使用,只不过这个是 JVM 级别的解耦,不是系统级别的解耦。
  4. - 一些复杂的业务逻辑中
  5. - 一些依赖一个数据的创建,有好几个模块 都需要做一些关联操作的时候
  6. 比如:创建一个账户
  7. 1. 积分模块,发放新人积分
  8. 2. 需要单独为这个账户 创建一张表之类的业务场景
  9. - 需要临时新增一个线程来执行逻辑的时候,使用异步,也达到了逻辑解耦
  10. <a name="n8sNS"></a>
  11. ## 注意事项
  12. 在一些场景中,会有一些问题,比如:
  13. <a name="mrE6e"></a>
  14. ### AOP 代理事物未提交,就异步消费
  15. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/651749/1649400925207-013d4693-e247-40fd-9c82-7de78bd62a37.png#clientId=uf123a28c-58dd-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=388&id=u551cac65&margin=%5Bobject%20Object%5D&name=image.png&originHeight=388&originWidth=1047&originalType=binary&ratio=1&rotation=0&showTitle=false&size=67860&status=done&style=none&taskId=ua8da776d-cef7-4aab-a36e-09095393e09&title=&width=1047)<br />比如上图的代码,在处理这个事件的时候,如果你用异步监听
  16. ```java
  17. @EventListener
  18. @Async
  19. public void xxx(Event e){
  20. consumerDataMapper.selectById(e.getId())
  21. }

那么这个代码就可能会出现: changeStatus 方法上的 AOP 事务代理还未提交,xxx 的异步监听事件中就去获取这事务里面修改的数据,就会导致获取不到正确的数据信息。这个问题还不是每次都会出现,有一定的时间差,所以比较难以发现。

问题根本原因:AOP 代理事务在方法调用完成后,才会提交

解决问题方案:

  1. 不使用异步:如果不使用 @Async,那么就等同于同步执行,这个时候还在事物内,就不会有问题
  2. 在消费时,先休眠几百毫秒:正常情况下几百毫秒足以提交事物了,不过还是有一定概率出现几百毫秒内事务提交不了的情况
  3. 使用手动提交事务的方式,也就是 Spring 中的编程事务(这里有一个 例子
  4. 事务操作完成后,再发布事件,比如改成如下这样

    1. @Override
    2. public void changeStatus(Integer id, ConsumerDataStatus status, String message) {
    3. consumerDataServiceImpl.doChangeStatus(id, status, message);
    4. consumerDataEventPublisher.publishConsumerDataStatusChangeEvent(id, status);
    5. }
    6. @Transactional(propagation = Propagation.REQUIRED)
    7. public void doChangeStatus(Integer id, ConsumerDataStatus status, String message) {
    8. final ConsumerData record = new ConsumerData();
    9. record.setId(id);
    10. record.setStatus(status.getValue());
    11. if (message != null && message.length() > 1024) {
    12. message = message.substring(0, 1024);
    13. }
    14. record.setErrInfo(message);
    15. consumerDataMapper.updateByPrimaryKeySelective(record);
    16. }

    此方法是推荐做法:调用处不用修改任何代码,消费处也不用做额外的处理。