Spring 通过事件监听机制,来发布某些事件,让监听者获取消息

Spring 中监听器的使用
可以自定义感兴趣的事件,继承 ApplicationContextEvent 类即可,然后在需要的地方发布该事件,再通过一个监听器监听该事件即可 context.publishEvent(ApplicationEvent event);
方式一:使用注解实现(推荐)
一个类可以监听多个事件
@Componentpublic class ApplicationEventListener {@EventListenerpublic void listenContextRefreshedEvent(ContextRefreshedEvent event) {ApplicationContext context = (ApplicationContext) event.getSource();System.out.println("context = " + context);}}
方式二:使用接口实现
缺点:一个类只能监听一个事件
@Componentpublic class ApplicationEventListener implements ApplicationListener<ContextRefreshedEvent> {@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {ApplicationContext context = (ApplicationContext) event.getSource();System.out.println("context = " + context);}}
方式三:使用 spring.factories
在 META-INF/spring.factories 中定义 Listener 组件,由 Spring 容器自动扫描,此种方式类似与接口实现方式,只是不需要添加 @Component 注解
# Application Listenersorg.springframework.context.ApplicationListener=\org.springframework.boot.autoconfigure.BackgroundPreinitializer
方式四:使用 @Bean 的方式
@Beanpublic LoginListener loginListener() {return new LoginListener();}
方式五:使用 ApplicationContext 直接添加
ac.addApplicationListener(new LoginListener());
方式六:Spring Boot 中使用 yaml 文件配置
context:listener:classes: com.sourceflag.spring.event.listener.LoginListener
监听过程模拟
那么 Spring 中的事件监听是如何实现的呢?我们简单的模拟一下 Spring 的事件监听机制的实现
事件
如果需要定义某些事件,必须继承该 AppEvent
public class AppEvent {}
监听者
监听者中,有一个方法 onEvent,接收一个感兴趣的监听事件,当有该事件发生的时候,系统会自动调用 onEvent 方法
public interface AppListener<E extends AppEvent> {void onEvent(E event);}
监听管理器
监听管理器用于记录所有监听者和发布事件,当监听者所监听的事件与此刻正在发布的事件相符的时候,就会推送给监听者
public class AppListenerManager {// 保存所有监听器static List<AppListener<?>> listeners = new ArrayList<>();// 添加监听器public static void addListener(AppListener<? extends AppEvent> appListener) {// 可以通过扫描项目,自动添加监听器listeners.add(appListener);}// 发布事件public static void publishEvent(AppEvent event) {for (AppListener appListener : listeners) {// 获取泛型ParameterizedType parameterizedType = (ParameterizedType) (appListener.getClass().getGenericInterfaces()[0]);Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();Class clazz = (Class) actualTypeArguments[0];// 检查类型是否匹配if (clazz.equals(event.getClass())) {appListener.onEvent(event);}}}}
案例(文件上传进度监听)
当文件上传的时候,用户希望获取文件上传的进度,这个需求就可以利用事件监听机制来实现
文件上传监听事件
记录了读取的文件大小和文件总大小
@Datapublic class FileUploadEvent extends AppEvent {private int fileSize;private int readSize;public FileUploadEvent(int readSize, int fileSize) {this.readSize = readSize;this.fileSize = fileSize;}}
文件上传监听事件
打印文件上传进度
public class FileUploadListener implements AppListener<FileUploadEvent> {@Overridepublic void onEvent(FileUploadEvent event) {int readSize = event.getReadSize();int fileSize = event.getFileSize();DecimalFormat df = new DecimalFormat("#.00");System.out.println(readSize + "/" + fileSize + " => " + df.format(((double) readSize / fileSize) * 100) + "%");}}
文件上传过程监听事件
当每次读取完一次数据的时候,通过发布事件的形式,实现监听
public class FileUtils {public static void write(InputStream in, OutputStream out) throws IOException {write(in, out, null);}public static void write(InputStream in, OutputStream out) throws IOException {BufferedInputStream bis = new BufferedInputStream(in);BufferedOutputStream bos = new BufferedOutputStream(out);int fileSize = in.available();byte[] buf = new byte[1024];int readSize = 0;int len;while ((len = bis.read(buf)) != -1) {bos.write(buf, 0, len);readSize += len;// 发布事件AppListenerManager.publishEvent(new FileUploadEvent(readSize, fileSize));}bis.close();bos.close();}public static void main(String[] args) throws IOException {AppListenerManager.addListener(new FileUploadListener());write(new FileInputStream(new File("D:\\1.jpg")), new FileOutputStream(new File("D:\\1-bak.jpg")));}}// 监听效果如下1024/5255 => 19.49%2048/5255 => 38.97%3072/5255 => 58.46%4096/5255 => 77.94%5120/5255 => 97.43%5255/5255 => 100.00%
源码流程分析
Spring 在进行进行 refresh() 的时候,会调用 initApplicationEventMulticaster() 来初始化事件管理器
创建事件管理器
protected void initApplicationEventMulticaster() {ConfigurableListableBeanFactory beanFactory = getBeanFactory();// 如果容器中存在if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {// 从容器中获取事件多播器,实现类是 SimpleApplicationEventMulticasterthis.applicationEventMulticaster =beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);if (logger.isTraceEnabled()) {logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");}}// 如果容器中不存在(一开始初始化的时候,并不存在事件多播器)else {// ★ 多播器保存了 beanFactory 对象this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);// ★ 将 applicationEventMulticaster 注册到容器中beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);if (logger.isTraceEnabled()) {logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");}}}
注册实现 ApplicationListener 接口的 bean
此时是把实现了 ApplicationListener 接口的监听器加入到 Set<String> applicationListenerBeans = new LinkedHashSet<>(); 中去
protected void registerListeners() {// Register statically specified listeners first.// 注册所有的监听器(// ★★★ 这里会在【第八次】调用后置处理器时// 调用 ApplicationListenerDetector 的 postProcessAfterInitialization 方法,加入到 applicationListeners 中for (ApplicationListener<?> listener : getApplicationListeners()) {// // ★★★ 加入到 SimpleApplicationEventMulticaster 对象的 applicationListeners 中getApplicationEventMulticaster().addApplicationListener(listener);}// Do not initialize FactoryBeans here: We need to leave all regular beans// uninitialized to let post-processors apply to them!// ★★★ 获取通过继承了 ApplicationListener 接口的监听器String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);for (String listenerBeanName : listenerBeanNames) {// ★★★ 加入到 SimpleApplicationEventMulticaster 对象的 applicationListenerBeans 中getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);}// Publish early application events now that we finally have a multicaster...Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;this.earlyApplicationEvents = null;if (earlyEventsToProcess != null) {for (ApplicationEvent earlyEvent : earlyEventsToProcess) {getApplicationEventMulticaster().multicastEvent(earlyEvent);}}}
调用后置处理器注册 ApplicationListener 接口的 bean
此时是调用 ApplicationListenerDetector 后置处理器的 postProcessAfterInitialization 方法,将实现了 ApplicationListener 接口的监听器注册到 Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>(); 中去
@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) {// ★ 判断是否时 ApplicationListener 类型if (bean instanceof ApplicationListener) {// potentially not detected as a listener by getBeanNamesForType retrievalBoolean flag = this.singletonNames.get(beanName);if (Boolean.TRUE.equals(flag)) {// singleton bean (top-level or inner): register on the fly// ★★★ 添加监听器this.applicationContext.addApplicationListener((ApplicationListener<?>) bean);}else if (Boolean.FALSE.equals(flag)) {if (logger.isWarnEnabled() && !this.applicationContext.containsBean(beanName)) {// inner bean with other scope - can't reliably process eventslogger.warn("Inner bean '" + beanName + "' implements ApplicationListener interface " +"but is not reachable for event multicasting by its containing ApplicationContext " +"because it does not have singleton scope. Only top-level listener beans are allowed " +"to be of non-singleton scope.");}this.singletonNames.remove(beanName);}}return bean;}
注意一下,此时有组播管理器中有两个集合保存了实现了 ApplicationListener 接口的 bean
- Set
applicationListenerBeans:只保存了 bean 的名字 - Set
> applicationListeners:保存了 bean 的实例
注册 @EventListener 注解的监听器
当 bean 的生命周期函数全部完成以后,开始注册含有 @EventListener 的 bean,这个步骤比较复杂
1、使用 EventListenerMethodProcessor 的 afterSingletonsInstantiated 方法进行解析
2、将其当前 @EventListener 包装成 ApplicationListenerMethodAdapter 对象的属性,这样方便进行调用
3、将 ApplicationListenerMethodAdapter 注册到 Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>(); 中去
public ApplicationListenerMethodAdapter(String beanName, Class<?> targetClass, Method method) {this.beanName = beanName;this.method = BridgeMethodResolver.findBridgedMethod(method);this.targetMethod = (!Proxy.isProxyClass(targetClass) ?AopUtils.getMostSpecificMethod(method, targetClass) : this.method);this.methodKey = new AnnotatedElementKey(this.targetMethod, targetClass);EventListener ann = AnnotatedElementUtils.findMergedAnnotation(this.targetMethod, EventListener.class);this.declaredEventTypes = resolveDeclaredEventTypes(method, ann);this.condition = (ann != null ? ann.condition() : null);this.order = resolveOrder(this.targetMethod);}
所以
- applicationListenerBeans:存储了实现了 ApplicationListener 接口的 bean
- applicationListeners:存储了所有的 Listener
异步调用
默认情况下,事件通知,是基于主线程的,这样可能会造成线程阻塞,所以我们可以开启异步通知
注解开启方式
适用于使用 @EventListener 注解的监听器
@Configuration@ComponentScan("org.wesoft.spring.event")@EnableAsyncpublic class AppConfig {}@Componentpublic class MyListenerB {@Async@EventListenerpublic void listen(AEvent event) {System.out.println(Thread.currentThread().getName() + " MyListenerB: " + event);}}
接口继承的开启方式
适用于实现了 ApplicationListener 接口的监听器
@Configuration@ComponentScan("org.wesoft.spring.event")public class AppConfig {// beanName 必须是 applicationEventMulticaster,才会被 Spring 所接收@Bean("applicationEventMulticaster")public ApplicationEventMulticaster applicationEventMulticaster(BeanFactory beanFactory, ThreadPoolTaskExecutor executor) {SimpleApplicationEventMulticaster applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);applicationEventMulticaster.setTaskExecutor(executor);return applicationEventMulticaster;}@Beanpublic ThreadPoolTaskExecutor executor() {ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();// 核心线程数taskExecutor.setCorePoolSize(5);// 最大线程数taskExecutor.setMaxPoolSize(10);// 队列大小taskExecutor.setQueueCapacity(100);// 线程池中的线程名称前缀taskExecutor.setThreadNamePrefix("async-task-");// 当 pool 已经达到 max size 的时候,如何处理新任务// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 初始化线程池taskExecutor.initialize();return taskExecutor;}}
