STOMP 代理中转站保持着一个与代理的单一 「系统」TCP 连接。这个连接只用于从服务器端应用程序发出的消息,不用于接收消息。你可以为这个连接配置 STOMP 凭证(即 STOMP 框架的登录和密码 header)。这在 XML 命名空间和 Java 配置中都暴露为 systemLogin 和 systemPasscode 属性,默认值为 guest 和 guest。
STOMP 代理中继还为每个连接的 WebSocket 客户端创建一个单独的 TCP 连接。你可以配置 STOMP 证书,这些证书用于代表客户创建的所有 TCP 连接。这在 XML 命名空间和 Java 配置中被暴露为 clientLogin 和 clientPasscode 属性,默认值为 guest 和 guest。
:::info STOMP 代理中继总是在它代表客户转发给代理的每个 CONNECT 帧上设置登录和密码 header。因此,WebSocket 客户端不需要设置这些头信息。它们会被忽略。正如 认证 部分所解释的,WebSocket 客户端应依靠 HTTP 认证来保护 WebSocket 端点并建立客户端身份。 :::
STOMP 代理中继也通过 「系统 system」TCP 连接向消息代理发送和接收心跳。你可以配置发送和接收心跳的时间间隔(默认为 10 秒)。如果与经纪人的连接丢失,经纪人中继会继续尝试重新连接,每 5 秒一次,直到成功。
任何 Spring Bean 都可以实现 ApplicationListener<BrokerAvailabilityEvent>
来接收与经纪人的 「系统」连接丢失和重新建立时的通知。例如,当没有活跃的 「系统 system」连接时,一个广播股票报价的股票报价服务可以停止尝试发送消息。
默认情况下,STOMP 代理中继器总是连接到同一主机和端口,并在失去连接时根据需要重新连接。如果你希望提供多个地址,在每次尝试连接时,你可以配置一个地址供应商,而不是一个固定的主机和端口。下面的例子显示了如何做到这一点:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
// ...
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
registry.setApplicationDestinationPrefixes("/app");
}
private ReactorNettyTcpClient<byte[]> createTcpClient() {
return new ReactorNettyTcpClient<>(
client -> client.addressSupplier(() -> ... ),
new StompReactorNettyCodec());
}
}
你也可以用 virtualHost 属性来配置 STOMP 代理中继。这个属性的值被设置为每个 CONNECT 帧的主机头,可能很有用(例如,在云环境中,建立 TCP 连接的实际主机与提供基于云的 STOMP 服务的主机不同)。
设置 RabbitMQ 为消息代理的例子
在前面的 例子中改造
开启 RabbitMQ Stomp 插件
RabbitMQ 中默认已经包含了 rabbitmq_stomp,只需要通过以下命令开启即可
rabbitmq-plugins enable rabbitmq_stomp
注意需要对外暴露 61613 端口,比如我这里使用的是 docker 测试的 RabbitMQ,就需要暴露 61613 端口,否则在程序启动的时候就会报如下的错
TCP connection failure in session _system_: Failed to connect: Connection refused: no further information: /127.0.0.1:61613
另外还需要注意的是,使用命令 enable rabbitmq_stomp 之后,还需要重启 RabbitMQ,否则就会出现下面这样的提示
链接的时候会提示 Broker not available
服务端如何配置链接到 RabbitMQ
其他的不需要修改,只需要如本文中说的一样,在 configureMessageBroker 中配置 broker 的链接信息即可
不过为了能使用 enableStompBrokerRelay 中默认使用的 netty tcp 链接还需要添加两个依赖
// 用于 TCP 连接管理: 链接到 RabbitMQ STOMP
implementation 'io.projectreactor.netty:reactor-netty:1.0.14'
implementation 'io.netty:netty-all:4.1.72.Final'
package cn.mrcode.study.springdocsread.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.server.standard.TomcatRequestUpgradeStrategy;
/**
* @author mrcode
*/
@Configuration
@EnableWebSocketMessageBroker
public class MyWebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// portfolio 是 WebSocket(或 SockJS)的端点的 HTTP URL。客户端需要连接以进行 WebSocket 握手。
registry.addEndpoint("/portfolio")
.setAllowedOriginPatterns("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 目标标头以 /app 开头的 STOMP 消息会被路由到 @Controller 类中的 @MessageMapping 方法。
config.setApplicationDestinationPrefixes("/app");
// 使用内置的消息代理进行订阅和广播,并且 将目标标头以 /topic 或 /queue 开头的消息路由到代理。
config.enableStompBrokerRelay("/topic", "/queue")
// 默认使用 ReactorNettyTcpClient
// .setTcpClient()
// 所以可以直接设置相关的属性
.setRelayHost("127.0.0.1")
.setRelayPort(61613)
;
}
/**
* spring 为了支持每种容器自己的 websocket 升级策略,抽象了 RequestUpgradeStrategy,
* <p>对 tomcat 提供了 TomcatRequestUpgradeStrategy 策略</p>
* 如果不申明这个,就会在启动的时候抛出异常:No suitable default RequestUpgradeStrategy found
*/
@Bean
public TomcatRequestUpgradeStrategy tomcatRequestUpgradeStrategy() {
return new TomcatRequestUpgradeStrategy();
}
}
启动前端 html 测试
在控制台中功能正常的情况下会显示如下信息
在 RabbitMQ 中会多出一个临时队列
你可以开启其他的浏览器,然后打开测试的 HTML 例子,订阅成功后,就会多出来一个临时队列了
为什么要有一个外部的消息代理?
前面这个例子如果你将后端服务的端口修改下,启动两个服务。 然后把 html 也复制一份,每一份链接后端服务的一个。 分别打开他们,就你明白了。
在两个不同端口上的 STOMP 订阅者,能共享消息。
如上图所示 8080 上的 html 打开了。订阅了 /topic/greeting
,端口 8081 上的 html 也打开了,同样订阅了相同的主题。那么当 8080 的先打开后,你再打开 8081 的 html,就会发现 8080 上还会收到 /topic/greeting
发来的消息。
可以看到上图,右侧的是后打开的页面,左侧和右侧收到的时间戳是一致的。也就是说他们在一个集群中。
这就是集群版的 STMP 哇