Spring 通过事件监听机制,来发布某些事件,让监听者获取消息

事件监听与发布.png

Spring 中监听器的使用

可以自定义感兴趣的事件继承 ApplicationContextEvent 类即可,然后在需要的地方发布该事件,再通过一个监听器监听该事件即可 context.publishEvent(ApplicationEvent event);

方式一:使用注解实现(推荐)

一个类可以监听多个事件

  1. @Component
  2. public class ApplicationEventListener {
  3. @EventListener
  4. public void listenContextRefreshedEvent(ContextRefreshedEvent event) {
  5. ApplicationContext context = (ApplicationContext) event.getSource();
  6. System.out.println("context = " + context);
  7. }
  8. }

方式二:使用接口实现

缺点:一个类只能监听一个事件

  1. @Component
  2. public class ApplicationEventListener implements ApplicationListener<ContextRefreshedEvent> {
  3. @Override
  4. public void onApplicationEvent(ContextRefreshedEvent event) {
  5. ApplicationContext context = (ApplicationContext) event.getSource();
  6. System.out.println("context = " + context);
  7. }
  8. }

方式三:使用 spring.factories

在 META-INF/spring.factories 中定义 Listener 组件,由 Spring 容器自动扫描,此种方式类似与接口实现方式,只是不需要添加 @Component 注解

  1. # Application Listeners
  2. org.springframework.context.ApplicationListener=\
  3. org.springframework.boot.autoconfigure.BackgroundPreinitializer

方式四:使用 @Bean 的方式

  1. @Bean
  2. public LoginListener loginListener() {
  3. return new LoginListener();
  4. }

方式五:使用 ApplicationContext 直接添加

  1. ac.addApplicationListener(new LoginListener());

方式六:Spring Boot 中使用 yaml 文件配置

  1. context:
  2. listener:
  3. classes: com.sourceflag.spring.event.listener.LoginListener

监听过程模拟

那么 Spring 中的事件监听是如何实现的呢?我们简单的模拟一下 Spring 的事件监听机制的实现

事件

如果需要定义某些事件,必须继承该 AppEvent

  1. public class AppEvent {
  2. }

监听者

监听者中,有一个方法 onEvent,接收一个感兴趣的监听事件,当有该事件发生的时候,系统会自动调用 onEvent 方法

  1. public interface AppListener<E extends AppEvent> {
  2. void onEvent(E event);
  3. }

监听管理器

监听管理器用于记录所有监听者发布事件,当监听者所监听的事件与此刻正在发布的事件相符的时候,就会推送给监听者

  1. public class AppListenerManager {
  2. // 保存所有监听器
  3. static List<AppListener<?>> listeners = new ArrayList<>();
  4. // 添加监听器
  5. public static void addListener(AppListener<? extends AppEvent> appListener) {
  6. // 可以通过扫描项目,自动添加监听器
  7. listeners.add(appListener);
  8. }
  9. // 发布事件
  10. public static void publishEvent(AppEvent event) {
  11. for (AppListener appListener : listeners) {
  12. // 获取泛型
  13. ParameterizedType parameterizedType = (ParameterizedType) (appListener.getClass().getGenericInterfaces()[0]);
  14. Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
  15. Class clazz = (Class) actualTypeArguments[0];
  16. // 检查类型是否匹配
  17. if (clazz.equals(event.getClass())) {
  18. appListener.onEvent(event);
  19. }
  20. }
  21. }
  22. }

案例(文件上传进度监听)

当文件上传的时候,用户希望获取文件上传的进度,这个需求就可以利用事件监听机制来实现

文件上传监听事件

记录了读取的文件大小和文件总大小

  1. @Data
  2. public class FileUploadEvent extends AppEvent {
  3. private int fileSize;
  4. private int readSize;
  5. public FileUploadEvent(int readSize, int fileSize) {
  6. this.readSize = readSize;
  7. this.fileSize = fileSize;
  8. }
  9. }

文件上传监听事件

打印文件上传进度

  1. public class FileUploadListener implements AppListener<FileUploadEvent> {
  2. @Override
  3. public void onEvent(FileUploadEvent event) {
  4. int readSize = event.getReadSize();
  5. int fileSize = event.getFileSize();
  6. DecimalFormat df = new DecimalFormat("#.00");
  7. System.out.println(readSize + "/" + fileSize + " => " + df.format(((double) readSize / fileSize) * 100) + "%");
  8. }
  9. }

文件上传过程监听事件

