1 Event-Driven Architecture基础
EDA(Event-Driven Architecture) 是一种实现组件之间松耦合、易扩展的架构方式, 在本节中, 我们先介绍EDA的基础组件, 让读者对EDA设计架构方式有一个基本的认识,一个最简单的EDA设计需要包含如下几个组件。
- Events:需要被处理的数据。
- Event Handlers:处理Events的方式方法。
- Event Loop:维护Events和Event Handlers之间的交互流程。
如图所示, EventA将被HandlerA处理, 而EventB将被HandlerB处理,这一切的分配都是由EventLoop所控制的。
1.1 Events
Events是EDA中的重要角色, 一个Event至少需要包含两个属性:类型和数据, Event的类型决定了它会被哪个Handler处理, 数据是在Handler中代加工的材料, 下面写一个简单的程序,代码如所示。
/*
Event 只包含了该Event所属的类型和所包含的数据
*/
public class Event {
private final String type;
private final String data;
public Event(String type, String data) {
this.type = type;
this.data = data;
}
public String getType() {
return type;
}
public String getData() {
return data;
}
}
1.2 Event Handlers
EventHandlers主要用于处理Event,比如一些filtering或者transforming数据的操作等,下面我们写两个比较简单的方法,代码如下:
// 用于处理A类型的Event
public static void handleEventA(Event e) {
System.out.println(e.getData().toLowerCase());
}
// 用于处理B类型的Event
public static void hanleEventB(Event e) {
System.out.println(e.getData().toUpperCase());
}
1.3 Event Loop
Event Loop处理接收到的所有Event, 并且将它们分配给合适的Handler去处理, 代码如下:
Event e;
while( !events.isEmpty()) {
// 从消息队列中不断移除,根据不同的类型进行处理
e = events.remove();
switch (e.getType()) {
case "A":
handleEventA(e);
break;
case "B":
hanleEventB(e);
break;
}
}
完整代码:
import java.util.LinkedList;
import java.util.Queue;
public class FooEventDrivenExample {
// 用于处理A类型的Event
public static void handleEventA(Event e) {
System.out.println(e.getData().toLowerCase());
}
// 用于处理B类型的Event
public static void hanleEventB(Event e) {
System.out.println(e.getData().toUpperCase());
}
public static void main(String[] args) {
Queue<Event> events = new LinkedList<>();
events.add(new Event("A", "Hello"));
events.add(new Event("B", "I am Event B"));
events.add(new Event("A", "I am Event A"));
Event e;
while( !events.isEmpty()) {
// 从消息队列中不断移除,根据不同的类型进行处理
e = events.remove();
switch (e.getType()) {
case "A":
handleEventA(e);
break;
case "B":
hanleEventB(e);
break;
}
}
}
}
2 开发一个Event-Driven框架
通过1节的基础知识介绍,我们大致可以知道,一个基于事件驱动的架构设计,总体来讲会涉及如下几个重要组件:事件消息(Event) 、针对该事件的具体处理器(Handler) 、接受事件消息的通道(29.1.3节中的queue) , 以及对事件消息如何进行分配(Event Loop) 。
2.1 同步EDA框架设计
(1) Message
回顾1节基础部分的介绍, 在基于Message的系统中, 每一个Event也可以被称为Message, Message是对Event更高一个层级的抽象, 每一个Message都有一个特定的Type用于与对应的Handler做关联, 3是Message接口的定义。
public interface Message {
/*
返回Message的类型
*/
Class<? extends Message> getType();
}
(2) Channel
第二个比较重要的概念就是Channels,Channel主要用于接受来自EventLoop分配的消息,每一个Channel负责处理一种类型的消息(当然这取决于你对消息如何进行分配) ,4是Channel接口的定义:
public interface Channel <E extends Message>{
/**
* dispatch方法用于负责Message的调度
*/
void dispatch(E message);
}
(3) Dynamic Router
Router的作用类似于1节中的EventLoop, 其主要是帮助Event找到合适的Channel并且传送给它, Dynamic Routers代码定义如5所示。
public interface DynamicRouter<E extends Message> {
/**
* 针对每一种Message类型注册相关的Channel,只有找到合适的channel该Message才会被处理
* @param messageType
* @param channel
*/
void registerChannel(Class<? extends E> messageType, Channel<? extends E> channel);
/**
* 为相应的Channel分配Message
* @param message
*/
void dispatch(E message);
}
Router如何知道要将Message分配给哪个Channel呢?换句话说,Router需要了解到Channel的存在,因此registerChannel() 方法的作用就是将相应的Channel注册给Router,dispatch方法则是根据Message的类型进行路由匹配。
(4) Event
Event是对Message的一个最简单的实现, 在以后的使用中, 将Event直接作为其他Message的基类即可(这种做法有点类似于适配器模式) , Event接口的定义如6所示。
public class Event implements Message{
@Override
public Class<? extends Message> getType() {
return getClass();
}
}
(5) EventDispatcher
EventDispatcher是对DynamicRouter的一个最基本的实现,适合在单线程的情况下进行使用,因此不需要考虑线程安全的问题。EventDispatcher接口的定义如7所示。
import java.util.HashMap;
import java.util.Map;
/*
EventDispatcher不是一个线程安全的类
*/
public class EventDispatcher implements DynamicRouter<Message> {
// 用于保存Channel和Message之间的关系
private final Map<Class<? extends Message>, Channel> routerTable;
public EventDispatcher(){
// 初始化RouteTable,但是在该实现中,我们使用HashMap作为路由表
this.routerTable = new HashMap<>();
}
@Override
public void registerChannel(Class<? extends Message> messageType, Channel<? extends Message> channel) {
this.routerTable.put(messageType, channel);
}
@Override
public void dispatch(Message message) {
if ( routerTable.containsKey(message.getType())) {
// 直接获取对应的Channel处理的Message
routerTable.get(message.getType()).dispatch(message);
} else {
throw new MessageMatcherException("Can't match the channel for ");
}
}
}
在EventDispatcher中有一个注册表routerTable, 主要用于存放不同类型Message对应的Channel, 如果没有与Message相对应的Channel, 则会抛出无法匹配的异常, 示例代码如8所示。
public class MessageMatcherException extends RuntimeException{
public MessageMatcherException(String message) {
super(message);
}
}
简单的测试用例
public class EventDispatcherExample {
/**
* InputEvent中定义了两个属性X和Y,主要用于在其他Channel中的运算
*/
static class InputEvent extends Event {
private final int x;
private final int y;
InputEvent(int x, int y) {
this.x = x;
this.y = y;
}
public int getX() {
return x;
}
public int getY() {
return y;
}
}
/**
* 用于存放结果的Event
*/
static class ResultEvent extends Event {
private final int result;
public ResultEvent(int result) {
this.result = result;
}
public int getResult() {
return result;
}
}
/**
* 处理ResultEvent的Handler(Channel),只是简单地将计算结果输出到控制台
*/
static class ResultEventHandler implements Channel<ResultEvent> {
@Override
public void dispatch(ResultEvent message) {
System.out.println("The reuslt is :" + message.getResult());
}
}
/**
* InputEventHandler需要向Router发送Event,因此在构造的时候需要传入Dispatcher
*/
static class InputEventHandler implements Channel<InputEvent> {
private final EventDispatcher dispatcher;
InputEventHandler(EventDispatcher dispatcher) {
this.dispatcher = dispatcher;
}
/**
* 将计算的结果构造成新的Event提交给Router
* @param message
*/
@Override
public void dispatch(InputEvent message) {
System.out.println("test");
int result = message.getX() + message.getY();
dispatcher.dispatch(new ResultEvent(result));
}
}
public static void main(String[] args) {
// 构造Router
EventDispatcher dispatcher = new EventDispatcher();
// 将Event 和 Handler(Channel)的绑定关系注册到Dispatcher
dispatcher.registerChannel(InputEvent.class, new InputEventHandler(dispatcher));
dispatcher.registerChannel(ResultEvent.class, new ResultEventHandler());
dispatcher.dispatch(new InputEvent(1,2));
}
}
由于所有的类都存放于一个文件中,因此看起来测试代码比较多,其实结构还是非常清晰的, InputEvent是一个Message,它包含了两个Int类型的属性,而InputEventHandler是对InputEvent消息的处理,接收到了InputEvent消息之后,分别对X和Y进行相加操作,然后将结果封装成ResultEvent提交给EventDispatcher,ResultEvent相对比较简单,只包含了计算结果的属性,ResultEventHandler则将计算结果输出到控制台上。
通过上面这个例子的运行你会发现,不同数据的处理过程之间根本无须知道彼此的存在, 一切都由Event Dispatcher这个Router来控制, 它会给你想要的一切, 这是一种稀疏耦合(松耦合) 的设计。图29-2所示的为同步EDA架构类图。
2.2 异步EDA框架设计
在2.1节中,我们实现了一个基本的EDA框架,但是这个框架在应对高并发的情况下还是存在一些问题的,具体如下。
- Event Dispatcher不是线程安全的类, 在多线程的情况下,registerChannel方法会引起数据不一致的问题。
- 就目前言,我们实现的所有Channel都无法并发消费Message,比如 InputEventHandler只能逐个处理Message,低延迟的消息处理还会导致Dispatcher出现积压。
在本节中, 我们将对2.1节中的EDA框架进行扩充,使其可支持并发任务的执行,下面定义了一个新的AsyncChannel作为基类, 该类中提供了Message的并发处理能力, 代码如10所示。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public abstract class AsyncChannel implements Channel<Event>{
// 在AsyncChannel 中将使用ExecutorServcie多线程的方式提交给Message
private final ExecutorService executorService;
// 默认构造函数,提交了CPU的核数*2的线程数量
public AsyncChannel() {
this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2));
}
// 用户自定义的ExecutorService
public AsyncChannel(ExecutorService executorService) {
this.executorService = executorService;
}
// 重写dispatch方法,并且用final修饰,避免子类重写
@Override
public void dispatch(Event message) {
executorService.submit(() -> this.handle(message));
}
// 提供抽象方法,供子类实现具体的Message处理
protected abstract void handle(Event message);
// 提供关闭ExecutorService的方法
void stop() {
if ( null != executorService && !executorService.isShutdown())
executorService.shutdown();
}
}
其次,还需要提供新的EventDispatcher类AsyncEventDispatcher负责以并发的方式dispatchMessage, 其中Event对应的Channel只能是Async Channel类型,并且也对外暴露了 shutdown方法, 代码如所示。
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class AsyncEventDispatcher implements DynamicRouter<Event>{
private final Map<Class<? extends Event>, AsyncChannel> routerTable;
public AsyncEventDispatcher() {
this.routerTable = new ConcurrentHashMap<>();
}
@Override
public void registerChannel(Class<? extends Event> messageType, Channel<? extends Event> channel) {
// 在AsyncEventDispatcher中,channel必须是AsyncChannel类型
if ( !(channel instanceof AsyncChannel )) {
throw new IllegalArgumentException("The channel must be ");
}
this.routerTable.put(messageType, (AsyncChannel) channel);
}
@Override
public void dispatch(Event message) {
if ( routerTable.containsKey(message.getType())) {
routerTable.get(message.getType()).dispatch(message);
} else {
throw new MessageMatcherException("");
}
}
public void shutdown() {
// 关闭所有的Channel以释放资源
routerTable.values().forEach(AsyncChannel::stop);
}
}
在AsyncEventDispatcher中,routerTable使用线程安全的Map定义,在注册Channel的时候,如果其不是AsyncChannel的类型,则会抛出异常。
异步EDA架构类图
import java.util.concurrent.TimeUnit;
public class AsyncEventDispatcherExample {
// 主要用于处理InputEvent, 但是需要继承AsyncChannel
static class AsyncInputEventHandler extends AsyncChannel {
private final AsyncEventDispatcher dispatcher;
AsyncInputEventHandler(AsyncEventDispatcher dispatcher) {
this.dispatcher = dispatcher;
}
// 不同于以同步的方式实现dispatch,异步的方式需要实现handle
@Override
protected void handle(Event message) {
EventDispatcherExample.InputEvent inputEvent =
(EventDispatcherExample.InputEvent)message;
System.out.println("test");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = inputEvent.getX() + inputEvent.getY();
dispatcher.dispatch(new EventDispatcherExample.ResultEvent(result));
}
}
// 主要用于处理InputEvent,但是需要继承AsyncChannel
static class AsyncResultEventHandler extends AsyncChannel {
@Override
protected void handle(Event message) {
EventDispatcherExample.ResultEvent resultEvent =
(EventDispatcherExample.ResultEvent) message;
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("the result is :" + resultEvent.getResult());
}
}
public static void main(String[] args) {
// 定义AsyncEventDispatcher
AsyncEventDispatcher dispatcher = new AsyncEventDispatcher();
// 注册
dispatcher.registerChannel(EventDispatcherExample.InputEvent.class, new AsyncInputEventHandler(dispatcher));
dispatcher.registerChannel(EventDispatcherExample.ResultEvent.class, new AsyncResultEventHandler());
// 提交需要处理的Message
dispatcher.dispatch(new EventDispatcherExample.InputEvent(1,2));
}
}
当dispatcher分配一个Event的时候,如果执行非常缓慢也不会影响下一个Event被dispatch,这主要得益于我们采用了异步的处理方式(ExecutorService本身存在的任务队列可以允许异步提交一定数量级的数据)。
3 Event-Driven的使用
在本节中,我们模拟一个简单的聊天应用程序,借助于我们在2节开发的EDA小框架,首先我们要为聊天应用程序定义如下几个类型的Event。
- User Online Event: 当用户上线时来到聊天室的Event。
- User Offline Event: 当用户下线时退出聊天室的Event。
- User Chat Event:用户在聊天室中发送聊天信息的Event。
3.1 Chat Event
首先, 我们定义一个User对象, 代表聊天室的参与者, 比较简单就是一个名字, 代码如下:
public class User {
private final String name;
public User(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
下面定义一个UserOnlineEvent,代表用户上线的Event,代码如下:
public class UserOnlineEvent extends Event{
private final User user;
public UserOnlineEvent(User user) {
this.user = user;
}
public User getUser() {
return user;
}
}
下面定义一个UserOfflineEvent,代表用户下线的Event,代码如下:
public class UserOfflineEvent extends UserOnlineEvent{
public UserOfflineEvent(User user) {
super(user);
}
}
下面定义一个UserChatEvent,代表用户发送了聊天信息的Event,代码如下:
public class UserChatEvent extends UserOnlineEvent{
// ChatEvent需要有聊天的信息
private final String message;
public UserChatEvent(User user, String message) {
super(user);
this.message = message;
}
public String getMessage() {
return message;
}
}
UserChatEvent比其他两个Event多了代表聊天内容的message属性。
3.2 Chat Channel(Handler)
所有的Handler都非常简单, 只是将接收到的信息输出到控制台,由于是在多线程的环境下运行, 因此我们需要继承AsyncChannel。
下面定义一个UserOnlineEventChannel, 主要用于处理UserOnlineEvent事件, 代码如下:
// 用户上线的Event,简单输出用户上线即可
public class UserOnlineEventChannel extends AsyncChannel{
@Override
protected void handle(Event message) {
UserOnlineEvent event = (UserOnlineEvent) message;
System.out.println("The User ");
}
}
UserOfflineEventChannel代码:
public class UserOfflineEventChannel extends AsyncChannel{
@Override
protected void handle(Event message) {
UserOfflineEvent event = (UserOfflineEvent) message;
System.out.println("offline");
}
}
UserChatEventChannel:
public class UserChatEventChannel extends AsyncChannel{
@Override
protected void handle(Event message) {
UserChatEvent event = (UserChatEvent) message;
System.out.println(event.getMessage());
}
}
3.3 Chat User线程
我们定义完Event和接受Event的Channel后,现在定义一个代表聊天室参与者的User线程,代码如下:
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.ThreadLocalRandom.current;
public class UserChatThread extends Thread{
private final User user;
private final AsyncEventDispatcher dispatcher;
public UserChatThread(User user, AsyncEventDispatcher dispatcher) {
super(user.getName());
this.user = user;
this.dispatcher = dispatcher;
}
@Override
public void run() {
try {
// User上线,发送Online Event
dispatcher.dispatch(new UserOnlineEvent(user));
for(int i = 0; i < 5; i++) {
// 发送User的聊天信息
dispatcher.dispatch(new UserChatEvent(user, getName() + "-hello-" + i));
TimeUnit.SECONDS.sleep(current().nextInt(10));
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// User下线,发送Offline Event
dispatcher.dispatch(new UserOfflineEvent(user));
}
}
}
当User线程启动的时候,首先发送OnlineEvent,然后发送五条聊天信息,之后下线,在下线的时候发送OfflineEvent,下面写一个简单的程序测试一下:
public class UserChatApplication {
public static void main(String[] args) {
// 定义异步的Router
final AsyncEventDispatcher dispatcher = new AsyncEventDispatcher();
// 为Router注册Channel和Event之间的关系
dispatcher.registerChannel(UserOnlineEvent.class, new UserOnlineEventChannel());
dispatcher.registerChannel(UserOfflineEvent.class, new UserOfflineEventChannel());
dispatcher.registerChannel(UserChatEvent.class, new UserChatEventChannel());
// 启动三个登录聊天室的User
new UserChatThread(new User("Leo"), dispatcher).start();
new UserChatThread(new User("Alex"), dispatcher).start();
new UserChatThread(new User("Tina"), dispatcher).start();
}
}