1 Event-Driven Architecture基础

EDA(Event-Driven Architecture) 是一种实现组件之间松耦合、易扩展的架构方式, 在本节中, 我们先介绍EDA的基础组件, 让读者对EDA设计架构方式有一个基本的认识,一个最简单的EDA设计需要包含如下几个组件。

  • Events:需要被处理的数据。
  • Event Handlers:处理Events的方式方法。
  • Event Loop:维护Events和Event Handlers之间的交互流程。

如图所示, EventA将被HandlerA处理, 而EventB将被HandlerB处理,这一切的分配都是由EventLoop所控制的。
image.png

1.1 Events

Events是EDA中的重要角色, 一个Event至少需要包含两个属性:类型和数据, Event的类型决定了它会被哪个Handler处理, 数据是在Handler中代加工的材料, 下面写一个简单的程序,代码如所示。

  1. /*
  2. Event 只包含了该Event所属的类型和所包含的数据
  3. */
  4. public class Event {
  5. private final String type;
  6. private final String data;
  7. public Event(String type, String data) {
  8. this.type = type;
  9. this.data = data;
  10. }
  11. public String getType() {
  12. return type;
  13. }
  14. public String getData() {
  15. return data;
  16. }
  17. }

1.2 Event Handlers

EventHandlers主要用于处理Event,比如一些filtering或者transforming数据的操作等,下面我们写两个比较简单的方法,代码如下:

  1. // 用于处理A类型的Event
  2. public static void handleEventA(Event e) {
  3. System.out.println(e.getData().toLowerCase());
  4. }
  1. // 用于处理B类型的Event
  2. public static void hanleEventB(Event e) {
  3. System.out.println(e.getData().toUpperCase());
  4. }

1.3 Event Loop

Event Loop处理接收到的所有Event, 并且将它们分配给合适的Handler去处理, 代码如下:

  1. Event e;
  2. while( !events.isEmpty()) {
  3. // 从消息队列中不断移除,根据不同的类型进行处理
  4. e = events.remove();
  5. switch (e.getType()) {
  6. case "A":
  7. handleEventA(e);
  8. break;
  9. case "B":
  10. hanleEventB(e);
  11. break;
  12. }
  13. }

完整代码:

  1. import java.util.LinkedList;
  2. import java.util.Queue;
  3. public class FooEventDrivenExample {
  4. // 用于处理A类型的Event
  5. public static void handleEventA(Event e) {
  6. System.out.println(e.getData().toLowerCase());
  7. }
  8. // 用于处理B类型的Event
  9. public static void hanleEventB(Event e) {
  10. System.out.println(e.getData().toUpperCase());
  11. }
  12. public static void main(String[] args) {
  13. Queue<Event> events = new LinkedList<>();
  14. events.add(new Event("A", "Hello"));
  15. events.add(new Event("B", "I am Event B"));
  16. events.add(new Event("A", "I am Event A"));
  17. Event e;
  18. while( !events.isEmpty()) {
  19. // 从消息队列中不断移除,根据不同的类型进行处理
  20. e = events.remove();
  21. switch (e.getType()) {
  22. case "A":
  23. handleEventA(e);
  24. break;
  25. case "B":
  26. hanleEventB(e);
  27. break;
  28. }
  29. }
  30. }
  31. }

2 开发一个Event-Driven框架

通过1节的基础知识介绍,我们大致可以知道,一个基于事件驱动的架构设计,总体来讲会涉及如下几个重要组件:事件消息(Event) 、针对该事件的具体处理器(Handler) 、接受事件消息的通道(29.1.3节中的queue) , 以及对事件消息如何进行分配(Event Loop) 。

2.1 同步EDA框架设计

高度抽象的同步EDA框架

(1) Message

回顾1节基础部分的介绍, 在基于Message的系统中, 每一个Event也可以被称为Message, Message是对Event更高一个层级的抽象, 每一个Message都有一个特定的Type用于与对应的Handler做关联, 3是Message接口的定义。

  1. public interface Message {
  2. /*
  3. 返回Message的类型
  4. */
  5. Class<? extends Message> getType();
  6. }

(2) Channel

