Vert.x MQTT Server

这个组件提供了一个服务器,它能处理远程MQTT客户端连接,通信和信息交换。 它的API提供了,当接受到客户端发送的raw protocol消息时相应的事件和提供一些发送信息到客户端的功能。 它不是一个功能齐全的MQTT broker,但可以用来建立类似的东西或者协议转换。

使用MQTT服务器

要使用Vert.x MQTT服务器,增加以下依赖到构建描述符中

  • Maven (in your pom.xml):
  1. <dependency>
  2. <groupId>io.vertx</groupId>
  3. <artifactId>vertx-mqtt-server</artifactId>
  4. <version>3.4.1</version>
  5. </dependency>
  • Gradle (in your build.gradle file):
  1. compile io.vertx:vertx-mqtt-server:3.4.1

开始

处理客户端连接/断开

这个例子展示了如何处理一个来自远程MQTT客户端的请求,首先,会创建一个Mqttserver实例和使用endpointHandler方法选定一个处理器用于处理远程客户端发送的CONNECT信息。 MqttEndpoint实例,会被当做Handler的参数,它携带了所有主要的与CONNECT消息相关联的信息,例如客户端标识符,用户名/密码,”will”信息,清除session标志,协议版本和保活超时。 在Handler内,endpoint实例提供accept方法以相应的CONNACK消息回应远程客户端;通过该方式,连接会被建立。最终,服务器通过listen方法以默认行为的行为(运行在localhost和默认MQTT端口1883)启动。存在一个相同的方法,允许选定一个Handler去检查是否服务器是否已经正常启动。

  1. MqttServer mqttServer = MqttServer.create(vertx);
  2. mqttServer.endpointHandler(endpoint -> {
  3. // 显示主要连接信息
  4. System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());
  5. if (endpoint.auth() != null) {
  6. System.out.println("[username = " + endpoint.auth().userName() + ", password = " + endpoint.auth().password() + "]");
  7. }
  8. if (endpoint.will() != null) {
  9. System.out.println("[will topic = " + endpoint.will().willTopic() + " msg = " + endpoint.will().willMessage() +
  10. " QoS = " + endpoint.will().willQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");
  11. }
  12. System.out.println("[keep alive timeout = " + endpoint.keepAliveTimeSeconds() + "]");
  13. // 接受远程客户端连接
  14. endpoint.accept(false);
  15. })
  16. .listen(ar -> {
  17. if (ar.succeeded()) {
  18. System.out.println("MQTT server is listening on port " + ar.result().actualPort());
  19. } else {
  20. System.out.println("Error on starting the server");
  21. ar.cause().printStackTrace();
  22. }
  23. });

endpoint实例提供disconnectHandler用于选定一个handler当远程客户端发送DISCONNECT消息会被调用,该handler没有参数。

  1. endpoint.disconnectHandler(v -> {
  2. System.out.println("Received disconnect from client");
  3. });

使用SSL / TLS支持处理客户端连接/断开连接

