前言

最近在看线程池,就全局搜了之前公司代码,关键字 Executors,一下子就看见了在使用guavaAysncEventBus,在我使用的感觉来就是单个jvm里面的异步执行框架。它并没有每次都让我们显式让我们直接调用线程池的执行任务,而是帮助我们有封装了一层。

简单案例

  1. package org.example.eventbus;
  2. /**
  3. * @author huskyui
  4. */
  5. public class EventBusDemo {
  6. public static void main(String[] args) {
  7. EventHandler eventHandler = new EventHandler();
  8. EventBusRegisterCenter.register(eventHandler);
  9. for (int i = 0; i < 10; i++) {
  10. EventBusRegisterCenter.post(i + "");
  11. }
  12. for (int i = 0; i < 5; i++) {
  13. EventBusRegisterCenter.post(i);
  14. }
  15. }
  16. }
  17. package org.example.eventbus;
  18. import com.google.common.eventbus.AsyncEventBus;
  19. import lombok.extern.slf4j.Slf4j;
  20. import java.util.concurrent.Executors;
  21. /**
  22. * @author huskyui
  23. */
  24. @Slf4j
  25. public class EventBusRegisterCenter {
  26. private static AsyncEventBus eventBus = new AsyncEventBus(Executors.newCachedThreadPool());
  27. public static void register(Object object) {
  28. System.out.println("register" + object);
  29. eventBus.register(object);
  30. }
  31. public static void unregister(Object obj) {
  32. eventBus.unregister(obj);
  33. }
  34. public static void post(Object event) {
  35. log.info("post event "+event);
  36. eventBus.post(event);
  37. }
  38. }
  39. package org.example.eventbus;
  40. import com.google.common.eventbus.DeadEvent;
  41. import com.google.common.eventbus.Subscribe;
  42. import lombok.extern.slf4j.Slf4j;
  43. /**
  44. * @author huskyui
  45. */
  46. @Slf4j
  47. public class EventHandler {
  48. @Subscribe
  49. public void handlerString(String msg){
  50. log.info("处理string类型数据 {}",msg);
  51. }
  52. @Subscribe
  53. public void handlerDeadEvent(DeadEvent deadEvent){
  54. log.info("dead event {}",deadEvent.getEvent());
  55. }
  56. }

解析

构造函数

  1. private static AsyncEventBus eventBus = new AsyncEventBus(Executors.newCachedThreadPool());

主要是将我们自定义的线程池赋值给EventBus成员变量

  1. private final Executor executor;
  2. EventBus(String identifier, Executor executor, Dispatcher dispatcher,
  3. SubscriberExceptionHandler exceptionHandler) {
  4. this.identifier = checkNotNull(identifier);
  5. this.executor = checkNotNull(executor);
  6. this.dispatcher = checkNotNull(dispatcher);
  7. this.exceptionHandler = checkNotNull(exceptionHandler);
  8. }

subscribe

  1. EventBusRegisterCenter.register(eventHandler);

通过反射,遍历对应类中方法标注了@Subscribe注解方法,并加入到一个guava实现的map中,key是一个对象,处理特定任务,value是对应方法的相关信息

  1. private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
  2. Maps.newConcurrentMap();
  3. /**
  4. * The event bus this registry belongs to.
  5. */
  6. @Weak private final EventBus bus;
  7. SubscriberRegistry(EventBus bus) {
  8. this.bus = checkNotNull(bus);
  9. }
  10. /**
  11. * Registers all subscriber methods on the given listener object.
  12. */
  13. void register(Object listener) {
  14. // 当前类 @Subscribe注解的method 列表
  15. Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
  16. for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
  17. // method的parameter ,AysncEventBus必须是方法是单个Object,你可以将多个参数封装到一个类里面,然后使用这个封装类
  18. Class<?> eventType = entry.getKey();
  19. // method相关信息,以便后续调用反射
  20. Collection<Subscriber> eventMethodsInListener = entry.getValue();
  21. // cow 好像是获取快照
  22. CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
  23. if (eventSubscribers == null) {
  24. CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
  25. eventSubscribers = MoreObjects.firstNonNull(
  26. // 如果为空,新建一个cow set加入map中
  27. subscribers.putIfAbsent(eventType, newSet), newSet);
  28. }
  29. // 不为空,执行在cow set中加入
  30. eventSubscribers.addAll(eventMethodsInListener);
  31. }
  32. }