第二个比较重要的概念就是Channels,Channel主要用于接受来自EventLoop分配的消息,每一个Channel负责处理一种类型的消息(当然这取决于你对消息如何进行分配) ,4是Channel接口的定义:

  1. public interface Channel <E extends Message>{
  2. /**
  3. * dispatch方法用于负责Message的调度
  4. */
  5. void dispatch(E message);
  6. }

(3) Dynamic Router

Router的作用类似于1节中的EventLoop, 其主要是帮助Event找到合适的Channel并且传送给它, Dynamic Routers代码定义如5所示。

  1. public interface DynamicRouter<E extends Message> {
  2. /**
  3. * 针对每一种Message类型注册相关的Channel,只有找到合适的channel该Message才会被处理
  4. * @param messageType
  5. * @param channel
  6. */
  7. void registerChannel(Class<? extends E> messageType, Channel<? extends E> channel);
  8. /**
  9. * 为相应的Channel分配Message
  10. * @param message
  11. */
  12. void dispatch(E message);
  13. }

Router如何知道要将Message分配给哪个Channel呢?换句话说,Router需要了解到Channel的存在,因此registerChannel() 方法的作用就是将相应的Channel注册给Router,dispatch方法则是根据Message的类型进行路由匹配。

(4) Event

Event是对Message的一个最简单的实现, 在以后的使用中, 将Event直接作为其他Message的基类即可(这种做法有点类似于适配器模式) , Event接口的定义如6所示。

  1. public class Event implements Message{
  2. @Override
  3. public Class<? extends Message> getType() {
  4. return getClass();
  5. }
  6. }

(5) EventDispatcher

EventDispatcher是对DynamicRouter的一个最基本的实现,适合在单线程的情况下进行使用,因此不需要考虑线程安全的问题。EventDispatcher接口的定义如7所示。

  1. import java.util.HashMap;
  2. import java.util.Map;
  3. /*
  4. EventDispatcher不是一个线程安全的类
  5. */
  6. public class EventDispatcher implements DynamicRouter<Message> {
  7. // 用于保存Channel和Message之间的关系
  8. private final Map<Class<? extends Message>, Channel> routerTable;
  9. public EventDispatcher(){
  10. // 初始化RouteTable,但是在该实现中,我们使用HashMap作为路由表
  11. this.routerTable = new HashMap<>();
  12. }
  13. @Override
  14. public void registerChannel(Class<? extends Message> messageType, Channel<? extends Message> channel) {
  15. this.routerTable.put(messageType, channel);
  16. }
  17. @Override
  18. public void dispatch(Message message) {
  19. if ( routerTable.containsKey(message.getType())) {
  20. // 直接获取对应的Channel处理的Message
  21. routerTable.get(message.getType()).dispatch(message);
  22. } else {
  23. throw new MessageMatcherException("Can't match the channel for ");
  24. }
  25. }
  26. }

在EventDispatcher中有一个注册表routerTable, 主要用于存放不同类型Message对应的Channel, 如果没有与Message相对应的Channel, 则会抛出无法匹配的异常, 示例代码如8所示。

  1. public class MessageMatcherException extends RuntimeException{
  2. public MessageMatcherException(String message) {
  3. super(message);
  4. }
  5. }

