Spring 通过事件监听机制,来发布某些事件,让监听者获取消息
Spring 中监听器的使用
可以自定义感兴趣的事件,继承 ApplicationContextEvent 类即可,然后在需要的地方发布该事件,再通过一个监听器监听该事件即可 context.publishEvent(ApplicationEvent event);
方式一:使用注解实现(推荐)
一个类可以监听多个事件
@Component
public class ApplicationEventListener {
@EventListener
public void listenContextRefreshedEvent(ContextRefreshedEvent event) {
ApplicationContext context = (ApplicationContext) event.getSource();
System.out.println("context = " + context);
}
}
方式二:使用接口实现
缺点:一个类只能监听一个事件
@Component
public class ApplicationEventListener implements ApplicationListener<ContextRefreshedEvent> {
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
ApplicationContext context = (ApplicationContext) event.getSource();
System.out.println("context = " + context);
}
}
方式三:使用 spring.factories
在 META-INF/spring.factories 中定义 Listener 组件,由 Spring 容器自动扫描,此种方式类似与接口实现方式,只是不需要添加 @Component 注解
# Application Listeners
org.springframework.context.ApplicationListener=\
org.springframework.boot.autoconfigure.BackgroundPreinitializer
方式四:使用 @Bean 的方式
@Bean
public LoginListener loginListener() {
return new LoginListener();
}
方式五:使用 ApplicationContext 直接添加
ac.addApplicationListener(new LoginListener());
方式六:Spring Boot 中使用 yaml 文件配置
context:
listener:
classes: com.sourceflag.spring.event.listener.LoginListener
监听过程模拟
那么 Spring 中的事件监听是如何实现的呢?我们简单的模拟一下 Spring 的事件监听机制的实现
事件
如果需要定义某些事件,必须继承该 AppEvent
public class AppEvent {
}
监听者
监听者中,有一个方法 onEvent,接收一个感兴趣的监听事件,当有该事件发生的时候,系统会自动调用 onEvent 方法
public interface AppListener<E extends AppEvent> {
void onEvent(E event);
}
监听管理器
监听管理器用于记录所有监听者和发布事件,当监听者所监听的事件与此刻正在发布的事件相符的时候,就会推送给监听者
public class AppListenerManager {
// 保存所有监听器
static List<AppListener<?>> listeners = new ArrayList<>();
// 添加监听器
public static void addListener(AppListener<? extends AppEvent> appListener) {
// 可以通过扫描项目,自动添加监听器
listeners.add(appListener);
}
// 发布事件
public static void publishEvent(AppEvent event) {
for (AppListener appListener : listeners) {
// 获取泛型
ParameterizedType parameterizedType = (ParameterizedType) (appListener.getClass().getGenericInterfaces()[0]);
Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
Class clazz = (Class) actualTypeArguments[0];
// 检查类型是否匹配
if (clazz.equals(event.getClass())) {
appListener.onEvent(event);
}
}
}
}
案例(文件上传进度监听)
当文件上传的时候,用户希望获取文件上传的进度,这个需求就可以利用事件监听机制来实现
文件上传监听事件
记录了读取的文件大小和文件总大小
@Data
public class FileUploadEvent extends AppEvent {
private int fileSize;
private int readSize;
public FileUploadEvent(int readSize, int fileSize) {
this.readSize = readSize;
this.fileSize = fileSize;
}
}
文件上传监听事件
打印文件上传进度
public class FileUploadListener implements AppListener<FileUploadEvent> {
@Override
public void onEvent(FileUploadEvent event) {
int readSize = event.getReadSize();
int fileSize = event.getFileSize();
DecimalFormat df = new DecimalFormat("#.00");
System.out.println(readSize + "/" + fileSize + " => " + df.format(((double) readSize / fileSize) * 100) + "%");
}
}
文件上传过程监听事件
当每次读取完一次数据的时候,通过发布事件的形式,实现监听
public class FileUtils {
public static void write(InputStream in, OutputStream out) throws IOException {
write(in, out, null);
}
public static void write(InputStream in, OutputStream out) throws IOException {
BufferedInputStream bis = new BufferedInputStream(in);
BufferedOutputStream bos = new BufferedOutputStream(out);
int fileSize = in.available();
byte[] buf = new byte[1024];
int readSize = 0;
int len;
while ((len = bis.read(buf)) != -1) {
bos.write(buf, 0, len);
readSize += len;
// 发布事件
AppListenerManager.publishEvent(new FileUploadEvent(readSize, fileSize));
}
bis.close();
bos.close();
}
public static void main(String[] args) throws IOException {
AppListenerManager.addListener(new FileUploadListener());
write(new FileInputStream(new File("D:\\1.jpg")), new FileOutputStream(new File("D:\\1-bak.jpg")));
}
}
// 监听效果如下
1024/5255 => 19.49%
2048/5255 => 38.97%
3072/5255 => 58.46%
4096/5255 => 77.94%
5120/5255 => 97.43%
5255/5255 => 100.00%
源码流程分析
Spring 在进行进行 refresh() 的时候,会调用 initApplicationEventMulticaster() 来初始化事件管理器
创建事件管理器
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
// 如果容器中存在
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
// 从容器中获取事件多播器,实现类是 SimpleApplicationEventMulticaster
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isTraceEnabled()) {
logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
// 如果容器中不存在(一开始初始化的时候,并不存在事件多播器)
else {
// ★ 多播器保存了 beanFactory 对象
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
// ★ 将 applicationEventMulticaster 注册到容器中
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isTraceEnabled()) {
logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
}
}
}
注册实现 ApplicationListener 接口的 bean
此时是把实现了 ApplicationListener 接口的监听器加入到 Set<String> applicationListenerBeans = new LinkedHashSet<>();
中去
protected void registerListeners() {
// Register statically specified listeners first.
// 注册所有的监听器(
// ★★★ 这里会在【第八次】调用后置处理器时
// 调用 ApplicationListenerDetector 的 postProcessAfterInitialization 方法,加入到 applicationListeners 中
for (ApplicationListener<?> listener : getApplicationListeners()) {
// // ★★★ 加入到 SimpleApplicationEventMulticaster 对象的 applicationListeners 中
getApplicationEventMulticaster().addApplicationListener(listener);
}
// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let post-processors apply to them!
// ★★★ 获取通过继承了 ApplicationListener 接口的监听器
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
// ★★★ 加入到 SimpleApplicationEventMulticaster 对象的 applicationListenerBeans 中
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
// Publish early application events now that we finally have a multicaster...
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if (earlyEventsToProcess != null) {
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
调用后置处理器注册 ApplicationListener 接口的 bean
此时是调用 ApplicationListenerDetector 后置处理器的 postProcessAfterInitialization 方法,将实现了 ApplicationListener 接口的监听器注册到 Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>();
中去
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
// ★ 判断是否时 ApplicationListener 类型
if (bean instanceof ApplicationListener) {
// potentially not detected as a listener by getBeanNamesForType retrieval
Boolean flag = this.singletonNames.get(beanName);
if (Boolean.TRUE.equals(flag)) {
// singleton bean (top-level or inner): register on the fly
// ★★★ 添加监听器
this.applicationContext.addApplicationListener((ApplicationListener<?>) bean);
}
else if (Boolean.FALSE.equals(flag)) {
if (logger.isWarnEnabled() && !this.applicationContext.containsBean(beanName)) {
// inner bean with other scope - can't reliably process events
logger.warn("Inner bean '" + beanName + "' implements ApplicationListener interface " +
"but is not reachable for event multicasting by its containing ApplicationContext " +
"because it does not have singleton scope. Only top-level listener beans are allowed " +
"to be of non-singleton scope.");
}
this.singletonNames.remove(beanName);
}
}
return bean;
}
注意一下,此时有组播管理器中有两个集合保存了实现了 ApplicationListener 接口的 bean
- Set
applicationListenerBeans:只保存了 bean 的名字 - Set
> applicationListeners:保存了 bean 的实例
注册 @EventListener 注解的监听器
当 bean 的生命周期函数全部完成以后,开始注册含有 @EventListener 的 bean,这个步骤比较复杂
1、使用 EventListenerMethodProcessor 的 afterSingletonsInstantiated 方法进行解析
2、将其当前 @EventListener 包装成 ApplicationListenerMethodAdapter 对象的属性,这样方便进行调用
3、将 ApplicationListenerMethodAdapter 注册到 Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>();
中去
public ApplicationListenerMethodAdapter(String beanName, Class<?> targetClass, Method method) {
this.beanName = beanName;
this.method = BridgeMethodResolver.findBridgedMethod(method);
this.targetMethod = (!Proxy.isProxyClass(targetClass) ?
AopUtils.getMostSpecificMethod(method, targetClass) : this.method);
this.methodKey = new AnnotatedElementKey(this.targetMethod, targetClass);
EventListener ann = AnnotatedElementUtils.findMergedAnnotation(this.targetMethod, EventListener.class);
this.declaredEventTypes = resolveDeclaredEventTypes(method, ann);
this.condition = (ann != null ? ann.condition() : null);
this.order = resolveOrder(this.targetMethod);
}
所以
- applicationListenerBeans:存储了实现了 ApplicationListener 接口的 bean
- applicationListeners:存储了所有的 Listener
异步调用
默认情况下,事件通知,是基于主线程的,这样可能会造成线程阻塞,所以我们可以开启异步通知
注解开启方式
适用于使用 @EventListener 注解的监听器
@Configuration
@ComponentScan("org.wesoft.spring.event")
@EnableAsync
public class AppConfig {
}
@Component
public class MyListenerB {
@Async
@EventListener
public void listen(AEvent event) {
System.out.println(Thread.currentThread().getName() + " MyListenerB: " + event);
}
}
接口继承的开启方式
适用于实现了 ApplicationListener 接口的监听器
@Configuration
@ComponentScan("org.wesoft.spring.event")
public class AppConfig {
// beanName 必须是 applicationEventMulticaster,才会被 Spring 所接收
@Bean("applicationEventMulticaster")
public ApplicationEventMulticaster applicationEventMulticaster(BeanFactory beanFactory, ThreadPoolTaskExecutor executor) {
SimpleApplicationEventMulticaster applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
applicationEventMulticaster.setTaskExecutor(executor);
return applicationEventMulticaster;
}
@Bean
public ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// 核心线程数
taskExecutor.setCorePoolSize(5);
// 最大线程数
taskExecutor.setMaxPoolSize(10);
// 队列大小
taskExecutor.setQueueCapacity(100);
// 线程池中的线程名称前缀
taskExecutor.setThreadNamePrefix("async-task-");
// 当 pool 已经达到 max size 的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化线程池
taskExecutor.initialize();
return taskExecutor;
}
}