当每次读取完一次数据的时候,通过发布事件的形式,实现监听

  1. public class FileUtils {
  2. public static void write(InputStream in, OutputStream out) throws IOException {
  3. write(in, out, null);
  4. }
  5. public static void write(InputStream in, OutputStream out) throws IOException {
  6. BufferedInputStream bis = new BufferedInputStream(in);
  7. BufferedOutputStream bos = new BufferedOutputStream(out);
  8. int fileSize = in.available();
  9. byte[] buf = new byte[1024];
  10. int readSize = 0;
  11. int len;
  12. while ((len = bis.read(buf)) != -1) {
  13. bos.write(buf, 0, len);
  14. readSize += len;
  15. // 发布事件
  16. AppListenerManager.publishEvent(new FileUploadEvent(readSize, fileSize));
  17. }
  18. bis.close();
  19. bos.close();
  20. }
  21. public static void main(String[] args) throws IOException {
  22. AppListenerManager.addListener(new FileUploadListener());
  23. write(new FileInputStream(new File("D:\\1.jpg")), new FileOutputStream(new File("D:\\1-bak.jpg")));
  24. }
  25. }
  26. // 监听效果如下
  27. 1024/5255 => 19.49%
  28. 2048/5255 => 38.97%
  29. 3072/5255 => 58.46%
  30. 4096/5255 => 77.94%
  31. 5120/5255 => 97.43%
  32. 5255/5255 => 100.00%

源码流程分析

Spring 在进行进行 refresh() 的时候,会调用 initApplicationEventMulticaster() 来初始化事件管理器

创建事件管理器

  1. protected void initApplicationEventMulticaster() {
  2. ConfigurableListableBeanFactory beanFactory = getBeanFactory();
  3. // 如果容器中存在
  4. if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
  5. // 从容器中获取事件多播器,实现类是 SimpleApplicationEventMulticaster
  6. this.applicationEventMulticaster =
  7. beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
  8. if (logger.isTraceEnabled()) {
  9. logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
  10. }
  11. }
  12. // 如果容器中不存在(一开始初始化的时候,并不存在事件多播器)
  13. else {
  14. // ★ 多播器保存了 beanFactory 对象
  15. this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
  16. // ★ 将 applicationEventMulticaster 注册到容器中
  17. beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
  18. if (logger.isTraceEnabled()) {
  19. logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
  20. "[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
  21. }
  22. }
  23. }

注册实现 ApplicationListener 接口的 bean

此时是把实现了 ApplicationListener 接口的监听器加入到 Set<String> applicationListenerBeans = new LinkedHashSet<>(); 中去

  1. protected void registerListeners() {
  2. // Register statically specified listeners first.
  3. // 注册所有的监听器(
  4. // ★★★ 这里会在【第八次】调用后置处理器时
  5. // 调用 ApplicationListenerDetector 的 postProcessAfterInitialization 方法,加入到 applicationListeners 中
  6. for (ApplicationListener<?> listener : getApplicationListeners()) {
  7. // // ★★★ 加入到 SimpleApplicationEventMulticaster 对象的 applicationListeners 中
  8. getApplicationEventMulticaster().addApplicationListener(listener);
  9. }
  10. // Do not initialize FactoryBeans here: We need to leave all regular beans
  11. // uninitialized to let post-processors apply to them!
  12. // ★★★ 获取通过继承了 ApplicationListener 接口的监听器
  13. String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
  14. for (String listenerBeanName : listenerBeanNames) {
  15. // ★★★ 加入到 SimpleApplicationEventMulticaster 对象的 applicationListenerBeans 中
  16. getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
  17. }
  18. // Publish early application events now that we finally have a multicaster...
  19. Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
  20. this.earlyApplicationEvents = null;
  21. if (earlyEventsToProcess != null) {
  22. for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
  23. getApplicationEventMulticaster().multicastEvent(earlyEvent);
  24. }
  25. }
  26. }

调用后置处理器注册 ApplicationListener 接口的 bean

