关于事件驱动和事件解耦的设计哲学,目前阐述的文章很少,我们姑且引用 《领域驱动设计》 出版后对领域事件的定义。

“当……”
“如果发生……,则……”
“当做完……的时候,请通知……”

在这些场景中,如果发生某种事件后,会触发进一步的操作,那么这个事件很可能就是领域事件。

有时从领域专家话中,我们好像也还是看不出哪里有领域事件,但业务需求依然可能需要领域事件。领域专家有时可能意识不到这些需求,只有在经过跨团队讨论后才意识到这些。之所以会这样,是由于领域事件需要发布到外部系统,比如发布到另一个限界上下文。由于这样的事件由订阅方处理,它将对本地和远程上下文都产生影响。

由于领域事件需要发布到外部系统,比如发布到另一个限界上下文。由于这样的事件由订阅方处理,它将对本地和远程上下文产生深远的影响。

一个领域事件将导致进一步的业务操作,在实现业务解耦的同时,还有助于形成完整的业务闭环。

领域事件可以是业务流程的一个步骤,比如一个事件发生后触发的后续动作,比如密码连续输错三次,触发锁定账户的动作。

那领域事件为什么要用最终一致性,而不是传统 SOA 的直接调用?

聚合的一个原则:一个事务中最多只能更改一个聚合实例。所以

  • 本地限界上下文中的其他聚合实例便可以通过领域事件的方式同步
  • 用于使远程依赖系统与本地系统保持一致。解耦本地系统和远程系统还有助于提高双方协作服务的可伸缩性。

事件发布完成后,发布方不必关心后续订阅方事件处理是否成功,即可实现领域模型的解耦,维护领域模型的独立性和数据一致性。在领域模型映射到微服务架构时,领域事件可解耦微服务,微服务间的数据不必要求强一致性,而是基于事件的最终一致性。

而 EventBus 特别适合用于实现单机版的领域事件解耦。

在目前的分布式情况下,我们一般会使用更专业的 MQ。但是很多时候有一些事件并不需要复杂的使用中间件。比如埋点上报,埋点上报对于系统来说是一个可以接受丢失的,那么单机版的 EventBus 就特别适合降低耦合度。

Cloudopt Next 是基于 Vertx 的,所以也支持 Vertx 的 EventBus。

Vert.x 应用程序是事件驱动,异步和单线程的。 Vert.x 过程通过事件总线,这是 Vert.x 的事件驱动架构的内置一块通信。 结合异步处理,单线程组件和事件总线产生高度的可扩展性,并编写单线程应用对习惯于多线程并发 Java 的人来说是一种解脱。 可以说,Vert.x 的最好的部分是其模块化的基于 JVM 的架构。 Vert.x 应用程序可以运行在几乎所有的操作系统,并且可以使用任何支持的 JVM 兼容的编程语言来编写。 一个 Vert.x 应用可完全在单一语言编写,也可以是用不同的编程语言模块的跨界混搭。 Vert.x 模块集成了 Vert.x 事件总线上。

每个 Verticle 的运行都是在 Vertx.x 的实例单线程中,通过 Event Loop 进行调度,而 Verticle 之间的相互调用和数据传递都是通过 EventBus 进行的。

你需要先在 maven 中先加载这个插件:

  1. <dependency>
  2. <groupId>net.cloudopt.next</groupId>
  3. <artifactId>cloudopt-next-eventbus</artifactId>
  4. <version>${version}</version>
  5. </dependency>

EventListener

在 Cloudopt Next 中使用仅需要实现 EventListener 并添加 @AutoEvent 注解,订阅指定事件。发布消息则是通过 EventManager.send() 或者 EventManager.publish()。如果使用 publish() ,所有订阅了该主题的监听器都会执行。

  1. fun main(args: Array<String>) {
  2. CloudoptServer.addPlugin(EventPlugin())
  3. CloudoptServer.run(TestCase::class)
  4. }
  1. @AutoEvent("net.cloudopt.web.test")
  2. class TestEventListener:EventListener {
  3. override fun listener(message: Message<Any>) {
  4. print(message.body())
  5. }
  6. }

Send Message

  1. EventBusManager.send(address = address, message = message)
  2. EventBusManager.public(address = address, message = message)

使用 RedisMQ

3.0.0.0 正式版开始支持使用 RedisMQ 来替代 vert.x 的 eventbus。你需要额外引入 next 的 redis 插件并且在 eventbus 插件启动前修改名称为 default 的实现类。

RedisMQ 不支持 send,只允许使用 public。

  1. EventBusManager.addProvider("default", RedisMQProvider())

当然你也可以随意增加或删除某个名词的实现类,发送消息时也只需要通过 providerName 即可修改实现类。

  1. EventBusManager.addProvider("redis", RedisMQProvider())
  2. EventBusManager.public(address = address, message = message, providerName = "redis")

After Event

其实我们在日常开发中经常会碰到调用完请求需要做些什么事情的场景,如新用户注册后需要给他发放优惠卷,在传统的开发模式下发放优惠券的代码是放在注册的代码后面。但是如果要再增加记录用户登录时间等功能的话,这个路由方法中的代码将会非常的多。

为了帮助大家在实际开发中进行解耦,我们提供了 @afterEvent 注解。Cloudopt Next 会自动在请求准备结束时调用 EventBus 的 send 方法进行发送,那么只需要让消费者订阅相应的事件就可以解耦了。

在使用这个注解前请先加载 EventPlugin

2.0.7.0 后会自动将上下文对象 resource.context.data 作为message 的body 发送到消费者那里去。如果你有需要传递的参数请放入 resource.context.data 中。

  1. @GET("afterEvent")
  2. @AfterEvent(["net.cloudopt.web.test"])
  3. fun afterEvent() {
  4. this.context.data().put("key","value")
  5. renderText("AfterEvent is success!")
  6. }
  1. @AutoEvent("net.cloudopt.web.test")
  2. class TestEventListener:EventListener {
  3. override fun listener(message: Message<Any>) {
  4. print(message.body())
  5. }
  6. }