Connecting to a Broker

STOMP 代理中转站保持着一个与代理的单一 「系统」TCP 连接。这个连接只用于从服务器端应用程序发出的消息,不用于接收消息。你可以为这个连接配置 STOMP 凭证(即 STOMP 框架的登录和密码 header)。这在 XML 命名空间和 Java 配置中都暴露为 systemLogin 和 systemPasscode 属性,默认值为 guest 和 guest。

STOMP 代理中继还为每个连接的 WebSocket 客户端创建一个单独的 TCP 连接。你可以配置 STOMP 证书,这些证书用于代表客户创建的所有 TCP 连接。这在 XML 命名空间和 Java 配置中被暴露为 clientLogin 和 clientPasscode 属性,默认值为 guest 和 guest。

:::info STOMP 代理中继总是在它代表客户转发给代理的每个 CONNECT 帧上设置登录和密码 header。因此,WebSocket 客户端不需要设置这些头信息。它们会被忽略。正如 认证 部分所解释的,WebSocket 客户端应依靠 HTTP 认证来保护 WebSocket 端点并建立客户端身份。 :::

STOMP 代理中继也通过 「系统 system」TCP 连接向消息代理发送和接收心跳。你可以配置发送和接收心跳的时间间隔(默认为 10 秒)。如果与经纪人的连接丢失,经纪人中继会继续尝试重新连接,每 5 秒一次,直到成功。

任何 Spring Bean 都可以实现 ApplicationListener<BrokerAvailabilityEvent>来接收与经纪人的 「系统」连接丢失和重新建立时的通知。例如,当没有活跃的 「系统 system」连接时,一个广播股票报价的股票报价服务可以停止尝试发送消息。

默认情况下,STOMP 代理中继器总是连接到同一主机和端口,并在失去连接时根据需要重新连接。如果你希望提供多个地址,在每次尝试连接时,你可以配置一个地址供应商,而不是一个固定的主机和端口。下面的例子显示了如何做到这一点:

  1. @Configuration
  2. @EnableWebSocketMessageBroker
  3. public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
  4. // ...
  5. @Override
  6. public void configureMessageBroker(MessageBrokerRegistry registry) {
  7. registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
  8. registry.setApplicationDestinationPrefixes("/app");
  9. }
  10. private ReactorNettyTcpClient<byte[]> createTcpClient() {
  11. return new ReactorNettyTcpClient<>(
  12. client -> client.addressSupplier(() -> ... ),
  13. new StompReactorNettyCodec());
  14. }
  15. }

你也可以用 virtualHost 属性来配置 STOMP 代理中继。这个属性的值被设置为每个 CONNECT 帧的主机头,可能很有用(例如,在云环境中,建立 TCP 连接的实际主机与提供基于云的 STOMP 服务的主机不同)。

设置 RabbitMQ 为消息代理的例子

在前面的 例子中改造

开启 RabbitMQ Stomp 插件

RabbitMQ 中默认已经包含了 rabbitmq_stomp,只需要通过以下命令开启即可

  1. rabbitmq-plugins enable rabbitmq_stomp

注意需要对外暴露 61613 端口,比如我这里使用的是 docker 测试的 RabbitMQ,就需要暴露 61613 端口,否则在程序启动的时候就会报如下的错

  1. TCP connection failure in session _system_: Failed to connect: Connection refused: no further information: /127.0.0.1:61613

另外还需要注意的是,使用命令 enable rabbitmq_stomp 之后,还需要重启 RabbitMQ,否则就会出现下面这样的提示
image.png
链接的时候会提示 Broker not available

服务端如何配置链接到 RabbitMQ

其他的不需要修改,只需要如本文中说的一样,在 configureMessageBroker 中配置 broker 的链接信息即可

不过为了能使用 enableStompBrokerRelay 中默认使用的 netty tcp 链接还需要添加两个依赖

  1. // 用于 TCP 连接管理: 链接到 RabbitMQ STOMP
  2. implementation 'io.projectreactor.netty:reactor-netty:1.0.14'
  3. implementation 'io.netty:netty-all:4.1.72.Final'
  1. package cn.mrcode.study.springdocsread.websocket;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.messaging.simp.config.MessageBrokerRegistry;
  5. import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
  6. import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
  7. import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
  8. import org.springframework.web.socket.server.standard.TomcatRequestUpgradeStrategy;
  9. /**
  10. * @author mrcode
  11. */
  12. @Configuration
  13. @EnableWebSocketMessageBroker
  14. public class MyWebSocketConfig implements WebSocketMessageBrokerConfigurer {
  15. @Override
  16. public void registerStompEndpoints(StompEndpointRegistry registry) {
  17. // portfolio 是 WebSocket(或 SockJS)的端点的 HTTP URL。客户端需要连接以进行 WebSocket 握手。
  18. registry.addEndpoint("/portfolio")
  19. .setAllowedOriginPatterns("*")
  20. .withSockJS();
  21. }
  22. @Override
  23. public void configureMessageBroker(MessageBrokerRegistry config) {
  24. // 目标标头以 /app 开头的 STOMP 消息会被路由到 @Controller 类中的 @MessageMapping 方法。
  25. config.setApplicationDestinationPrefixes("/app");
  26. // 使用内置的消息代理进行订阅和广播,并且 将目标标头以 /topic 或 /queue 开头的消息路由到代理。
  27. config.enableStompBrokerRelay("/topic", "/queue")
  28. // 默认使用 ReactorNettyTcpClient
  29. // .setTcpClient()
  30. // 所以可以直接设置相关的属性
  31. .setRelayHost("127.0.0.1")
  32. .setRelayPort(61613)
  33. ;
  34. }
  35. /**
  36. * spring 为了支持每种容器自己的 websocket 升级策略,抽象了 RequestUpgradeStrategy,
  37. * <p>对 tomcat 提供了 TomcatRequestUpgradeStrategy 策略</p>
  38. * 如果不申明这个,就会在启动的时候抛出异常:No suitable default RequestUpgradeStrategy found
  39. */
  40. @Bean
  41. public TomcatRequestUpgradeStrategy tomcatRequestUpgradeStrategy() {
  42. return new TomcatRequestUpgradeStrategy();
  43. }
  44. }

启动前端 html 测试

在控制台中功能正常的情况下会显示如下信息
image.png

在 RabbitMQ 中会多出一个临时队列
image.png
你可以开启其他的浏览器,然后打开测试的 HTML 例子,订阅成功后,就会多出来一个临时队列了

为什么要有一个外部的消息代理?

前面这个例子如果你将后端服务的端口修改下,启动两个服务。 然后把 html 也复制一份,每一份链接后端服务的一个。 分别打开他们,就你明白了。

在两个不同端口上的 STOMP 订阅者,能共享消息。
image.png
如上图所示 8080 上的 html 打开了。订阅了 /topic/greeting ,端口 8081 上的 html 也打开了,同样订阅了相同的主题。那么当 8080 的先打开后,你再打开 8081 的 html,就会发现 8080 上还会收到 /topic/greeting 发来的消息。
image.png
可以看到上图,右侧的是后打开的页面,左侧和右侧收到的时间戳是一致的。也就是说他们在一个集群中。

这就是集群版的 STMP 哇