STOMP 代理中转站保持着一个与代理的单一 「系统」TCP 连接。这个连接只用于从服务器端应用程序发出的消息,不用于接收消息。你可以为这个连接配置 STOMP 凭证(即 STOMP 框架的登录和密码 header)。这在 XML 命名空间和 Java 配置中都暴露为 systemLogin 和 systemPasscode 属性,默认值为 guest 和 guest。
STOMP 代理中继还为每个连接的 WebSocket 客户端创建一个单独的 TCP 连接。你可以配置 STOMP 证书,这些证书用于代表客户创建的所有 TCP 连接。这在 XML 命名空间和 Java 配置中被暴露为 clientLogin 和 clientPasscode 属性,默认值为 guest 和 guest。
:::info STOMP 代理中继总是在它代表客户转发给代理的每个 CONNECT 帧上设置登录和密码 header。因此,WebSocket 客户端不需要设置这些头信息。它们会被忽略。正如 认证 部分所解释的,WebSocket 客户端应依靠 HTTP 认证来保护 WebSocket 端点并建立客户端身份。 :::
STOMP 代理中继也通过 「系统 system」TCP 连接向消息代理发送和接收心跳。你可以配置发送和接收心跳的时间间隔(默认为 10 秒)。如果与经纪人的连接丢失,经纪人中继会继续尝试重新连接,每 5 秒一次,直到成功。
任何 Spring Bean 都可以实现 ApplicationListener<BrokerAvailabilityEvent>来接收与经纪人的 「系统」连接丢失和重新建立时的通知。例如,当没有活跃的 「系统 system」连接时,一个广播股票报价的股票报价服务可以停止尝试发送消息。
默认情况下,STOMP 代理中继器总是连接到同一主机和端口,并在失去连接时根据需要重新连接。如果你希望提供多个地址,在每次尝试连接时,你可以配置一个地址供应商,而不是一个固定的主机和端口。下面的例子显示了如何做到这一点:
@Configuration@EnableWebSocketMessageBrokerpublic class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {// ...@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());registry.setApplicationDestinationPrefixes("/app");}private ReactorNettyTcpClient<byte[]> createTcpClient() {return new ReactorNettyTcpClient<>(client -> client.addressSupplier(() -> ... ),new StompReactorNettyCodec());}}
你也可以用 virtualHost 属性来配置 STOMP 代理中继。这个属性的值被设置为每个 CONNECT 帧的主机头,可能很有用(例如,在云环境中,建立 TCP 连接的实际主机与提供基于云的 STOMP 服务的主机不同)。
设置 RabbitMQ 为消息代理的例子
在前面的 例子中改造
开启 RabbitMQ Stomp 插件
RabbitMQ 中默认已经包含了 rabbitmq_stomp,只需要通过以下命令开启即可
rabbitmq-plugins enable rabbitmq_stomp
注意需要对外暴露 61613 端口,比如我这里使用的是 docker 测试的 RabbitMQ,就需要暴露 61613 端口,否则在程序启动的时候就会报如下的错
TCP connection failure in session _system_: Failed to connect: Connection refused: no further information: /127.0.0.1:61613
另外还需要注意的是,使用命令 enable rabbitmq_stomp 之后,还需要重启 RabbitMQ,否则就会出现下面这样的提示
链接的时候会提示 Broker not available
服务端如何配置链接到 RabbitMQ
其他的不需要修改,只需要如本文中说的一样,在 configureMessageBroker 中配置 broker 的链接信息即可
不过为了能使用 enableStompBrokerRelay 中默认使用的 netty tcp 链接还需要添加两个依赖
// 用于 TCP 连接管理: 链接到 RabbitMQ STOMPimplementation 'io.projectreactor.netty:reactor-netty:1.0.14'implementation 'io.netty:netty-all:4.1.72.Final'
package cn.mrcode.study.springdocsread.websocket;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.messaging.simp.config.MessageBrokerRegistry;import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;import org.springframework.web.socket.config.annotation.StompEndpointRegistry;import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;import org.springframework.web.socket.server.standard.TomcatRequestUpgradeStrategy;/*** @author mrcode*/@Configuration@EnableWebSocketMessageBrokerpublic class MyWebSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {// portfolio 是 WebSocket(或 SockJS)的端点的 HTTP URL。客户端需要连接以进行 WebSocket 握手。registry.addEndpoint("/portfolio").setAllowedOriginPatterns("*").withSockJS();}@Overridepublic void configureMessageBroker(MessageBrokerRegistry config) {// 目标标头以 /app 开头的 STOMP 消息会被路由到 @Controller 类中的 @MessageMapping 方法。config.setApplicationDestinationPrefixes("/app");// 使用内置的消息代理进行订阅和广播,并且 将目标标头以 /topic 或 /queue 开头的消息路由到代理。config.enableStompBrokerRelay("/topic", "/queue")// 默认使用 ReactorNettyTcpClient// .setTcpClient()// 所以可以直接设置相关的属性.setRelayHost("127.0.0.1").setRelayPort(61613);}/*** spring 为了支持每种容器自己的 websocket 升级策略,抽象了 RequestUpgradeStrategy,* <p>对 tomcat 提供了 TomcatRequestUpgradeStrategy 策略</p>* 如果不申明这个,就会在启动的时候抛出异常:No suitable default RequestUpgradeStrategy found*/@Beanpublic TomcatRequestUpgradeStrategy tomcatRequestUpgradeStrategy() {return new TomcatRequestUpgradeStrategy();}}
启动前端 html 测试
在控制台中功能正常的情况下会显示如下信息
在 RabbitMQ 中会多出一个临时队列
你可以开启其他的浏览器,然后打开测试的 HTML 例子,订阅成功后,就会多出来一个临时队列了
为什么要有一个外部的消息代理?
前面这个例子如果你将后端服务的端口修改下,启动两个服务。 然后把 html 也复制一份,每一份链接后端服务的一个。 分别打开他们,就你明白了。
在两个不同端口上的 STOMP 订阅者,能共享消息。 
如上图所示 8080 上的 html 打开了。订阅了 /topic/greeting ,端口 8081 上的 html 也打开了,同样订阅了相同的主题。那么当 8080 的先打开后,你再打开 8081 的 html,就会发现 8080 上还会收到 /topic/greeting 发来的消息。
可以看到上图,右侧的是后打开的页面,左侧和右侧收到的时间戳是一致的。也就是说他们在一个集群中。
这就是集群版的 STMP 哇
