相信每一位读者都有使用过消息中间件的经历, 比如Apache ActiveMQ和Apache Kafka等, 某subscriber在消息中间件上注册了某个topic(主题) , 当有消息发送到了该topic上之后, 注册在该topic上的所有subscriber都将会收到消息, 如图所示。
1 Event Bus设计
Event Bus的设计稍微复杂一些, 所涉及的类比较多(10个左右) , 图是类/接口之间的类关系图。
- Bus接口对外提供了几种主要的使用方式,比如post方法用来发送Event,register方法用来注册Event接收者(Subscriber) 接受响应事件,Event Bus采用同步的方式推送Event,AsyncEventBus采用异步的方式(Thread-Per-Message) 推送Event。
- Registry注册表, 主要用来记录对应的Subscriber以及受理消息的回调方法, 回调方法我们用注解@Subscribe来标识。
- Dispatcher主要用来将event广播给注册表中监听了topic的Subscriber。
1.1 Bus接口详解
Bus接口相关代码
/*
Bus 接口定义了Event Bus的所有使用方法
*/
public interface Bus {
/**
* 将某个对象注册到Bus上,从此之后该类就成为了Subscriber了
*/
void register(Object subscriber);
/**
* 将某个对象从Bus上取消注册,取消注册之后就不会再接收到来自Bus的任何消息
*/
void unregister(Object subscriber);
/**
* 提交Event到默认的topic
*/
void post(Object event);
/**
* 提交Event到指定的topic
*/
void post(Object event, String topic);
/**
* 关闭该Bus
*/
void close();
/**
* 返回Bus的名称标识
*/
String getBusName();
}
Bus接口中定义了注册topic的方法和Event发送的方法, 具体如下。
- register(Object subscriber) :将某个对象实例注册给Event Bus。
- unregister(Object subscriber) :取消对该对象实例的注册, 会在Event Bus的注册表(Registry) 中将其移除。
- post(Object event) :提交Event到Event Bus中, 如果未指定topic则会将event广播给Event Bus默认的topic。
- post(Object Event, Sing topic) :提交Event的同时指定了topic。
- close() :销毁该Event Bus。
- get Bus Name() :返回该Event Bus的名称。
注册对象给Event Bus的时候需要指定接收消息时的回调方法, 我们采用注解的方式进行Event回调,代码如所示
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {
String topic() default "default-topic";
}
@Subscribe要求注解在类中的方法,注解时可指定topic,不指定的情况下为默认的topic(default-topic)
1.2 同步EventBus详解
同步Event Bus是最核心的一个类,它实现了Bus的所有功能,但是该类对Event的广播推送采用的是同步的方式,如果想要使用异步的方式进行推送,可使用Event Bus的子类AsyncEventBus,代码如所示。
import java.util.concurrent.Executor;
public class EventBus implements Bus {
// 用于维护Subscr的注册表
private final Registry registry = new Registry();
// Event Bus的名字
private String busName;
// 默认的Event Bus的名字
private final static String DEFAULT_BUS_NAME = "default";
// 默认的topic的名字
private final static String DEFAULT_TOPIC = "default-topic";
// 用于发布广播消息到各个Subscriber的类
private final Dispatcher dispatcher;
public EventBus(){
this(DEFAULT_BUS_NAME, null, Dispatcher.SEQ_EXECUTOR_SERVICE);
}
public EventBus(String busName) {
this(busName, null, Dispatcher.SEQ_EXECUTOR_SERVICE);
}
EventBus(String busName, EventExceptionHandler exceptionHandler, Executor executor) {
this.busName = busName;
this.dispatcher = Dispatcher.newDispatcher(exceptionHandler, executor);
}
public EventBus(EventExceptionHandler eventExceptionHandler) {
this(DEFAULT_BUS_NAME, eventExceptionHandler, Dispatcher.SEQ_EXECUTOR_SERVICE);
}
// 将注册Subscriber的动作直接委托给Register
@Override
public void register(Object subscriber) {
this.registry.bind(subscriber);
}
// 接触注册同样委托给Register
@Override
public void unregister(Object subscriber) {
this.registry.unbind(subscriber);
}
// 提交Event到默认的topic
@Override
public void post(Object event) {
this.post(event, DEFAULT_TOPIC);
}
// 提交Event到指定的topic,具体的动作是由Dispatcher来完成的
@Override
public void post(Object event, String topic) {
this.dispatcher.dispatch(this, registry, event, topic);
}
// 关闭销毁Bus
@Override
public void close() {
this.dispatcher.close();
}
@Override
public String getBusName() {
return this.busName;
}
}
在上述代码中:
- EventBus的构造除了名称之外,还需要有ExceptionHandler和Executor,后两个主要是给Dispatcher使用的。
- registry和unregister都是通过Subscriber注册表来完成的。
- Event的提交则是由Dispatcher来完成的。
- Executor并没有使用我们在第8章中开发的线程池,而是使用JDK中的Executor接口,我们自己开发的ThreadPool天生就是多线程并发执行任务的线程池,自带异步处理能力,但是无法做到同步任务处理,因此我们使用Executor可以任意扩展同步、异步的任务处理方式。
1.3 异步Event Bus详解
异步的EventBus比较简单,继承自同步Bus,然后将Thread-Per-Message用异步处理任务的Executor替换EventBus中的同步Executor即可,代码如所示。
import java.util.concurrent.ThreadPoolExecutor;
public class AsyncEventBus extends EventBus {
AsyncEventBus(String busName, EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {
super(busName, exceptionHandler, executor);
}
public AsyncEventBus(String busName, ThreadPoolExecutor executor) {
this(busName, null, executor);
}
public AsyncEventBus(ThreadPoolExecutor executor) {
this("default-async", null, executor);
}
public AsyncEventBus(EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {
this("default-async", exceptionHandler, executor);
}
}
1.4 Subscriber注册表Registry详解
注册表维护了topic和subscriber之间的关系,当有Event被post之后,Dispatcher需要知道该消息应该发送给哪个Subscriber的实例和对应的方法,Subscriber对象没有任何特殊要求,就是普通的类不需要继承任何父类或者实现任何接口,注册表Registry的代码如所示。
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Registry {
//存储Subscribe集合和topic之间关系的map
private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Subscriber>> subscriberContainer =
new ConcurrentHashMap<>();
public void bind(Object subscriber) {
// 获取Subscriber Object的方法集合然后进行绑定
List<Method> subscribeMethods = null;
}
public void unbind(Object subscriber) {
// unbind为了提高速度,只对Subscriber 进行失效操作
subscriberContainer.forEach((key, queue) -> queue.forEach(
s-> {
if ( s.getSubscribeObject() == subscriber) {
s.setDisable(true);
}
}
));
}
public ConcurrentLinkedQueue<Subscriber> scanSubscriber(final String topic) {
return subscriberContainer.get(topic);
}
private void tierSubscriber(Object subscriber, Method method) {
final Subscribe subscribe = method.getDeclaredAnnotations(Subscribe.class);
String topic = subscribe.topic();
// 当某个topic没有Subscribe Queue的时候创建一个
subscriberContainer.computeIfAbsent(topic, key->new ConcurrentLinkedQueue<>());
// 创建一个Subscriber并且加入Subscriber列表中
subscriberContainer.get(topic).add(new Subscriber(subscribe, method));
}
private List<Method> getSubscribeMethod(Object subscriber) {
final List<Method> methods = new ArrayList<>();
Class<?> temp = subscriber.getClass();
// 不断获取当前类和父类的所有@Subscribe方法
while(temp != null ) {
// 获取所有的方法
Method[] declaredMethods = temp.getDeclaredMethods();
// 只有public方法 && 有一个入惨 && 最重要的是被@Subscribe标识的方法才符合回调方法
Arrays.stream(declaredMethods).filter(
m->m.isAnnotationPresent(Subscribe.class) && m.getParameterCount()==1
&& m.getModifiers() == Modifier.PUBLIC
).forEach(methods::add);
temp = temp.getSuperclass();
}
return methods;
}
}
由于Registry是在Bus中使用的, 不能暴露给外部, 因此Registry被设计成了包可见的类, 我们所设计的Event Bus对Subscriber没有做任何限制, 但是要接受event的回调则需要将方法使用注解@Subscribe进行标记(可指定topic) , 同一个Subscriber的不同方法通过@Subscribe注解之后可接受来自两个不同的topic消息, 代码如下所示:
/**
* 非常普通的对象
*/
public class SimpleObject {
/**
* subscribe方法,比如使用@Subscribe标识,并且是void类型并且有一个参数
*/
@Subscribe(topic = "alex-topic")
public void test2(Integer x) {}
@Subscribe(topic = "test-topic")
public void test3(Integer x){}
}
SimpleObject的实例被注册到了EventBus之后,test 2和test 3这两个方法将会被加入到注册表中, 分别用来接受来自alex-topic和test-topic的event。
1.5 Event广播Dispatcher详解
前文中已经说过, Dispatcher的主要作用是将Event Bus post的event推送给每一个注册到topic上的subscriber上, 具体的推送其实就是执行被@Subscribe注解的方法, 示例代码如所示。
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
public class Dispatcher {
private final Executor executorServcie;
private final EventExceptionHandler exceptionHandler;
public static final Executor SEQ_EXECUTOR_SERVICE = SeqExecutorService.INSTANCE;
public static final Executor PRE_THREAD_EXECUTOR_SERVICE = PreThreadExecutorService.INSTANCE;
public Dispatcher(Executor executorServcie, EventExceptionHandler exceptionHandler) {
this.executorServcie = executorServcie;
this.exceptionHandler = exceptionHandler;
}
public void dispatch(Bus bus, Registry registry, Object event, String topic) {
// 根据topic获取所有的Subscriber列表
ConcurrentLinkedQueue<Subscriber> subscribers = registry.scanSubscriber(topic);
if ( null == subscribers ) {
if ( exceptionHandler != null ) {
exceptionHandler.handle(new IllegalArgumentException("The topic")
, new BaseEventContext(bus.getBusName(),null, event));
}
return ;
}
// 遍历所有的方法,并且通过反射的方式进行方法调用
subscribers.stream()
.filter(subscriber -> !subscriber.isDisable())
.filter(subscriber -> {
Method subscribeMethod = subscriber.getSubscribeMethod();
Class<?> aClass = subscribeMethod.getParameterTypes()[0];
return (aClass.isAssignableFrom(event.getClass()));
}).forEach(subscriber -> realInvokeSubscribe(subscriber, event, bus));
}
private void realInvokeSubscribe(Subscriber subscriber, Object event, Bus bus) {
Method subscribeMethod = subscriber.getSubscribeMethod();
Object subscribeObject = subscriber.getSubscribeObject();
executorServcie.execute(
() -> {
try {
subscribeMethod.invoke(subscribeObject, event);
} catch (Exception e) {
if ( null != exceptionHandler ) {
exceptionHandler.handle(e, new BaseEventContext(bus.getBusName(), subscriber, event));
}
}
}
);
}
public void close() {
if ( executorServcie instanceof ExecutorService )
((ExecutorService) executorServcie).shutdown();
}
static Dispatcher newDispatcher(EventExceptionHandler eventExceptionHandler, Executor executor) {
return new Dispatcher(executor, eventExceptionHandler);
}
static Dispatcher seqDispatcher(EventExceptionHandler exceptionHandler) {
return new Dispatcher(SEQ_EXECUTOR_SERVICE, exceptionHandler);
}
static Dispatcher perThreadDispatcher(EventExceptionHandler eventExceptionHandler) {
return new Dispatcher(PRE_THREAD_EXECUTOR_SERVICE, eventExceptionHandler);
}
// 顺序执行的ExecutorService
private static class SeqExecutorService implements Executor {
private final static SeqExecutorService INSTANCE = new SeqExecutorService();
@Override
public void execute(Runnable command) {
command.run();
}
}
// 每个线程负责一次消息发送
private static class PreThreadExecutorService implements Executor {
private final static PreThreadExecutorService INSTANCE = new PreThreadExecutorService();
@Override
public void execute(Runnable command) {
command.run();
}
}
private static class BaseEventContext implements EventContext {
private final String eventBusName;
private final Subscriber subscriber;
private final Object event;
private BaseEventContext(String eventBusName, Subscriber subscriber, Object event) {
this.eventBusName = eventBusName;
this.subscriber = subscriber;
this.event = event;
}
@Override
public String getSource() {
return this.eventBusName;
}
@Override
public Object getSubscriber() {
return subscriber != null ? subscriber.getSubscribeObject() :null;
}
@Override
public Method getSubscribe() {
return subscriber != null ? subscriber.getSubscribeMethod() : null;
}
@Override
public Object getEvent() {
return this.event;
}
}
}
在Dispatcher中,除了从Registry中获取对应的Subscriber执行之外,我们还定义了几个静态内部类, 其主要是实现了JDK 1.5以后的Executor接口和EventContent。
1.6 其他类接口设计
除了上面一些比较核心的类之外, 还需要Subscriber封装类以及Event Context、EventExceptionHandler接口
(1)Subscriber类
import java.lang.reflect.Method;
public class Subscriber {
private final Object subscribeObject;
private final Method subscribeMethod;
private boolean disable = false;
public Subscriber(Object subscribeObject, Method subscribeMethod) {
this.subscribeObject = subscribeObject;
this.subscribeMethod = subscribeMethod;
}
public Object getSubscribeObject() {
return subscribeObject;
}
public Method getSubscribeMethod() {
return subscribeMethod;
}
public void setDisable(boolean disable) {
this.disable = disable;
}
public boolean isDisable() {
return disable;
}
}
Subscriber类封装了对象实例和被@Subscribe标记的方法, 也就是说一个对象实例有可能会被封装成若干个Subscriber。
(2) EventExceptionHandler
Event Bus会将方法的调用交给Runnable接口去执行,我们都知道Runnable接口不能抛出checked异常信息,并且在每一个subscribe方法中,也不允许将异常抛出从而影响Event Bus对后续Subscriber进行消息推送,但是异常信息又不能被忽略掉,因此注册一个异常回调接口就可以知道在进行消息广播推送时都发生了什么,代码如所示。
public interface EventExceptionHandler {
void handle(Throwable cause, EventContext context);
}
(3)EventContext接口
Event接口提供了获取消息源、消息体,以及该消息是由哪一个Subscriber的哪个subscribe方法所接受, 主要用于消息推送出错时被回调接口Event Exception Handler使用,代码如所示。
import java.lang.reflect.Method;
public interface EventContext {
String getSource();
Object getSubscriber();
Method getSubscribe();
Object getEvent();
}
1.7 EventBus测试
(1)简单Subscriber
public class SimpleSubscriber1 {
@Subscribe
public void method1(String message) {
System.out.println("=======simpleSubscriber1===method1===" + message);
}
@Subscribe(topic = "test")
public void methods(String message) {
System.out.println("=======simpleSubscriber1===method2===" + message);
}
public static void main(String[] args) {
Bus bus = new EventBus("TestBus");
bus.register(new SimpleSubscriber1());
bus.post("Hello");
System.out.println("---------");
bus.post("Hello", "test");
}
}
(2)同步Bus
public static void main(String[] args) {
Bus bus = new EventBus("TestBus");
bus.register(new SimpleSubscriber1());
bus.post("Hello");
System.out.println("---------");
bus.post("Hello", "test");
}
(3)异步Bus
public static void main(String[] args) {
Bus bus = new AsyncEventBus("TestBus", (ThreadPoolExecutor) Executors.newFixedThreadPool(10));
bus.register(new SimpleSubscriber1());
bus.post("Hello");
System.out.println("-----------");
bus.post("Hello","test");
}
2 Event Bus实战-监控目录变化
记得笔者刚参加工作的时候,第一个开发任务就是监控某个硬件设备的运行时数据,然后记录在数据库中,该硬件设备在运行的过程中,会将一些性能信息等写入特殊的数据文件中,我要做的就是监控到该文件的变化,读取最后一行数据,然后根据格式将其解析出来插入数据库,实现的思路大致是:在程序首次启动时获取该文件的最后修改时间并且做文件的首次解析,然后每隔一段指定的时间检查一次文件最后被修改的时间,如果与记录的时间相等则等待下次的采集(Balking Pattern) , 否则进行新一轮的采集并且更新时间。
虽然这个程序足够简单,但是上述的实现方式还是存在着诸多问题,比如在采集时间间隔内,如果文件发生了N次变化,我只能获取到最后一次,其根本原因是文件的变化不会通知到应用程序,所以只能比较笨的主动去轮询。
JDK自1.7版本后提供了Watch Service类, 该类可以基于事件通知的方式监控文件或者目录的任何变化, 文件的改变相当于每一个事件(Event) 的发生, 针对不同的时间执行不同的动作, 本节将结合NIO 2.0中提供的Watch Service和我们实现的Event Bus实现文件目录的监控的功能。