前言
最近在看线程池,就全局搜了之前公司代码,关键字 Executors
,一下子就看见了在使用guava
的AysncEventBus
,在我使用的感觉来就是单个jvm
里面的异步执行框架。它并没有每次都让我们显式让我们直接调用线程池的执行任务,而是帮助我们有封装了一层。
简单案例
package org.example.eventbus;
/**
* @author huskyui
*/
public class EventBusDemo {
public static void main(String[] args) {
EventHandler eventHandler = new EventHandler();
EventBusRegisterCenter.register(eventHandler);
for (int i = 0; i < 10; i++) {
EventBusRegisterCenter.post(i + "");
}
for (int i = 0; i < 5; i++) {
EventBusRegisterCenter.post(i);
}
}
}
package org.example.eventbus;
import com.google.common.eventbus.AsyncEventBus;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
/**
* @author huskyui
*/
@Slf4j
public class EventBusRegisterCenter {
private static AsyncEventBus eventBus = new AsyncEventBus(Executors.newCachedThreadPool());
public static void register(Object object) {
System.out.println("register" + object);
eventBus.register(object);
}
public static void unregister(Object obj) {
eventBus.unregister(obj);
}
public static void post(Object event) {
log.info("post event "+event);
eventBus.post(event);
}
}
package org.example.eventbus;
import com.google.common.eventbus.DeadEvent;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
/**
* @author huskyui
*/
@Slf4j
public class EventHandler {
@Subscribe
public void handlerString(String msg){
log.info("处理string类型数据 {}",msg);
}
@Subscribe
public void handlerDeadEvent(DeadEvent deadEvent){
log.info("dead event {}",deadEvent.getEvent());
}
}
解析
构造函数
private static AsyncEventBus eventBus = new AsyncEventBus(Executors.newCachedThreadPool());
主要是将我们自定义的线程池赋值给EventBus成员变量
private final Executor executor;
EventBus(String identifier, Executor executor, Dispatcher dispatcher,
SubscriberExceptionHandler exceptionHandler) {
this.identifier = checkNotNull(identifier);
this.executor = checkNotNull(executor);
this.dispatcher = checkNotNull(dispatcher);
this.exceptionHandler = checkNotNull(exceptionHandler);
}
subscribe
EventBusRegisterCenter.register(eventHandler);
通过反射,遍历对应类中方法标注了@Subscribe注解方法,并加入到一个guava实现的map中,key是一个对象,处理特定任务,value是对应方法的相关信息
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
/**
* The event bus this registry belongs to.
*/
@Weak private final EventBus bus;
SubscriberRegistry(EventBus bus) {
this.bus = checkNotNull(bus);
}
/**
* Registers all subscriber methods on the given listener object.
*/
void register(Object listener) {
// 当前类 @Subscribe注解的method 列表
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
// method的parameter ,AysncEventBus必须是方法是单个Object,你可以将多个参数封装到一个类里面,然后使用这个封装类
Class<?> eventType = entry.getKey();
// method相关信息,以便后续调用反射
Collection<Subscriber> eventMethodsInListener = entry.getValue();
// cow 好像是获取快照
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
eventSubscribers = MoreObjects.firstNonNull(
// 如果为空,新建一个cow set加入map中
subscribers.putIfAbsent(eventType, newSet), newSet);
}
// 不为空,执行在cow set中加入
eventSubscribers.addAll(eventMethodsInListener);
}
}
post
EventBusRegisterCenter.post(i + "");
根据传递给这个参数类型,在subscribers
里面找到对应的处理类,遍历执行,可能有多个subscribe同一个类型的
public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
// 直接将迭代器传过去
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
// queue的类型是ConcurrentLinkedQueue,主要是保存任务的
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
// 开始进入和线程池相关的操作
e.subscriber.dispatchEvent(e.event);
}
}
// 这边其实是新建一个Runnable,提交给我们刚开始创建自定义线程池
final void dispatchEvent(final Object event) {
// 线程池execute 无返回值任务
executor.execute(new Runnable() {
@Override
public void run() {
try {
// 反射形式调用方法
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
/**
* Invokes the subscriber method. This method can be overridden to make the invocation
* synchronized.
*/
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
// target是实例,method是反射相关的
// 这里也验证了之前说的,只能是一个参数
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
AsyncEventBus和Spring结合使用
先举个例子
@Component
class ServiceA{
@Autowried
private ServiceB serviceB;
@Subscribe
public void handle(CouponEvent event) {
serviceB.do(event);
}
}
使用几乎一样。但是其中有一个需要考虑,我们处理方法的时候,应该会调用一些Spring容器里面的SpringBean
这里就会牵扯到SpringBean的生命周期
我们如果直接写 eventBusCenter.register(new ServiceA())
我们就会发现,ServiceA里面的变量serviceB没有赋值
我们需要在properties set之后,进行注入
这边牵扯到@PostConstruct
注解,这个注解会在bean的成员变量加载后,执行该方法
所以最终的大致样子
@Component
class ServiceA{
@Autowried
private ServiceB serviceB;
@Subscribe
public void handle(CouponEvent event) {
serviceB.do(event);
}
@PostConstruct
public void register() {
AsyncEventBusCenter.register(this);
}
}
public class AsyncEventBusCenter {
private final static AsyncEventBus ASYNC_EVENT_BUS = new AsyncEventBus(Executors.newCachedThreadPool());
public static void register(Object handler) {
ASYNC_EVENT_BUS.register(handler);
}
}