post

  1. EventBusRegisterCenter.post(i + "");

根据传递给这个参数类型,在subscribers里面找到对应的处理类,遍历执行,可能有多个subscribe同一个类型的

  1. public void post(Object event) {
  2. Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
  3. if (eventSubscribers.hasNext()) {
  4. // 直接将迭代器传过去
  5. dispatcher.dispatch(event, eventSubscribers);
  6. } else if (!(event instanceof DeadEvent)) {
  7. // the event had no subscribers and was not itself a DeadEvent
  8. post(new DeadEvent(this, event));
  9. }
  10. }
  11. @Override
  12. void dispatch(Object event, Iterator<Subscriber> subscribers) {
  13. checkNotNull(event);
  14. // queue的类型是ConcurrentLinkedQueue,主要是保存任务的
  15. while (subscribers.hasNext()) {
  16. queue.add(new EventWithSubscriber(event, subscribers.next()));
  17. }
  18. EventWithSubscriber e;
  19. while ((e = queue.poll()) != null) {
  20. // 开始进入和线程池相关的操作
  21. e.subscriber.dispatchEvent(e.event);
  22. }
  23. }
  24. // 这边其实是新建一个Runnable,提交给我们刚开始创建自定义线程池
  25. final void dispatchEvent(final Object event) {
  26. // 线程池execute 无返回值任务
  27. executor.execute(new Runnable() {
  28. @Override
  29. public void run() {
  30. try {
  31. // 反射形式调用方法
  32. invokeSubscriberMethod(event);
  33. } catch (InvocationTargetException e) {
  34. bus.handleSubscriberException(e.getCause(), context(event));
  35. }
  36. }
  37. });
  38. }
  39. /**
  40. * Invokes the subscriber method. This method can be overridden to make the invocation
  41. * synchronized.
  42. */
  43. @VisibleForTesting
  44. void invokeSubscriberMethod(Object event) throws InvocationTargetException {
  45. try {
  46. // target是实例,method是反射相关的
  47. // 这里也验证了之前说的,只能是一个参数
  48. method.invoke(target, checkNotNull(event));
  49. } catch (IllegalArgumentException e) {
  50. throw new Error("Method rejected target/argument: " + event, e);
  51. } catch (IllegalAccessException e) {
  52. throw new Error("Method became inaccessible: " + event, e);
  53. } catch (InvocationTargetException e) {
  54. if (e.getCause() instanceof Error) {
  55. throw (Error) e.getCause();
  56. }
  57. throw e;
  58. }
  59. }

AsyncEventBus和Spring结合使用

先举个例子

  1. @Component
  2. class ServiceA{
  3. @Autowried
  4. private ServiceB serviceB;
  5. @Subscribe
  6. public void handle(CouponEvent event) {
  7. serviceB.do(event);
  8. }
  9. }

使用几乎一样。但是其中有一个需要考虑,我们处理方法的时候,应该会调用一些Spring容器里面的SpringBean
这里就会牵扯到SpringBean的生命周期
我们如果直接写 eventBusCenter.register(new ServiceA())我们就会发现,ServiceA里面的变量serviceB没有赋值
我们需要在properties set之后,进行注入
这边牵扯到@PostConstruct注解,这个注解会在bean的成员变量加载后,执行该方法

所以最终的大致样子

  1. @Component
  2. class ServiceA{
  3. @Autowried
  4. private ServiceB serviceB;
  5. @Subscribe
  6. public void handle(CouponEvent event) {
  7. serviceB.do(event);
  8. }
  9. @PostConstruct
  10. public void register() {
  11. AsyncEventBusCenter.register(this);
  12. }
  13. }
  14. public class AsyncEventBusCenter {
  15. private final static AsyncEventBus ASYNC_EVENT_BUS = new AsyncEventBus(Executors.newCachedThreadPool());
  16. public static void register(Object handler) {
  17. ASYNC_EVENT_BUS.register(handler);
  18. }
  19. }