The Event Bus

event bus是Vert.x的神经系统。

每一个Vertx对象内部都有一个唯一的event bus实例,我们可以通过eventBus这个方法获取它的引用。

event bus可以让你的应用程序的不同组件进行交互, 但是强大的是进行交互的组件可以自由选择实现语言,而且并不局限于仅仅只有在相同的Vertx实例内的组件才能交互。

event bus构成了一个在多个服务器节点和多个浏览器间的分布式端对端消息系统。

event bus还支持以下三种消息模式:publish/subscribe, point to point, request-response messaging

event busAPI是非常简单的,你基本只需要调用registering handlers, unregistering handlers 以及sending messages, publishing messages

The Theory

Addressing

我们通过event bus向一个地址发送Message.

在Vert.x中不需要担心是否会使用到复杂的寻址方案. 在Vert.x中,地址就是一个简单的合法字符串。Vert.x的地址还使用了一些scheme,例如使用.分割命名空间区间。

一些合法的地址例如:europe.news.feed1, acme.games.pacman, sausages, and X

Handlers

我们使用handlerevent bus中接收消息,因此你只需向一个address注册一个handler

handleraddress是一种多对多的关系,这意味着,一个handler可以向很多个address注册,同时多个handler可以向同一个address注册

Publish / subscribe messaging

event bus也支持publishing messages: 消息会被发布到某一个地址上.这意味着:某一消息会发布给在某个地址上注册的全部handler。这和publish/subscribe消息模式很像。

Point to point and Request-Response messaging

event bus支持点对点消息传送.

这种模式下消息会被发送到一个地址上。Vert.x然后会在该地址上的N个handler中选择一个,然后将消息传递给被选择的handler。

如果某个地址上注册了多个handler,Vert.x会根据non-strict round-robin算法来选取一个。

在点对点传送消息的情况中,当发送消息时,可以指定一个可选的回复handler。当接受者接受到一个消息后,同时该Message被处理后,接受者可以选择是否回应该消息。如果接受者选择回应该消息,那么reply handler会被调用。

当发送者接收到消息回应后,发送者还可以选择接着回应。这种模式可以永远重复下去,Vert.x还支持在这俩个verticle中创建一个会话。

这种通用的消息模式称为Request-Response模式。

Best-effort delivery

Vert.x会尽自己的全力进行消息分发,而且Vert.x保证不会主动抛弃消息,这种模式称为best-effort delivery.

然而,当event bus失效时,可能会发生消息丢失情况.如果你的应用程序不允许出现消息丢失,那么你应该将你的handler编码成idempotent(code your handlers to be idempotent),当event bus恢复正常后,你的消息发送者再次尝试发送消息.

Types of messages

Vert.x 消息支持所有的原生类型, String, Buffer. 但是在Vert.x中一般是使用JSON作为消息数据格式. 这是因为在Vert.x所支持的所有语言中,都很容易创建,读取和解析JSON

当然,Vert.x并不强制你必须使用JSON作为消息数据传输格式。

event bus本身是非常灵活的,而且支持发送任意的对象数据,只要你能进行编解码就可以

The Event Bus API

Let’s jump into the API

Getting the event bus

下例我们演示一下如何获得EventBus引用:

  1. EventBus eb = vertx.eventBus();

每一个Vertx实例中都有一个event bus实例。

Registering Handlers

下例演示了如何在event bus上注册一个handler

  1. EventBus eb = vertx.eventBus();
  2. eb.consumer("news.uk.sport", message -> {
  3. System.out.println("I have received a message: " + message.body());
  4. });

当你的handler收到一条message时, handler会自动被调用.

调用consumer(.., ..)方法的返回值是一个MessageConsumer实例.

我们可以通过MessageConsumer实例来unregister handler,也可以像流一样使用那个handler

或者你可以不向consumer方法中设置handler,那么你同样会获得一个MessageConsumer实例,你可以在MessageConsumer实例上再设置handler

  1. EventBus eb = vertx.eventBus();
  2. MessageConsumer<String> consumer = eb.consumer("news.uk.sport");
  3. consumer.handler(message -> {
  4. System.out.println("I have received a message: " + message.body());
  5. });

当向一个集群的event bus上注册一个handler时,那么就需要向集群中的每一个节点上都要注册一个该handler,那这就需要消耗一些时间了。

如果你需要当向集群中所有的节点都注册完成时,捕获一个通知,那么你可以再在MessageConsumer上注册一个"completion" handler.

  1. consumer.completionHandler(res -> {
  2. if (res.succeeded()) {
  3. System.out.println("The handler registration has reached all nodes");
  4. } else {
  5. System.out.println("Registration failed!");
  6. }
  7. });

Un-registering Handlers

想要unregister一个handler只需要调用unregister方法就可以了

如果你当前的环境是一个集群环境, 那么就需要向整个集群中的所有节点都执行unregister操作,这同样需要一些时间等待,当然你也可以注册一个"completion" handler

  1. consumer.unregister(res -> {
  2. if (res.succeeded()) {
  3. System.out.println("The handler un-registration has reached all nodes");
  4. } else {
  5. System.out.println("Un-registration failed!");
  6. }
  7. });

Publishing messages

publish消息同样是非常简单的,你只需要向目标address上调用publish方法就可以了

  1. eventBus.publish("news.uk.sport", "Yay! Someone kicked a ball");

这个消息会被分发到在目标地址上注册所有的handler上.

Sending messages

Sending出来的消息则只会在目的地址上注册的某个handler接受.这是一种point to point消息模式.handler的选择同样采用的是non-strict round-robin算法