服务端支持通过SSL/TLS方式用来验证和加密客户端的连接请求。为了做到这一点,MqttServerOptions类提供了setSsl方法用来设置SSL/TLS的用法(传递true作为值)和一些其他提供了服务器验证和私钥(作为java键存储引用,PEM或PFX格式)。在以下例子,setKeyCertOptions方法被用来传递一个PEM格式的证书。该方法需要一个实现了KeyCertOptions接口的实例,在这种情况下PemKeyCertOptions类被用来提供提供服务器证书和对应setCertPathsetKeyPath方法的私钥路径。MQTT服务器通常以传递一个Vert.x实例启动和以上的MQTT选项实例作为创建方法的参数。

  1. MqttServerOptions options = new MqttServerOptions()
  2. .setPort(8883)
  3. .setKeyCertOptions(new PemKeyCertOptions()
  4. .setKeyPath("./src/test/resources/tls/server-key.pem")
  5. .setCertPath("./src/test/resources/tls/server-cert.pem"))
  6. .setSsl(true);
  7. MqttServer mqttServer = MqttServer.create(vertx, options);
  8. mqttServer.endpointHandler(endpoint -> {
  9. // 展示主要连接信息
  10. System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());
  11. if (endpoint.auth() != null) {
  12. System.out.println("[username = " + endpoint.auth().userName() + ", password = " + endpoint.auth().password() + "]");
  13. }
  14. if (endpoint.will() != null) {
  15. System.out.println("[will topic = " + endpoint.will().willTopic() + " msg = " + endpoint.will().willMessage() +
  16. " QoS = " + endpoint.will().willQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");
  17. }
  18. System.out.println("[keep alive timeout = " + endpoint.keepAliveTimeSeconds() + "]");
  19. // 接受远程客户端连接
  20. endpoint.accept(false);
  21. })
  22. .listen(ar -> {
  23. if (ar.succeeded()) {
  24. System.out.println("MQTT server is listening on port " + ar.result().actualPort());
  25. } else {
  26. System.out.println("Error on starting the server");
  27. ar.cause().printStackTrace();
  28. }
  29. });

所有其他与处理端点连接和断开相关在没有SSL/TLS支持下使用相同方式管理。

处理客户端订阅和退订请求

在客户端和服务端的连接被建立后,客户端可以以指定的主题发送订阅消息。MqttEndpoint接口允许使用subscribeHandler处理到来的订阅请求。 这样的Handler接受一个MqttSubscribeMessage接口的实例,which brings the list of topics with related QoS levels as desired by the client.最终,端点实例提供subscribeAcknowledge方法以包含了授予QoS级别的相关的SUBACK消息回应客户端。

  1. endpoint.subscribeHandler(subscribe -> {
  2. List<MqttQoS> grantedQosLevels = new ArrayList<>();
  3. for (MqttTopicSubscription s: subscribe.topicSubscriptions()) {
  4. System.out.println("Subscription for " + s.topicName() + " with QoS " + s.qualityOfService());
  5. grantedQosLevels.add(s.qualityOfService());
  6. }
  7. // 确认订阅请求
  8. endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);
  9. });

以相同的方式,可以使用unsubscribeHandler方法在端点上选定一个handler,当客户端发送UNSUBSCRIBE消息时该handler会被调用。这个handler接受一个实现了MqttUnsubscribeMessage接口的实例作为参数,它携带了一个退订列表。最终,端点实例提供unsubscribeAcknowledge方法用于以相关的UNSUBACK消息回应客户。

  1. endpoint.unsubscribeHandler(unsubscribe -> {
  2. for (String t: unsubscribe.topics()) {
  3. System.out.println("Unsubscription for " + t);
  4. }
  5. // 确认订阅请求
  6. endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
  7. });

处理客户端发布消息

为了处理远程客户端发布的消息,MqttEndpoint接口提供了publishHandler方法用于选定一个handler,当客户端发送一个PUBLISH消息时会调用该handler。 这个handler接受一个MqttPublishMessage接口的实例作为参数,with the payload, the QoS level, the duplicate and retain flags.

假如QoS级别是0(最多一次),就没有必要给客户端响应。

假如QoS级别是1(至少一次),端点需要通过publishAcknowledge方法回应一个PUBACK消息

假如QoS级别是2(正好一次),端点需要使用publishReceived方法回应一个PUBREC消息。在这种情况下端点应该处理来自于客户端的PUBREL消息并且(当收到来资源端点的PUBREC消息,远程客户端发送就会发送它),可以通过publishReleaseHandler方法实现。为了关闭QoS级别2传递,端点可以使用publishComplete方法用来发送PUBCOMP消息到客户端。

  1. endpoint.publishHandler(message -> {
  2. System.out.println("Just received message [" + message.payload().toString(Charset.defaultCharset()) + "] with QoS [" + message.qosLevel() + "]");
  3. if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
  4. endpoint.publishAcknowledge(message.messageId());
  5. } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
  6. endpoint.publishRelease(message.messageId());
  7. }
  8. }).publishReleaseHandler(messageId -> {
  9. endpoint.publishComplete(messageId);
  10. });