此时是调用 ApplicationListenerDetector 后置处理器的 postProcessAfterInitialization 方法,将实现了 ApplicationListener 接口的监听器注册到 Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>(); 中去

  1. @Override
  2. public Object postProcessAfterInitialization(Object bean, String beanName) {
  3. // ★ 判断是否时 ApplicationListener 类型
  4. if (bean instanceof ApplicationListener) {
  5. // potentially not detected as a listener by getBeanNamesForType retrieval
  6. Boolean flag = this.singletonNames.get(beanName);
  7. if (Boolean.TRUE.equals(flag)) {
  8. // singleton bean (top-level or inner): register on the fly
  9. // ★★★ 添加监听器
  10. this.applicationContext.addApplicationListener((ApplicationListener<?>) bean);
  11. }
  12. else if (Boolean.FALSE.equals(flag)) {
  13. if (logger.isWarnEnabled() && !this.applicationContext.containsBean(beanName)) {
  14. // inner bean with other scope - can't reliably process events
  15. logger.warn("Inner bean '" + beanName + "' implements ApplicationListener interface " +
  16. "but is not reachable for event multicasting by its containing ApplicationContext " +
  17. "because it does not have singleton scope. Only top-level listener beans are allowed " +
  18. "to be of non-singleton scope.");
  19. }
  20. this.singletonNames.remove(beanName);
  21. }
  22. }
  23. return bean;
  24. }

注意一下,此时有组播管理器中有两个集合保存了实现了 ApplicationListener 接口的 bean

  • Set applicationListenerBeans:只保存了 bean 的名字
  • Set> applicationListeners:保存了 bean 的实例

注册 @EventListener 注解的监听器

当 bean 的生命周期函数全部完成以后,开始注册含有 @EventListener 的 bean,这个步骤比较复杂
1、使用 EventListenerMethodProcessor 的 afterSingletonsInstantiated 方法进行解析
2、将其当前 @EventListener 包装成 ApplicationListenerMethodAdapter 对象的属性,这样方便进行调用
3、将 ApplicationListenerMethodAdapter 注册到 Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>(); 中去

  1. public ApplicationListenerMethodAdapter(String beanName, Class<?> targetClass, Method method) {
  2. this.beanName = beanName;
  3. this.method = BridgeMethodResolver.findBridgedMethod(method);
  4. this.targetMethod = (!Proxy.isProxyClass(targetClass) ?
  5. AopUtils.getMostSpecificMethod(method, targetClass) : this.method);
  6. this.methodKey = new AnnotatedElementKey(this.targetMethod, targetClass);
  7. EventListener ann = AnnotatedElementUtils.findMergedAnnotation(this.targetMethod, EventListener.class);
  8. this.declaredEventTypes = resolveDeclaredEventTypes(method, ann);
  9. this.condition = (ann != null ? ann.condition() : null);
  10. this.order = resolveOrder(this.targetMethod);
  11. }

所以

  • applicationListenerBeans:存储了实现了 ApplicationListener 接口的 bean
  • applicationListeners:存储了所有的 Listener

异步调用

默认情况下,事件通知,是基于主线程的,这样可能会造成线程阻塞,所以我们可以开启异步通知

注解开启方式

适用于使用 @EventListener 注解的监听器

  1. @Configuration
  2. @ComponentScan("org.wesoft.spring.event")
  3. @EnableAsync
  4. public class AppConfig {
  5. }
  6. @Component
  7. public class MyListenerB {
  8. @Async
  9. @EventListener
  10. public void listen(AEvent event) {
  11. System.out.println(Thread.currentThread().getName() + " MyListenerB: " + event);
  12. }
  13. }

接口继承的开启方式

适用于实现了 ApplicationListener 接口的监听器

  1. @Configuration
  2. @ComponentScan("org.wesoft.spring.event")
  3. public class AppConfig {
  4. // beanName 必须是 applicationEventMulticaster,才会被 Spring 所接收
  5. @Bean("applicationEventMulticaster")
  6. public ApplicationEventMulticaster applicationEventMulticaster(BeanFactory beanFactory, ThreadPoolTaskExecutor executor) {
  7. SimpleApplicationEventMulticaster applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
  8. applicationEventMulticaster.setTaskExecutor(executor);
  9. return applicationEventMulticaster;
  10. }
  11. @Bean
  12. public ThreadPoolTaskExecutor executor() {
  13. ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
  14. // 核心线程数
  15. taskExecutor.setCorePoolSize(5);
  16. // 最大线程数
  17. taskExecutor.setMaxPoolSize(10);
  18. // 队列大小
  19. taskExecutor.setQueueCapacity(100);
  20. // 线程池中的线程名称前缀
  21. taskExecutor.setThreadNamePrefix("async-task-");
  22. // 当 pool 已经达到 max size 的时候,如何处理新任务
  23. // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
  24. taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  25. // 初始化线程池
  26. taskExecutor.initialize();
  27. return taskExecutor;
  28. }
  29. }