The Event Bus
event bus充当着Vert.x的”神经系统”
它允许vertivle能够相互通信,不管这些verticle是否是同一种语言实现,或者是否是在同一个Vert.x实例里。
It even allows client side JavaScript running in a browser to communicate on the same event bus. (More on that later).
它甚至允许运行在浏览器里的同一个event bus的JavaScript形式的verticle相互交互
event bus形成了一个横跨多个服务器节点以及多个浏览器的分布式的端对端的消息系统,
event bus的API是相当简单的. 它基本上只涉及了registering handlers, unregistering handlers 和 sending/publishing messages.
The Theory
Addressing
我们通过event bus向一个地址发送Message.
在Vert.x中不需要担心是否会使用到复杂的寻址方案. 在Vert.x中,地址就是一个简单的合法字符串。Vert.x的地址还使用了一些scheme,例如使用.分割命名空间区间。
一些合法的地址例如:europe.news.feed1, acme.games.pacman, sausages, and X
Handlers
我们使用handler从event bus中接收消息——向一个地址注册一个handler。
无论是否是同一个verticle中的handler都可以向相同的地址进行注册。verticle中的同一个handler也可以注册到不同的地址上
Publish / subscribe messaging
event bus也支持消息发布——消息会被发布到某一个地址上.消息发布意味着:将消息发布给在某个地址上注册的全部handler。这和publish/subscribe消息模式很像。
Point to point and Request-Response messaging
event bus支持点对点消息传送.消息会被发送到一个地址上。Vert.x然后会在该地址上的N个handler中选择一个,然后将消息传递给被选择的handler。如果某个地址上注册了多个handler,Vert.x会根据一个不是很严格的循环算法来选取一个。
在点对点传送消息的情况中,当发送消息时,可以指定一个可选的回复handler。当接受者接受到一个消息后,同时该Message被处理后,接受者可以选择是否回应该消息。如果接受者选择回应该消息,那么reply handler会被调用。
当发送者接收到消息回应后,发送者还可以选择接着回应。这种模式可以永远重复下去,Vert.x还支持在这俩个verticle中创建一个会话。这种通用的消息模式称为Request-Response模式。
Transient
event bus消息都具有瞬时性,当event bus全部或者部分失败后,那就有可能丢失一部分消息。如果你的应用程序不允许出现消息丢失,那么你应该将你的handler编码成idempotent,同时当event bus恢复后,你的sender再尝试回应消息。
如果你想要持久有你的消息,你可以使用persistent work queue module
Types of messages
在event bus上传递的消息可以是一个简单的字符串,一个数字,一个boolean,或者是Vert.x Buffer 或者JSON消息。
但是我们强烈建议你在不同的verticle中通过JSON消息进行通信。JSON可以在Vert.x支持的语言中轻松地创建和解析。
Event Bus API
Registering and Unregistering Handlers
下例展示了如何在test.address上注册一个消息handler。
EventBus eb = vertx.eventBus();Handler<Message> myHandler = new Handler<Message>() {public void handle(Message message) {System.out.println("I received a message " + message.body);}};eb.registerHandler("test.address", myHandler);
myHandler会接受到所有发送到test.address地址上的消息。
Message是一个泛型类,已经指定的消息类型有:Message<Boolean>, Message<Buffer>, Message<byte[]>, Message<Byte>, Message<Character>, Message<Double>, Message<Float>, Message<Integer>, Message<JsonObject>, Message<JsonArray>, Message<Long>, Message<Short> and Message<String>
如果你确定接受到的消息都是同一种类型,那么你可以在handler上使用指定类型
Handler<Message<String>> myHandler = new Handler<Message<String>>() {public void handle(Message<String> message) {String body = message.body;}};
registerHandler方法返回的是event bus自身。我们提供了一个流畅的API,因此你可以将多个调用连接在一起。
当你向某个地址中注册一个handler,同时处于一个集群中,那该注册过程就需要耗费一点时间来在整个集群中的进行传播。如果你想handler注册成功后获得通知,那么你可以向registerHandler方法的第三个参数中指定另一个handler。当集群中的所有节点都收到向某个地址注册handler信息之后,那么第三个参数handler就会被调用,然后你就会收到handler注册完成的通知了。
eb.registerHandler("test.address", myHandler, new AsyncResultHandler<Void>() {public void handle(AsyncResult<Void> asyncResult) {System.out.println("The handler has been registered across the cluster ok? " + asyncResult.succeeded());}});
解除handler注册也是非常简单的,你只需要向unregisterHandler方法传递注册地址和已经注册上的那个handler对象就可以了。
eb.unregisterHandler("test.address", myHandler);
一个handler可以向相同的或者不同的地址上注册多次,因此为了在handler解除注册时,能够确定handler的唯一性,在解除注册时你需要同时指定要被解除的handler对象和注册地址
和注册一样,当你在一个集群环境中解除handler注册,这个过程需要耗费一些时间,以便整个集群都会收到该解除注册通知。同样的你如果想要当解除注册完成之后获得通知,registerHandler给这个函数增加一个第三个参数就可以了
eb.unregisterHandler("test.address", myHandler, new AsyncResultHandler<Void>() {public void handle(AsyncResult<Void> asyncResult) {System.out.println("The handler has been unregistered across the cluster ok? " + asyncResult.succeeded());}});
如果你想要你的handler存在于整个verticle的生命周期内,那么你就没有必要显式地去对该handler进行解除注册,当verticle停止的时候,Vert.x会自动对其进行解除注册
Publishing messages
发布一个消息也是非常简单的,你只需要指定一个发布地址,然后在指定发布的内容就可以了
eb.publish("test.address", "hello world");
这个消息会发布给在该地址上注册的所有handler。
Sending messages
通过send发送消息,那么目标地址上只有一个handler进行消息接受。这是一种点对点的发送消息模式。选取handler同样采用了一种不是很严格的round-robin算法
eb.send("test.address", "hello world");
Replying to messages
当你接受到一个消息后,你可能需要对该消息进行回应,这种模式称为request-response
当你send一个消息时,你将一个回应handler作为第三个参数。当接受者接收到消息后,他们可以调用Message的reply方法来回应消息。当reply方法被调用的时候,它会将回复消息发送者。
Handler<Message<String>> myHandler = new Handler<Message<String>>() {public void handle(Message<String> message) {System.out.println("I received a message " + message.body);// Do some stuff// Now reply to itmessage.reply("This is a reply");}};eb.registerHandler("test.address", myHandler);
The sender:
eb.send("test.address", "This is a message", new Handler<Message<String>>() {public void handle(Message<String> message) {System.out.println("I received a reply " + message.body);}});
发送空的reply或者null reply都是合法的。
The replies themselves can also be replied to so you can create a dialog between two different verticles consisting of multiple rounds.
Specifying timeouts for replies
如果你在发送消息时指定了一个reply handler, 但是却一直得不到回复响应,那么那么该handler永远都不会被解除注册。
为了解决这个问题,你可以指定一个Handler<AsyncResult<Message>>作为reply handler,然后再设置一个超时时间。如果在超时之前,你收到了消息的reply,那么该AsyncResult的handler方法就会被调用。如果超时前一直都得不到reply,那么该handler就会自动被解除注册,同时new Handler<AsyncResult<Message<String>>>()也会被调用,但是AsyncResult会包含一个失败的状态,你可以在这种状态下做一些特殊处理:
eb.sendWithTimeout("test.address", "This is a message", 1000, new Handler<AsyncResult<Message<String>>>() {public void handle(AsyncResult<Message<String>> result) {if (result.succeeded()) {System.out.println("I received a reply " + message.body);} else {System.err.println("No reply was received before the 1 second timeout!");}}});
当send超时之后,我们可以通过AsyncResult的cause()来获得一个ReplyException异常信息。ReplyException上的failureType()值是ReplyFailure.TIMEOUT
你也可以在event bus自身上设置一个超时时间. 如果你在event bus使用带有reply handler的send(...)方法,那这个超时时间就会被使用到。默认的超时时间是-1,这意味着reply handler 永远不会超时
eb.setDefaultReplyTimeout(5000);eb.send("test.address", "This is a message", new Handler<Message<String>>() {public void handle(Message<String> message) {System.out.println("I received a reply before the timeout of 5 seconds");}});
同样,你也可以对reply设置一个超时,然后使用Handler<AsyncResult<Message>>在超时时间内获得reply的reply:
message.replyWithTimeout("This is a reply", 1000, new Handler<AsyncResult<Message<String>>>() {public void handle(AsyncResult<Message<String>> result) {if (result.succeeded()) {System.out.println("I received a reply to the reply" + message.body);} else {System.err.println("No reply to the reply was received before the 1 second timeout!");}}});
Getting notified of reply failures
如果你使用超时和一个result handler去send一个消息,但是没有可用的handler将消息发送出去,那么result handler将会被调用,AsyncResult会是一个失败的状态,同样cause()会返回一个ReplyException. ReplyException实例的failureType()的返回值是ReplyFailure.NO_HANDLERS
如果你使用超时和一个result handler去send一个消息,但是接受者通过调用Message.fail(..)回应该消息, result handler会被调用,AsyncResult会是一个失败的状态,同样cause()会返回一个ReplyException. ReplyException实例的failureType()的返回值是ReplyFailure.RECIPIENT_FAILURE
For example:
eb.registerHandler("test.address", new Handler<Message<String>>() {public void handle(Message<String> message) {message.fail(123, "Not enough aardvarks");}});eb.sendWithTimeout("test.address", "This is a message", 1000, new Handler<AsyncResult<Message<String>>>() {public void handle(AsyncResult<Message<String>> result) {if (result.succeeded()) {System.out.println("I received a reply " + message.body);} else {ReplyException ex = (ReplyException)result.cause();System.err.println("Failure type: " + ex.failureType();System.err.println("Failure code: " + ex.failureCode();System.err.println("Failure message: " + ex.message();}}});
Message types
你发送的消息类型可以是以下几种(包括部分包装类型)
booleanbyte[]bytechardoublefloatintlongshortjava.lang.Stringorg.vertx.java.core.json.JsonObjectorg.vertx.java.core.json.JsonArrayorg.vertx.java.core.buffer.Buffer
如果Vert.x buffers 和 JSON objects and arrays是在相同的JVM里进行传递,那么在传递之前,他们会被copy一份,因此不同的verticle不能访问相同的对象实例,相同的对象实例会引发条件竞争。
Send some numbers:
eb.send("test.address", 1234);eb.send("test.address", 3.14159);Send a boolean:eb.send("test.address", true);
Send a JSON object:
JsonObject obj = new JsonObject();obj.putString("foo", "wibble");eb.send("test.address", obj);
Null messages can also be sent:
eb.send("test.address", null);
使用JSON作为verticle通信协议是一个不错的约定,这是因为JSON可以被所有Vert.x所支持的语言进行编解码
Distributed event bus
如果想要在你的特定网络内每一个Vert.x实例都在相同的event bus里,你只需要在命令行里启动Vert.x实例时添加-cluster参数就好了
一旦你成功启动,集群模式下的Vert.x实例就会合并到一起,组成一个分布式的event bus