发布消息到客户端

通过使用publish方法端点可以发布一个消息到远程客户端(发送一个PUBLISH消息),它使用以下入参,要发布的主题,负载,QoS级别,复制和保留标志。

假如QoS级别是0(最多一次),端点不会收到任何客户端的响应。

假如QoS级别是1(最多一次),端点需要去处理来自客户端的PUBACK消息,为了收到最后的确认消息。可以使用publishAcknowledgeHandler方法。

假如QoS级别是2(正好一次),端点需要去处理来自客户端的PUBREC消息。可以通过publishReceivedHandler方法来完成该操作。 在该Handler内,端点可以使用publishRelease方法响应PUBREL消息给客户端。最后一步是处理来自客户端的PUBCOMP消息;可以使用publishCompleteHandler来指定一个handler当收到PUBCOMP消息时候调用。

  1. // 例子, 发布一个QoS级别为2的消息
  2. endpoint.publish("my_topic", Buffer.buffer("Hello from the Vert.x MQTT server"), MqttQoS.EXACTLY_ONCE, false, false)
  3. // 选定handlers处理QoS 1与QoS 2
  4. endpoint.publishAcknowledgeHandler((messageId: java.lang.Integer) => {
  5. println(s"Received ack for message = ${messageId}")
  6. }).publishReceivedHandler((messageId: java.lang.Integer) => {
  7. endpoint.publishRelease(messageId)
  8. }).publishCompleteHandler((messageId: java.lang.Integer) => {
  9. println(s"Received ack for message = ${messageId}")
  10. })

被客户端保活通知

底层的MQTT保活机制是由服务器内部处理的。当接收到连接消息,服务器关注在该消息内的保活超时时间,用来检查客户端是否在该超市时间内没有发送任何消息。同时,对于接收到每个PINGREQ消息,服务器会以PINGRESP响应。

即便对于高等级应用不需要处理它,MqttEndpoint接口依然提供了pingHandler方法用来选定一个handler,当收到来自客户端的PINGREQ消息会被调用。 对于应用程序来说这只是一个通知,客户端并没有发送任何有意义的信息,只是一个用于检测保活的ping消息。在任何情况下,PINGRESP会被服务器自动发送。

关闭服务器

MqttServer接口提供了close方法用于关闭服务器。它停止监听到来的连接和关闭所有远程客户端活跃的连接。该方法是一个异步方法并且有个重载方法提供了选定一个完成handler当服务器真正关闭时进行调用。

  1. mqttServer.closeFuture().onComplete{
  2. case Success(result) => println("Success")
  3. case Failure(cause) => println("Failure")
  4. }

在verticles中自动清理

假如你是在verticles中创建的MQTT服务器,当verticle取消部署这些服务器会被自动关闭。

扩展:共享MQTT服务器

与MQTT服务器相关的handler总是在event loop线程中执行。这意味着在一个多核系统中,仅有一个实例被部署,一个核被使用。为了使用更多的核,可以部署更多的MQTT服务器实例 可以通过编程方式实现:

  1. for ( i <- 0 until 10) {
  2. var mqttServer = MqttServer.create(vertx)
  3. mqttServer.endpointHandler((endpoint: io.vertx.scala.mqtt.MqttEndpoint) => {
  4. // 处理端点
  5. }).listenFuture().onComplete{
  6. case Success(result) => println("Success")
  7. case Failure(cause) => println("Failure")
  8. }
  9. }

或者使用一个verticle指定实例的数量:

  1. var options = DeploymentOptions()
  2. .setInstances(10)
  3. vertx.deployVerticle("com.mycompany.MyVerticle", options)

真正是这样的,仅有一个MQTT服务器被部署,但到来的连接会被Vert.x使用轮转算法分发到不同的连接handlers上,在不同的核上执行。