简单的测试用例

  1. public class EventDispatcherExample {
  2. /**
  3. * InputEvent中定义了两个属性X和Y,主要用于在其他Channel中的运算
  4. */
  5. static class InputEvent extends Event {
  6. private final int x;
  7. private final int y;
  8. InputEvent(int x, int y) {
  9. this.x = x;
  10. this.y = y;
  11. }
  12. public int getX() {
  13. return x;
  14. }
  15. public int getY() {
  16. return y;
  17. }
  18. }
  19. /**
  20. * 用于存放结果的Event
  21. */
  22. static class ResultEvent extends Event {
  23. private final int result;
  24. public ResultEvent(int result) {
  25. this.result = result;
  26. }
  27. public int getResult() {
  28. return result;
  29. }
  30. }
  31. /**
  32. * 处理ResultEvent的Handler(Channel),只是简单地将计算结果输出到控制台
  33. */
  34. static class ResultEventHandler implements Channel<ResultEvent> {
  35. @Override
  36. public void dispatch(ResultEvent message) {
  37. System.out.println("The reuslt is :" + message.getResult());
  38. }
  39. }
  40. /**
  41. * InputEventHandler需要向Router发送Event,因此在构造的时候需要传入Dispatcher
  42. */
  43. static class InputEventHandler implements Channel<InputEvent> {
  44. private final EventDispatcher dispatcher;
  45. InputEventHandler(EventDispatcher dispatcher) {
  46. this.dispatcher = dispatcher;
  47. }
  48. /**
  49. * 将计算的结果构造成新的Event提交给Router
  50. * @param message
  51. */
  52. @Override
  53. public void dispatch(InputEvent message) {
  54. System.out.println("test");
  55. int result = message.getX() + message.getY();
  56. dispatcher.dispatch(new ResultEvent(result));
  57. }
  58. }
  59. public static void main(String[] args) {
  60. // 构造Router
  61. EventDispatcher dispatcher = new EventDispatcher();
  62. // 将Event 和 Handler(Channel)的绑定关系注册到Dispatcher
  63. dispatcher.registerChannel(InputEvent.class, new InputEventHandler(dispatcher));
  64. dispatcher.registerChannel(ResultEvent.class, new ResultEventHandler());
  65. dispatcher.dispatch(new InputEvent(1,2));
  66. }
  67. }

由于所有的类都存放于一个文件中,因此看起来测试代码比较多,其实结构还是非常清晰的, InputEvent是一个Message,它包含了两个Int类型的属性,而InputEventHandler是对InputEvent消息的处理,接收到了InputEvent消息之后,分别对X和Y进行相加操作,然后将结果封装成ResultEvent提交给EventDispatcher,ResultEvent相对比较简单,只包含了计算结果的属性,ResultEventHandler则将计算结果输出到控制台上。

通过上面这个例子的运行你会发现,不同数据的处理过程之间根本无须知道彼此的存在, 一切都由Event Dispatcher这个Router来控制, 它会给你想要的一切, 这是一种稀疏耦合(松耦合) 的设计。图29-2所示的为同步EDA架构类图。

image.png

2.2 异步EDA框架设计

在2.1节中,我们实现了一个基本的EDA框架,但是这个框架在应对高并发的情况下还是存在一些问题的,具体如下。

  • Event Dispatcher不是线程安全的类, 在多线程的情况下,registerChannel方法会引起数据不一致的问题。
  • 就目前言,我们实现的所有Channel都无法并发消费Message,比如 InputEventHandler只能逐个处理Message,低延迟的消息处理还会导致Dispatcher出现积压。

在本节中, 我们将对2.1节中的EDA框架进行扩充,使其可支持并发任务的执行,下面定义了一个新的AsyncChannel作为基类, 该类中提供了Message的并发处理能力, 代码如10所示。

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. public abstract class AsyncChannel implements Channel<Event>{
  4. // 在AsyncChannel 中将使用ExecutorServcie多线程的方式提交给Message
  5. private final ExecutorService executorService;
  6. // 默认构造函数,提交了CPU的核数*2的线程数量
  7. public AsyncChannel() {
  8. this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2));
  9. }
  10. // 用户自定义的ExecutorService
  11. public AsyncChannel(ExecutorService executorService) {
  12. this.executorService = executorService;
  13. }
  14. // 重写dispatch方法,并且用final修饰,避免子类重写
  15. @Override
  16. public void dispatch(Event message) {
  17. executorService.submit(() -> this.handle(message));
  18. }
  19. // 提供抽象方法,供子类实现具体的Message处理
  20. protected abstract void handle(Event message);
  21. // 提供关闭ExecutorService的方法
  22. void stop() {
  23. if ( null != executorService && !executorService.isShutdown())
  24. executorService.shutdown();
  25. }
  26. }

