spring 的 ApplicationListener 事件监听使用了观察者设计模式;
观察者设计模式需要以下几个角色:
- 事件
- 观察者
- 事件发布者
定义事件
// 实现 ApplicationEvent 接口, 定义事件
public class ConnectEventVO extends ApplicationEvent {
@Getter
private EventVO eventVO;
/**
* Create a new ApplicationEvent.
*
* @param source the object on which the event
*
initially occurred (never {@code null})
*/
public ConnectEventVO(Object source, EventVO eventVO) {
super(source);
this.eventVO = eventVO;
}
}
事件发布
@Resource
private ApplicationEventPublisher eventPublisher;
public void onWebsocketClose(NettyClient ws, int code, Integer tryNum) {
log.error("@DeribitMarketListener Channel Close!!!.code:{},tryNum:{}",
code, tryNum);
final EventVO eventVO = new EventVO(EXCHANGE_DERIBIT.getExchangeName(),
CLOSE.toString(), this);
eventPublisher.publishEvent(new ConnectEventVO(this, eventVO));
}
定义观察者
@Component("connectEventListener")
public class ConnectEventListener {
@Resource
private ConnectMonitor connectMonitor;
/**
* 处理连接 close 事件
*
* @param connectEventVO 事件
*/
@EventListener(condition = "#connectEventVO.eventVO.connectStatus == 'OPEN'")
public void handleOpenEvent(ConnectEventVO connectEventVO) {
log.info("@ConnectEventListener.handleOpenEvent eventVO:{}",
connectEventVO.getEventVO());
connectMonitor.resetConnectCount(connectEventVO.getEventVO());
}
/**
* 处理连接 open 事件
*
* @param connectEventVO 事件
*/
@EventListener(condition = "#connectEventVO.eventVO.connectStatus == 'CLOSE'")
public void handleCloseEvent(ConnectEventVO connectEventVO) {
EventVO eventVO = connectEventVO.getEventVO();
log.info("@ConnectEventListener.handleCloseEvent exchange:{}",
eventVO.getExchange());
connectMonitor.reconnect(eventVO);
}
}
源码相关
事件发布后,拿到所有事件监听者,匹配到后。回调监听者,如果有 async 注解,回调方法会异步执行;
// @see SimpleApplicationEventMulticaster
public void multicastEvent(final ApplicationEvent event,
@Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType :
resolveDefaultEventType(event));
for (final ApplicationListener<?> listener :
getApplicationListeners(event, type)) {
Executor executor = getTaskExecutor();
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}
其他
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EventListener {
/**
* Alias for {@link #classes}.
*/
@AliasFor("classes")
Class<?>[] value() default {};
/**
* The event classes that this listener handles.
* <p>If this attribute is specified with a single value, the
* annotated method may optionally accept a single parameter.
* However, if this attribute is specified with multiple values,
* the annotated method must <em>not</em> declare any parameters.
*/
@AliasFor("value")
Class<?>[] classes() default {};
/**
* Spring Expression Language (SpEL) attribute used for making the
* event handling conditional.
* <p>Default is {@code ""}, meaning the event is always handled.
* <p>The SpEL expression evaluates against a dedicated context that
* provides the following meta-data:
* <ul>
* <li>{@code #root.event}, {@code #root.args} for
* references to the {@link ApplicationEvent} and method arguments
* respectively.</li>
* <li>Method arguments can be accessed by index. For instance the
* first argument can be accessed via {@code #root.args[0]}, {@code #p0}
* or {@code #a0}. Arguments can also be accessed by name if that
* information is available.</li>
* </ul>
*/
String condition() default "";
}
@EventListener 的 condition 是支持 SpEL 表达式的,这就会很灵活。
@Component
public class FooPredicate implements Predicate<FooEvent> {
public boolean test(FooEvent event) {...}
}
// 可以调用 spring bean 的某个方法,作为前置条件
@EventListener(condition="@fooPredicate.test(#event)")
public void handle(FooEvent event) {
System.out.println();
}
TransactionalEventListener
使用这个listener 可以在事务提交后,做一些定制化的逻辑