下例演示了如何send message

  1. eventBus.send("news.uk.sport", "Yay! Someone kicked a ball");

Setting headers on messages

event bus上传送的消息同样可以带有消息头. 在sendingpublishing这俩种模式下,可以通过DeliveryOptions对象指定消息头

  1. DeliveryOptions options = new DeliveryOptions();
  2. options.addHeader("some-header", "some-value");
  3. eventBus.send("news.uk.sport", "Yay! Someone kicked a ball", options);

The Message object

在消息handler上你接受的对象是一个Message实例

Message实例中的body就相当于被sent或者publish的对象.

我们还可以通过headers方法获得messageheader.

Replying to messages

有时候当你send出一个消息之后,你可能期待某些答复. 这种消息模式被称为request-response pattern

想要达到这种效果,你可以在send消息时设置一个reply handler.

当消息接收者收到消息后,可以通过调用消息上的reply方法进行应答

当接收者通过消息的reply方法进行应答时,那么发送者在send时设置的reply handler将会被调用,下面给出了这种应答模式的演示:

The receiver:

  1. MessageConsumer<String> consumer = eventBus.consumer("news.uk.sport");
  2. consumer.handler(message -> {
  3. System.out.println("I have received a message: " + message.body());
  4. message.reply("how interesting!");
  5. });

The sender:

  1. eventBus.send("news.uk.sport", "Yay! Someone kicked a ball across a patch of grass", ar -> {
  2. if (ar.succeeded()) {
  3. System.out.println("Received reply: " + ar.result().body());
  4. }
  5. });

这种应答可以形成往复的应答模式从而生成一个会话

Sending with timeouts

send发送消息时,如果指定了一个reply handler,那么你还可以通过DeliveryOptions设置一个超时时间(默认是30s)。

当在指定的时间内没有收到对方应答时,reply handler将会以一种失败的状态被调用

Send Failures

在消息发送时可能会在下面几种情况下引发失败:

  • There are no handlers available to send the message to
  • The recipient has explicitly failed the message using fail

In all cases the reply handler will be called with the specific failure.

Message Codecs

如果你对在event bus上传送的对象指定一个消息编码器并且在event bus上注册了该消息编码器, 那么无论该对象是何类型,你都可以在event bus上对其进行传递.

当你sending或者publishing一个对象时, 你需要在DeliveryOptions对象里指定该对象所对应的编码器名称.

  1. eventBus.registerCodec(myCodec);
  2. DeliveryOptions options = new DeliveryOptions().setCodecName(myCodec.name());
  3. eventBus.send("orders", new MyPOJO(), options);

你也可以在eventBus上指定一个默认的编码器,这样一来,当你再send消息时,就不用每次都手动的设置编码器了

  1. eventBus.registerDefaultCodec(MyPOJO.class, myCodec);
  2. eventBus.send("orders", new MyPOJO());

如果你想要解除一个消息编码器,你只需要使用unregisterCodec就好了

Message codecs don’t always have to encode and decode as the same type. For example you can write a codec that allows a MyPOJO class to be sent, but when that message is sent to a handler it arrives as a MyOtherPOJO class.

Clustered Event Bus

event bus的作用域并不是单单的在一个单独的Vertx实例里。在集群里,你的局域网中的不同的Vertx实例可以聚合在一起,而每一个Vertx实例里的event bus可以相互聚集形成一个单独的分布式的event bus

Clustering programmatically

如果你通过编程的方式使用集群方法创建Vertx实例,在这种方式下你就得到了一个集群event bus

  1. VertxOptions options = new VertxOptions();
  2. Vertx.clusteredVertx(options, res -> {
  3. if (res.succeeded()) {
  4. Vertx vertx = res.result();
  5. EventBus eventBus = vertx.eventBus();
  6. System.out.println("We now have a clustered event bus: " + eventBus);
  7. } else {
  8. System.out.println("Failed: " + res.cause());
  9. }
  10. });

你必须确保你已经在classpath上实现了ClusterManager, 例如你也可以使用Vertx的ClusterManager实现

Clustering on the command line

你可以通过下面的方式进行命令行的集群配置

  1. vertx run MyVerticle -cluster

Automatic clean-up in verticles

If you’re registering event bus handlers from inside verticles, those handlers will be automatically unregistered when the verticle is undeployed.

Examples

Codec

  1. class ClientCodec implements MessageCodec<ClientSource, ClientTarget> {
  2. /*
  3. * 当把对象s传输网络中时,该方法会被调用.
  4. * 会将s写入buffer中
  5. */
  6. @Override
  7. public void encodeToWire(Buffer buffer, ClientSource s) {
  8. }
  9. /*
  10. * pos表示从buffer哪里开始读
  11. */
  12. @Override
  13. public ClientTarget decodeFromWire(int pos, Buffer buffer) {
  14. return null;
  15. }
  16. /*
  17. * 如果message是在本地event bus上传递上传输时, 该方法会被调用, 将ClientSource类型对象改变为ClientTarget
  18. */
  19. @Override
  20. public ClientTarget transform(ClientSource s) {
  21. return null;
  22. }
  23. /*
  24. * 该编码器的名称, 每个编码器都必须有一个唯一的名字. 当发送message或者从event bus上解除编码器的时候,需要使用到该编码器
  25. */
  26. @Override
  27. public String name() {
  28. return null;
  29. }
  30. @Override
  31. public byte systemCodecID() {
  32. return -1;
  33. }
  34. }
  35. class ClientSource {
  36. }
  37. class ClientTarget {
  38. }