其次,还需要提供新的EventDispatcher类AsyncEventDispatcher负责以并发的方式dispatchMessage, 其中Event对应的Channel只能是Async Channel类型,并且也对外暴露了 shutdown方法, 代码如所示。

  1. import java.util.Map;
  2. import java.util.concurrent.ConcurrentHashMap;
  3. public class AsyncEventDispatcher implements DynamicRouter<Event>{
  4. private final Map<Class<? extends Event>, AsyncChannel> routerTable;
  5. public AsyncEventDispatcher() {
  6. this.routerTable = new ConcurrentHashMap<>();
  7. }
  8. @Override
  9. public void registerChannel(Class<? extends Event> messageType, Channel<? extends Event> channel) {
  10. // 在AsyncEventDispatcher中,channel必须是AsyncChannel类型
  11. if ( !(channel instanceof AsyncChannel )) {
  12. throw new IllegalArgumentException("The channel must be ");
  13. }
  14. this.routerTable.put(messageType, (AsyncChannel) channel);
  15. }
  16. @Override
  17. public void dispatch(Event message) {
  18. if ( routerTable.containsKey(message.getType())) {
  19. routerTable.get(message.getType()).dispatch(message);
  20. } else {
  21. throw new MessageMatcherException("");
  22. }
  23. }
  24. public void shutdown() {
  25. // 关闭所有的Channel以释放资源
  26. routerTable.values().forEach(AsyncChannel::stop);
  27. }
  28. }

在AsyncEventDispatcher中,routerTable使用线程安全的Map定义,在注册Channel的时候,如果其不是AsyncChannel的类型,则会抛出异常。
异步EDA架构类图
image.png

  1. import java.util.concurrent.TimeUnit;
  2. public class AsyncEventDispatcherExample {
  3. // 主要用于处理InputEvent, 但是需要继承AsyncChannel
  4. static class AsyncInputEventHandler extends AsyncChannel {
  5. private final AsyncEventDispatcher dispatcher;
  6. AsyncInputEventHandler(AsyncEventDispatcher dispatcher) {
  7. this.dispatcher = dispatcher;
  8. }
  9. // 不同于以同步的方式实现dispatch,异步的方式需要实现handle
  10. @Override
  11. protected void handle(Event message) {
  12. EventDispatcherExample.InputEvent inputEvent =
  13. (EventDispatcherExample.InputEvent)message;
  14. System.out.println("test");
  15. try {
  16. TimeUnit.SECONDS.sleep(5);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. int result = inputEvent.getX() + inputEvent.getY();
  21. dispatcher.dispatch(new EventDispatcherExample.ResultEvent(result));
  22. }
  23. }
  24. // 主要用于处理InputEvent,但是需要继承AsyncChannel
  25. static class AsyncResultEventHandler extends AsyncChannel {
  26. @Override
  27. protected void handle(Event message) {
  28. EventDispatcherExample.ResultEvent resultEvent =
  29. (EventDispatcherExample.ResultEvent) message;
  30. try {
  31. TimeUnit.SECONDS.sleep(5);
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. System.out.println("the result is :" + resultEvent.getResult());
  36. }
  37. }
  38. public static void main(String[] args) {
  39. // 定义AsyncEventDispatcher
  40. AsyncEventDispatcher dispatcher = new AsyncEventDispatcher();
  41. // 注册
  42. dispatcher.registerChannel(EventDispatcherExample.InputEvent.class, new AsyncInputEventHandler(dispatcher));
  43. dispatcher.registerChannel(EventDispatcherExample.ResultEvent.class, new AsyncResultEventHandler());
  44. // 提交需要处理的Message
  45. dispatcher.dispatch(new EventDispatcherExample.InputEvent(1,2));
  46. }
  47. }

当dispatcher分配一个Event的时候,如果执行非常缓慢也不会影响下一个Event被dispatch,这主要得益于我们采用了异步的处理方式(ExecutorService本身存在的任务队列可以允许异步提交一定数量级的数据)。

3 Event-Driven的使用

在本节中,我们模拟一个简单的聊天应用程序,借助于我们在2节开发的EDA小框架,首先我们要为聊天应用程序定义如下几个类型的Event。

  • User Online Event: 当用户上线时来到聊天室的Event。
  • User Offline Event: 当用户下线时退出聊天室的Event。
  • User Chat Event:用户在聊天室中发送聊天信息的Event。

3.1 Chat Event

首先, 我们定义一个User对象, 代表聊天室的参与者, 比较简单就是一个名字, 代码如下:

  1. public class User {
  2. private final String name;
  3. public User(String name) {
  4. this.name = name;
  5. }
  6. public String getName() {
  7. return name;
  8. }
  9. }

下面定义一个UserOnlineEvent,代表用户上线的Event,代码如下:

  1. public class UserOnlineEvent extends Event{
  2. private final User user;
  3. public UserOnlineEvent(User user) {
  4. this.user = user;
  5. }
  6. public User getUser() {
  7. return user;
  8. }
  9. }

下面定义一个UserOfflineEvent,代表用户下线的Event,代码如下:

  1. public class UserOfflineEvent extends UserOnlineEvent{
  2. public UserOfflineEvent(User user) {
  3. super(user);
  4. }
  5. }

下面定义一个UserChatEvent,代表用户发送了聊天信息的Event,代码如下:

  1. public class UserChatEvent extends UserOnlineEvent{
  2. // ChatEvent需要有聊天的信息
  3. private final String message;
  4. public UserChatEvent(User user, String message) {
  5. super(user);
  6. this.message = message;
  7. }
  8. public String getMessage() {
  9. return message;
  10. }
  11. }

UserChatEvent比其他两个Event多了代表聊天内容的message属性。

3.2 Chat Channel(Handler)

所有的Handler都非常简单, 只是将接收到的信息输出到控制台,由于是在多线程的环境下运行, 因此我们需要继承AsyncChannel。
下面定义一个UserOnlineEventChannel, 主要用于处理UserOnlineEvent事件, 代码如下:

  1. // 用户上线的Event,简单输出用户上线即可
  2. public class UserOnlineEventChannel extends AsyncChannel{
  3. @Override
  4. protected void handle(Event message) {
  5. UserOnlineEvent event = (UserOnlineEvent) message;
  6. System.out.println("The User ");
  7. }
  8. }

UserOfflineEventChannel代码:

  1. public class UserOfflineEventChannel extends AsyncChannel{
  2. @Override
  3. protected void handle(Event message) {
  4. UserOfflineEvent event = (UserOfflineEvent) message;
  5. System.out.println("offline");
  6. }
  7. }

UserChatEventChannel:

  1. public class UserChatEventChannel extends AsyncChannel{
  2. @Override
  3. protected void handle(Event message) {
  4. UserChatEvent event = (UserChatEvent) message;
  5. System.out.println(event.getMessage());
  6. }
  7. }

3.3 Chat User线程

我们定义完Event和接受Event的Channel后,现在定义一个代表聊天室参与者的User线程,代码如下:

  1. import java.util.concurrent.TimeUnit;
  2. import static java.util.concurrent.ThreadLocalRandom.current;
  3. public class UserChatThread extends Thread{
  4. private final User user;
  5. private final AsyncEventDispatcher dispatcher;
  6. public UserChatThread(User user, AsyncEventDispatcher dispatcher) {
  7. super(user.getName());
  8. this.user = user;
  9. this.dispatcher = dispatcher;
  10. }
  11. @Override
  12. public void run() {
  13. try {
  14. // User上线,发送Online Event
  15. dispatcher.dispatch(new UserOnlineEvent(user));
  16. for(int i = 0; i < 5; i++) {
  17. // 发送User的聊天信息
  18. dispatcher.dispatch(new UserChatEvent(user, getName() + "-hello-" + i));
  19. TimeUnit.SECONDS.sleep(current().nextInt(10));
  20. }
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. } finally {
  24. // User下线,发送Offline Event
  25. dispatcher.dispatch(new UserOfflineEvent(user));
  26. }
  27. }
  28. }

当User线程启动的时候,首先发送OnlineEvent,然后发送五条聊天信息,之后下线,在下线的时候发送OfflineEvent,下面写一个简单的程序测试一下:

  1. public class UserChatApplication {
  2. public static void main(String[] args) {
  3. // 定义异步的Router
  4. final AsyncEventDispatcher dispatcher = new AsyncEventDispatcher();
  5. // 为Router注册Channel和Event之间的关系
  6. dispatcher.registerChannel(UserOnlineEvent.class, new UserOnlineEventChannel());
  7. dispatcher.registerChannel(UserOfflineEvent.class, new UserOfflineEventChannel());
  8. dispatcher.registerChannel(UserChatEvent.class, new UserChatEventChannel());
  9. // 启动三个登录聊天室的User
  10. new UserChatThread(new User("Leo"), dispatcher).start();
  11. new UserChatThread(new User("Alex"), dispatcher).start();
  12. new UserChatThread(new User("Tina"), dispatcher).start();
  13. }
  14. }