这部分参考文档涵盖了对 Servlet 堆栈的支持、包含原始 WebSocket 交互的 WebSocket 消息传递、通过 SockJS 的 WebSocket 模拟以及通过 STOMP 作为 WebSocket 上的子协议的发布-订阅消息传递。

一、WebSocket 简介

WebSocket 协议RFC 6455提供了一种标准化方式,可通过单个 TCP 连接在客户端和服务器之间建立全双工双向通信通道。它是与 HTTP 不同的 TCP 协议,但旨在通过 HTTP 工作,使用端口 80 和 443,并允许重复使用现有的防火墙规则。
WebSocket 交互以 HTTP 请求开始,该请求使用 HTTPUpgrade标头进行升级,或者在这种情况下,切换到 WebSocket 协议。以下示例显示了这样的交互:

  1. GET /spring-websocket-portfolio/portfolio HTTP/1.1
  2. Host: localhost:8080
  3. Upgrade: websocket
  4. Connection: Upgrade
  5. Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
  6. Sec-WebSocket-Protocol: v10.stomp, v11.stomp
  7. Sec-WebSocket-Version: 13
  8. Origin: http://localhost:8080

①标Upgrade头。
②使用Upgrade连接。
与通常的 200 状态代码不同,具有 WebSocket 支持的服务器返回类似于以下的输出:

  1. HTTP/1.1 101 Switching Protocols
  2. Upgrade: websocket
  3. Connection: Upgrade
  4. Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
  5. Sec-WebSocket-Protocol: v10.stomp

①协议切换
握手成功后,HTTP 升级请求底层的 TCP 套接字保持打开状态,供客户端和服务器继续发送和接收消息。
WebSockets 如何工作的完整介绍超出了本文档的范围。请参阅 RFC 6455、HTML5 的 WebSocket 章节,或 Web 上的许多介绍和教程。
请注意,如果 WebSocket 服务器在 Web 服务器(例如 nginx)后面运行,您可能需要将其配置为将 WebSocket 升级请求传递到 WebSocket 服务器。同样,如果应用程序在云环境中运行,请查看云提供商有关 WebSocket 支持的说明。

1.1 HTTP 与 WebSocket

尽管 WebSocket 被设计为与 HTTP 兼容并以 HTTP 请求开始,但重要的是要了解这两种协议会导致非常不同的架构和应用程序编程模型。
在 HTTP 和 REST 中,应用程序被建模为多个 URL。为了与应用程序交互,客户端以请求-响应方式访问这些 URL。服务器根据 HTTP URL、方法和标头将请求路由到适当的处理程序。
相比之下,在 WebSockets 中,初始连接通常只有一个 URL。随后,所有应用程序消息都在同一 TCP 连接上流动。这指向一个完全不同的异步、事件驱动的消息传递架构。
WebSocket 也是一种低级传输协议,与 HTTP 不同,它不为消息的内容规定任何语义。这意味着除非客户端和服务器在消息语义上达成一致,否则无法路由或处理消息。
Sec-WebSocket-ProtocolWebSocket 客户端和服务器可以通过HTTP 握手请求上的标头协商使用更高级别的消息传递协议(例如,STOMP) 。如果没有,他们需要提出自己的约定。

1.2 何时使用 WebSocket

WebSockets 可以使网页动态和交互。但是,在许多情况下,Ajax 和 HTTP 流或长轮询的组合可以提供简单有效的解决方案。
例如,新闻、邮件和社交订阅源需要动态更新,但每隔几分钟更新一次可能完全没问题。另一方面,协作、游戏和金融应用程序需要更接近实时。
延迟本身并不是决定性因素。如果消息量相对较少(例如,监控网络故障),HTTP 流式传输或轮询可以提供有效的解决方案。正是低延迟、高频率和高容量的组合,才成为使用 WebSocket 的最佳案例。
还要记住,在 Internet 上,不受您控制的限制性代理可能会阻止 WebSocket 交互,因为它们未配置为传递 Upgrade标头,或者因为它们关闭了看起来空闲的长期连接。这意味着将 WebSocket 用于防火墙内的内部应用程序是一个比面向公众的应用程序更直接的决定。

二、WebSocket API

Spring Framework 提供了一个 WebSocket API,您可以使用它来编写处理 WebSocket 消息的客户端和服务器端应用程序。

2.1 WebSocketHandler

创建 WebSocket 服务器要实现WebSocketHandler或者继承TextWebSocketHandler或BinaryWebSocketHandler。以下示例使用TextWebSocketHandler:

  1. import org.springframework.web.socket.WebSocketHandler;
  2. import org.springframework.web.socket.WebSocketSession;
  3. import org.springframework.web.socket.TextMessage;
  4. public class MyHandler extends TextWebSocketHandler {
  5. @Override
  6. public void handleTextMessage(WebSocketSession session, TextMessage message) {
  7. // ...
  8. }
  9. }

有专门的 WebSocket Java 配置和 XML 命名空间支持,用于将前面的 WebSocket 处理程序映射到特定的 URL,如以下示例所示:

  1. import org.springframework.web.socket.config.annotation.EnableWebSocket;
  2. import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
  3. import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
  4. @Configuration
  5. @EnableWebSocket
  6. public class WebSocketConfig implements WebSocketConfigurer {
  7. @Override
  8. public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
  9. registry.addHandler(myHandler(), "/myHandler");
  10. }
  11. @Bean
  12. public WebSocketHandler myHandler() {
  13. return new MyHandler();
  14. }
  15. }

以下示例显示了与前面示例等效的 XML 配置:

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:websocket="http://www.springframework.org/schema/websocket"
  4. xsi:schemaLocation="
  5. http://www.springframework.org/schema/beans
  6. https://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/websocket
  8. https://www.springframework.org/schema/websocket/spring-websocket.xsd">
  9. <websocket:handlers>
  10. <websocket:mapping path="/myHandler" handler="myHandler"/>
  11. </websocket:handlers>
  12. <bean id="myHandler" class="org.springframework.samples.MyHandler"/>
  13. </beans>

前面的示例用于 Spring MVC 应用程序,应包含在DispatcherServlet。然而,Spring的WebSocket支持并不依赖于Spring MVC。在WebSocketHttpRequestHandler的帮助下,将WebSockeThatHandler集成到其他HTTP服务环境中相对简单。
当WebSocketHandler直接或间接使用 API 时,例如通过 STOMP消息传递,应用程序必须同步消息的发送,因为底层标准 WebSocket 会话 (JSR-356) 不允许并发发送。一种选择是使用ConcurrentWebSocketSessionDecorator包装WebSocketSession。

2.2. WebSocket 握手

定制初始HTTP WebSocket握手请求的最简单方法是通过握手Interceptor,它公开了握手前和握手后的方法。您可以使用这样的拦截器来阻止握手,或者使任何属性都可用于WebSocketSession。以下示例使用内置拦截器将HTTP会话属性传递给WebSocket会话:

  1. @Configuration
  2. @EnableWebSocket
  3. public class WebSocketConfig implements WebSocketConfigurer {
  4. @Override
  5. public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
  6. registry.addHandler(new MyHandler(), "/myHandler")
  7. .addInterceptors(new HttpSessionHandshakeInterceptor());
  8. }
  9. }

以下示例显示了与前面示例等效的 XML 配置:

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:websocket="http://www.springframework.org/schema/websocket"
  4. xsi:schemaLocation="
  5. http://www.springframework.org/schema/beans
  6. https://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/websocket
  8. https://www.springframework.org/schema/websocket/spring-websocket.xsd">
  9. <websocket:handlers>
  10. <websocket:mapping path="/myHandler" handler="myHandler"/>
  11. <websocket:handshake-interceptors>
  12. <bean class="org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor"/>
  13. </websocket:handshake-interceptors>
  14. </websocket:handlers>
  15. <bean id="myHandler" class="org.springframework.samples.MyHandler"/>
  16. </beans>

一个更高级的选项是扩展DefaultHandshakeHandler,它执行WebSocket握手的步骤,包括验证客户端来源、协商子协议和其他细节。如果应用程序需要配置自定义的RequestUpgrade策略,以适应尚未支持的WebSocket服务器引擎和版本,则可能还需要使用此选项(有关此主题的更多信息,请参阅部署)。Java配置和XML名称空间都使配置定制握手器成为可能。

Spring提供了一个WebSocketHandlerDecorator基类,您可以使用它来用其他行为装饰WebSocketHandler。在使用WebSocket Java配置或XML命名空间时,默认情况下会提供并添加日志记录和异常处理实现。ExceptionWebSocketHandlerDecorator捕获任何WebSocketHandler方法产生的所有未捕获异常,并关闭状态为1011的WebSocket会话,这表示服务器错误。

2.3 部署

Spring WebSocket API很容易集成到Spring MVC应用程序中,在该应用程序中DispatcherServlet同时提供HTTP WebSocket握手和其他HTTP请求。通过调用WebSocketHttpRequestHandler,还可以轻松地集成到其他HTTP处理场景中。这既方便又容易理解。然而,对于JSR-356运行时,需要特别考虑。
JavaWebSocket API(JSR-356)提供了两种部署机制。第一个涉及启动时的Servlet容器类路径扫描(Servlet 3特性)。另一个是用于Servlet容器初始化的注册API。这两种处理机制都不可能用于所有的前端 — 包括WebSocket握手和所有其他HTTP请求 — 比如Spring MVC的DispatcherServlet。
这是JSR-356的一个显著限制,即Spring的WebSocket支持特定于服务器的RequestUpgradeStrategy实现,即使在JSR-356运行时也是如此。这种策略目前适用于Tomcat、Jetty、GlassFish、WebLogic、WebSphere和Undertow(以及WildFly)。

已经创建了一个请求,以克服Java WebSocket API中的上述限制,可以在eclipse-ee4j/WebSocket API#211上找到。Tomcat、Undertow和WebSphere提供了它们自己的API替代方案,这使得实现这一点成为可能,Jetty也可以做到这一点。我们希望更多的服务器也能做到这一点。

第二个考虑因素是,支持JSR-356的Servlet容器预计会执行ServletContainerInitializer(SCI)扫描,这可能会减慢应用程序的启动速度 — 在某些情况下,这是戏剧性的。如果在升级到支持JSR-356的Servlet容器版本后观察到重大影响,则应该可以通过使用web中的元素有选择地启用或禁用web片段(和SCI扫描)。web.xml如下例所示:

  1. <web-app xmlns="http://java.sun.com/xml/ns/javaee"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="
  4. http://java.sun.com/xml/ns/javaee
  5. https://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
  6. version="3.0">
  7. <absolute-ordering/>
  8. </web-app>

然后,您可以按名称有选择地启用web片段,例如Spring自己的SpringServletContainerInitializer,它为Servlet 3 Java初始化API提供支持。以下示例显示了如何执行此操作:

  1. <web-app xmlns="http://java.sun.com/xml/ns/javaee"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="
  4. http://java.sun.com/xml/ns/javaee
  5. https://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
  6. version="3.0">
  7. <absolute-ordering>
  8. <name>spring_web</name>
  9. </absolute-ordering>
  10. </web-app>

2.4 服务器配置

每个底层WebSocket引擎都公开控制运行时特征的配置属性,例如消息缓冲区大小、空闲超时等。
对于Tomcat、WildFly和GlassFish,可以将ServletServerContainerFactoryBean添加到WebSocket Java配置中,如下例所示:

  1. @Configuration
  2. @EnableWebSocket
  3. public class WebSocketConfig implements WebSocketConfigurer {
  4. @Bean
  5. public ServletServerContainerFactoryBean createWebSocketContainer() {
  6. ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
  7. container.setMaxTextMessageBufferSize(8192);
  8. container.setMaxBinaryMessageBufferSize(8192);
  9. return container;
  10. }
  11. }

以下示例显示了与前面示例等效的 XML 配置:

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:websocket="http://www.springframework.org/schema/websocket"
  4. xsi:schemaLocation="
  5. http://www.springframework.org/schema/beans
  6. https://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/websocket
  8. https://www.springframework.org/schema/websocket/spring-websocket.xsd">
  9. <bean class="org.springframework...ServletServerContainerFactoryBean">
  10. <property name="maxTextMessageBufferSize" value="8192"/>
  11. <property name="maxBinaryMessageBufferSize" value="8192"/>
  12. </bean>
  13. </beans>

对于客户端WebSocket配置,应该使用WebSocketContainerFactoryBean(XML)或ContainerProvider.getWebSocketContainer()(Java配置)。

对于Jetty,您需要提供一个预配置的Jetty WebSocketServerFactory,并通过WebSocket Java配置将其插入Spring的DefaultHandshakeHandler。以下示例显示了如何执行此操作:

  1. @Configuration
  2. @EnableWebSocket
  3. public class WebSocketConfig implements WebSocketConfigurer {
  4. @Override
  5. public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
  6. registry.addHandler(echoWebSocketHandler(),
  7. "/echo").setHandshakeHandler(handshakeHandler());
  8. }
  9. @Bean
  10. public DefaultHandshakeHandler handshakeHandler() {
  11. WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
  12. policy.setInputBufferSize(8192);
  13. policy.setIdleTimeout(600000);
  14. return new DefaultHandshakeHandler(
  15. new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)));
  16. }
  17. }

以下示例显示了与前面示例等效的 XML 配置:

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:websocket="http://www.springframework.org/schema/websocket"
  4. xsi:schemaLocation="
  5. http://www.springframework.org/schema/beans
  6. https://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/websocket
  8. https://www.springframework.org/schema/websocket/spring-websocket.xsd">
  9. <websocket:handlers>
  10. <websocket:mapping path="/echo" handler="echoHandler"/>
  11. <websocket:handshake-handler ref="handshakeHandler"/>
  12. </websocket:handlers>
  13. <bean id="handshakeHandler" class="org.springframework...DefaultHandshakeHandler">
  14. <constructor-arg ref="upgradeStrategy"/>
  15. </bean>
  16. <bean id="upgradeStrategy" class="org.springframework...JettyRequestUpgradeStrategy">
  17. <constructor-arg ref="serverFactory"/>
  18. </bean>
  19. <bean id="serverFactory" class="org.eclipse.jetty...WebSocketServerFactory">
  20. <constructor-arg>
  21. <bean class="org.eclipse.jetty...WebSocketPolicy">
  22. <constructor-arg value="SERVER"/>
  23. <property name="inputBufferSize" value="8092"/>
  24. <property name="idleTimeout" value="600000"/>
  25. </bean>
  26. </constructor-arg>
  27. </bean>
  28. </beans>

2.5 允许的来源

从 Spring Framework 4.1.5 开始,WebSocket 和 SockJS 的默认行为是只接受同源请求。也可以允许所有或指定的来源列表。此检查主要是为浏览器客户端设计的。没有什么可以阻止其他类型的客户端修改Origin标头值( 有关详细信息,请参阅RFC 6454:Web 起源概念)。
三种可能的行为是:

  • 仅允许同源请求(默认):在此模式下,启用 SockJS 时,Iframe HTTP 响应标头X-Frame-Options设置为SAMEORIGIN,并且禁用 JSONP 传输,因为它不允许检查请求的来源。因此,启用此模式时不支持 IE6 和 IE7。
  • 允许指定的来源列表:每个允许的来源必须以http:// or开头https://。在这种模式下,启用 SockJS 时,会禁用 IFrame 传输。因此,启用此模式时,不支持 IE6 到 IE9。
  • 允许所有来源:要启用此模式,您应该提供*允许的来源值。在这种模式下,所有传输都可用。

您可以配置 WebSocket 和 SockJS 允许的来源,如以下示例所示:

  1. import org.springframework.web.socket.config.annotation.EnableWebSocket;
  2. import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
  3. import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
  4. @Configuration
  5. @EnableWebSocket
  6. public class WebSocketConfig implements WebSocketConfigurer {
  7. @Override
  8. public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
  9. registry.addHandler(myHandler(), "/myHandler").setAllowedOrigins("https://mydomain.com");
  10. }
  11. @Bean
  12. public WebSocketHandler myHandler() {
  13. return new MyHandler();
  14. }
  15. }

以下示例显示了与前面示例等效的 XML 配置:

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:websocket="http://www.springframework.org/schema/websocket"
  4. xsi:schemaLocation="
  5. http://www.springframework.org/schema/beans
  6. https://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/websocket
  8. https://www.springframework.org/schema/websocket/spring-websocket.xsd">
  9. <websocket:handlers allowed-origins="https://mydomain.com">
  10. <websocket:mapping path="/myHandler" handler="myHandler" />
  11. </websocket:handlers>
  12. <bean id="myHandler" class="org.springframework.samples.MyHandler"/>
  13. </beans>

三、SockJS Fallback

在公共互联网上,您无法控制的限制性代理可能会阻止WebSocket交互,这可能是因为它们未配置为传递升级头,也可能是因为它们关闭了看似空闲的长期连接。
解决这个问题的方法是WebSocket仿真 — 也就是说,尝试先使用WebSocket,然后再使用基于HTTP的技术来模拟WebSocket交互,并公开相同的应用程序级API。
在Servlet堆栈上,Spring框架为SockJS协议提供了服务器(以及客户端)支持。

3.1 概述

SockJS 的目标是让应用程序使用 WebSocket API,但在运行时需要时回退到非 WebSocket 替代方案,而无需更改应用程序代码。
SockJS 包括:

  • 以可执行 叙述测试的形式定义的SockJS 协议
  • SockJS JavaScript 客户端 ——一个用于浏览器的客户端库。
  • SockJS 服务器实现,包括 Spring Frameworkspring-websocket模块中的一个。
  • 模块中的 SockJS Java 客户端spring-websocket(从 4.1 版开始)。

SockJS 是为在浏览器中使用而设计的。它使用多种技术来支持各种浏览器版本。有关 SockJS 传输类型和浏览器的完整列表,请参阅 SockJS 客户端页面。传输分为三大类:WebSocket、HTTP 流和 HTTP 长轮询。有关这些类别的概述,请参阅 此博客文章
SockJS 客户端首先发送GET /info以从服务器获取基本信息。之后,它必须决定使用哪种传输方式。如果可能,使用 WebSocket。如果没有,在大多数浏览器中,至少有一个 HTTP 流选项。如果不是,则使用 HTTP(长)轮询。
所有传输请求都具有以下 URL 结构:
https://host:port/myApp/myEndpoint/{server-id}/{session-id}/{transport}
涉及场景:

  • {server-id}对于在集群中路由请求很有用,但在其他情况下不使用。
  • {session-id}关联属于 SockJS 会话的 HTTP 请求。
  • {transport}指示传输类型(例如 、websocket等xhr-streaming)。

WebSocket 传输只需要一个 HTTP 请求来进行 WebSocket 握手。此后的所有消息都在该套接字上交换。
HTTP 传输需要更多请求。例如,Ajax/XHR 流式传输依赖于对服务器到客户端消息的一个长时间运行的请求以及对客户端到服务器消息的附加 HTTP POST 请求。长轮询与此类似,只是它在每次服务器到客户端发送后结束当前请求。
SockJS增加了最小的消息框架。例如,服务器最初发送字母o(“打开”帧),消息作为[“message1”,”message2”](JSON编码的数组)发送,如果25秒内没有消息流动,则发送字母h(“心跳”帧)(默认情况下),并发送字母c(“关闭”帧)以关闭会话。
要了解更多信息,请在浏览器中运行一个示例,并查看HTTP请求。SockJS客户端允许修复传输列表,因此可以一次查看一个传输。SockJS客户端还提供了一个调试标志,可以在浏览器控制台中显示有用的消息。在服务器端,您可以为org启用跟踪日志记录。springframework。网状物插座有关更多详细信息,请参阅SockJS协议测试。

3.2 启用 SockJS

您可以通过 Java 配置启用 SockJS,如以下示例所示:

  1. @Configuration
  2. @EnableWebSocket
  3. public class WebSocketConfig implements WebSocketConfigurer {
  4. @Override
  5. public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
  6. registry.addHandler(myHandler(), "/myHandler").withSockJS();
  7. }
  8. @Bean
  9. public WebSocketHandler myHandler() {
  10. return new MyHandler();
  11. }
  12. }

以下示例显示了与前面示例等效的 XML 配置:

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:websocket="http://www.springframework.org/schema/websocket"
  4. xsi:schemaLocation="
  5. http://www.springframework.org/schema/beans
  6. https://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/websocket
  8. https://www.springframework.org/schema/websocket/spring-websocket.xsd">
  9. <websocket:handlers>
  10. <websocket:mapping path="/myHandler" handler="myHandler"/>
  11. <websocket:sockjs/>
  12. </websocket:handlers>
  13. <bean id="myHandler" class="org.springframework.samples.MyHandler"/>
  14. </beans>

前面的示例用于Spring MVC应用程序,应该包含在DispatcherServlet的配置中。然而,Spring的WebSocket和SockJS支持并不依赖于Spring MVC。在SockJsHttpRequestHandler的帮助下,集成到其他HTTP服务环境相对简单。
在浏览器端,应用程序可以使用sockjs客户端(1.0.x版)。它模拟W3C WebSocket API,并与服务器通信,以根据运行它的浏览器选择最佳传输选项。请参阅sockjs客户端页面和浏览器支持的传输类型列表。客户端还提供了几个配置选项 — 例如,指定要包括哪些传输。

3.3 IE 8 和 9

Internet Explorer 8 和 9 仍在使用中。它们是拥有 SockJS 的关键原因。本节介绍有关在这些浏览器中运行的重要注意事项。
SockJS 客户端通过使用 Microsoft 的 XDomainRequest. 这适用于跨域,但不支持发送 cookie。Cookies 对于 Java 应用程序通常是必不可少的。然而,由于 SockJS 客户端可以与许多服务器类型(不仅仅是 Java 类型)一起使用,它需要知道 cookie 是否重要。如果是这样,SockJS 客户端更喜欢 Ajax/XHR 进行流式传输。否则,它依赖于基于 iframe 的技术。
SockJS 客户端的第/info一个请求是对可能影响客户端选择传输的信息的请求。这些细节之一是服务器应用程序是否依赖 cookie(例如,出于身份验证目的或使用粘性会话进行集群)。Spring 的 SockJS 支持包括一个名为sessionCookieNeeded. 默认情况下启用它,因为大多数 Java 应用程序都依赖于JSESSIONID cookie。如果你的应用不需要它,你可以关闭这个选项,然后 SockJS 客户端应该xdr-streaming在 IE 8 和 9 中选择。
如果确实使用基于iframe的传输,请记住,可以通过将HTTP响应头X-Frame-Options设置为DENY、SAMEORIGIN或ALLOW-FROM来指示浏览器阻止在给定页面上使用iframe。这是用来防止点击劫持的。

Spring Security 3.2+ 支持X-Frame-Options对每个响应进行设置。默认情况下,Spring Security Java 配置将其设置为DENY. 在 3.2 中,Spring Security XML 命名空间默认不设置该标头,但可以配置为这样做。以后可能会默认设置。 有关如何配置标头设置的详细信息,请参阅Spring Security 文档的默认安全X-Frame-Options标头。您还可以查看 gh-2718 了解更多背景信息。

如果您的应用程序添加了X-Frame-Options响应标头(应该如此!)并依赖于基于 iframe 的传输,您需要将标头值设置为 SAMEORIGINor ALLOW-FROM 。Spring SockJS 支持还需要知道 SockJS 客户端的位置,因为它是从 iframe 加载的。默认情况下,iframe 设置为从 CDN 位置下载 SockJS 客户端。将此选项配置为使用与应用程序同源的 URL 是个好主意。
以下示例显示了如何在 Java 配置中执行此操作:

  1. @Configuration
  2. @EnableWebSocketMessageBroker
  3. public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
  4. @Override
  5. public void registerStompEndpoints(StompEndpointRegistry registry) {
  6. registry.addEndpoint("/portfolio").withSockJS()
  7. .setClientLibraryUrl("http://localhost:8080/myapp/js/sockjs-client.js");
  8. }
  9. // ...
  10. }

XML 命名空间通过元素提供了类似的选项。

在初始开发期间,请启用 SockJS 客户端devel模式,以防止浏览器缓存 SockJS 请求(如 iframe),否则这些请求会被缓存。有关如何启用它的详细信息,请参阅 SockJS 客户端页面。

3.4 心跳

SockJS 协议要求服务器发送心跳消息以阻止代理断定连接已挂起。Spring SockJS 配置有一个名为的属性heartbeatTime,您可以使用它来自定义频率。默认情况下,心跳会在 25 秒后发送,假设在该连接上没有发送其他消息。此 25 秒值符合以下 IETF对公共 Internet 应用程序的建议。

在 WebSocket 和 SockJS 上使用 STOMP 时,如果 STOMP 客户端和服务器协商要交换的心跳,则禁用 SockJS 心跳。

Spring SockJS 支持还允许您配置TaskScheduler以安排心跳任务。任务调度程序由线程池支持,默认设置基于可用处理器的数量。您应该考虑根据您的特定需求自定义设置。

3.5 客户端断开连接

HTTP 流式传输和 HTTP 长轮询 SockJS 传输要求连接保持打开的时间比平时更长。有关这些技术的概述,请参阅 此博客文章
在 Servlet 容器中,这是通过 Servlet 3 异步支持完成的,该支持允许退出 Servlet 容器线程、处理请求并继续写入来自另一个线程的响应。
一个特定的问题是 Servlet API 不为已离开的客户端提供通知。请参阅eclipse-ee4j/servlet-api#44。但是,Servlet 容器会在后续尝试写入响应时引发异常。由于 Spring 的 SockJS 服务支持服务器发送的心跳(默认每 25 秒),这意味着通常会在该时间段内检测到客户端断开连接(或者更早,如果消息发送更频繁)。

因此,由于客户端已断开连接,可能会发生网络 I/O 故障,这可能会用不必要的堆栈跟踪填充日志。Spring 尽最大努力识别代表客户端断开连接(特定于每个服务器)的此类网络故障,并使用专用日志类别DISCONNECTED_CLIENT_LOG_CATEGORY (定义在 中AbstractSockJsSession)记录最少的消息。如果您需要查看堆栈跟踪,可以将该日志类别设置为 TRACE。

3.6 SockJS 和 CORS

如果您允许跨域请求(请参阅Allowed Origins),SockJS 协议使用 CORS 在 XHR 流式传输和轮询传输中提供跨域支持。因此,会自动添加 CORS 标头,除非检测到响应中存在 CORS 标头。因此,如果应用程序已经配置为提供 CORS 支持(例如,通过 Servlet 过滤器),Spring 会SockJsService跳过这一部分。
也可以通过 suppressCors在 Spring 的 SockJsService 中设置属性来禁用这些 CORS 标头的添加。
SockJS 需要以下标头和值:

  • Access-Control-Allow-Origin:从Origin请求头的值初始化。
  • Access-Control-Allow-Credentials:始终设置为true。
  • Access-Control-Request-Headers:从等效请求标头中的值初始化。
  • Access-Control-Allow-Methods:传输支持的 HTTP 方法(参见TransportType枚举)。
  • Access-Control-Max-Age:设置为 31536000(1 年)。

具体实现见源代码中的addCorsHeadersinAbstractSockJsService和TransportTypeenum 。
或者,如果 CORS 配置允许,请考虑排除带有 SockJS 端点前缀的 URL,从而让 SpringSockJsService处理它。

3.7 SockJsClient

Spring 提供了一个 SockJS Java 客户端来连接远程 SockJS 端点,而无需使用浏览器。当需要在公共网络上的两个服务器之间进行双向通信时(即网络代理可能会阻止使用 WebSocket 协议),这可能特别有用。SockJS Java 客户端对于测试目的也非常有用(例如,模拟大量并发用户)。
SockJS Java 客户端支持websocket、xhr-streaming和xhr-polling 传输。其余的仅对在浏览器中使用有意义。
您可以配置WebSocketTransport:

  • StandardWebSocketClient在 JSR-356 运行时中。
  • JettyWebSocketClient通过使用 Jetty 9+ 本机 WebSocket API。
  • Spring 的任何实现WebSocketClient。

根据XhrTransport定义,一个 支持xhr-streaming和xhr-polling,因为从客户端的角度来看,除了用于连接服务器的 URL 之外没有其他区别。目前有两种实现方式:

  • RestTemplateXhrTransport使用 Spring 的RestTemplateHTTP 请求。
  • JettyXhrTransport使用 Jetty 的HttpClientHTTP 请求。

以下示例显示了如何创建 SockJS 客户端并连接到 SockJS 端点:

  1. List<Transport> transports = new ArrayList<>(2);
  2. transports.add(new WebSocketTransport(new StandardWebSocketClient()));
  3. transports.add(new RestTemplateXhrTransport());
  4. SockJsClient sockJsClient = new SockJsClient(transports);
  5. sockJsClient.doHandshake(new MyWebSocketHandler(), "ws://example.com:8080/sockjs");

SockJS 使用 JSON 格式的数组来存储消息。默认情况下,使用 Jackson 2 并且需要在类路径上。或者,您可以配置自定义实现 SockJsMessageCodec并在SockJsClient.

要用于SockJsClient模拟大量并发用户,您需要配置底层 HTTP 客户端(用于 XHR 传输)以允许足够数量的连接和线程。以下示例显示了如何使用 Jetty 执行此操作:

  1. HttpClient jettyHttpClient = new HttpClient();
  2. jettyHttpClient.setMaxConnectionsPerDestination(1000);
  3. jettyHttpClient.setExecutor(new QueuedThreadPool(1000));

下面的示例显示了服务器端SoCKJS相关属性(参见JavaDoc的详细信息),您还应该考虑定制:

  1. @Configuration
  2. public class WebSocketConfig extends WebSocketMessageBrokerConfigurationSupport {
  3. @Override
  4. public void registerStompEndpoints(StompEndpointRegistry registry) {
  5. registry.addEndpoint("/sockjs").withSockJS()
  6. .setStreamBytesLimit(512 * 1024)
  7. .setHttpMessageCacheSize(1000)
  8. .setDisconnectDelay(30 * 1000);
  9. }
  10. // ...
  11. }

①将该streamBytesLimit属性设置为 512KB(默认为 128KB — 128 1024)。
②将该httpMessageCacheSize属性设置为 1,000(默认值为100)。
③将disconnectDelay属性设置为 30 属性秒(默认为 5 秒 — 5
1000)。

四、STOMP

WebSocket 协议定义了两种类型的消息(文本和二进制),但它们的内容是未定义的。该协议定义了客户端和服务器协商子协议(即更高级别的消息传递协议)的机制,用于在 WebSocket 之上定义每个可以发送什么样的消息,格式是什么,内容每条消息,依此类推。子协议的使用是可选的,但无论哪种方式,客户端和服务器都需要就定义消息内容的某些协议达成一致。

4.1 概述

STOMP(面向简单文本的消息传递协议)最初是为脚本语言(如 Ruby、Python 和 Perl)创建的,用于连接到企业消息代理。它旨在解决常用消息传递模式的最小子集。STOMP 可用于任何可靠的双向流网络协议,例如 TCP 和 WebSocket。尽管 STOMP 是一个面向文本的协议,但消息负载可以是文本或二进制。
STOMP 是一种基于帧的协议,其帧以 HTTP 为模型。以下清单显示了 STOMP 框架的结构:

COMMAND header1:value1 header2:value2 Body^@

客户端可以使用SEND或者SUBSCRIBE命令来发送或订阅消息,以及destination描述消息内容和谁应该接收消息的标题。这启用了一个简单的发布-订阅机制,您可以使用该机制通过代理向其他连接的客户端发送消息,或向服务器发送消息以请求执行某些工作。
当您使用 Spring 的 STOMP 支持时,Spring WebSocket 应用程序充当客户端的 STOMP 代理。消息被路由到@Controller消息处理方法或一个简单的内存中代理,该代理跟踪订阅并将消息广播给订阅用户。您还可以将 Spring 配置为使用专用的 STOMP 代理(例如 RabbitMQ、ActiveMQ 等)来进行实际的消息广播。在这种情况下,Spring 维护与代理的 TCP 连接,将消息中继给它,并将消息从它向下传递到连接的 WebSocket 客户端。因此,Spring Web 应用程序可以依赖统一的基于 HTTP 的安全性、通用验证和熟悉的编程模型来处理消息。
以下示例显示了订阅接收股票报价的客户端,服务器可能会定期发送股票报价(例如,通过一个调度任务,通过SimpMessageTemplate向代理发送消息):

SUBSCRIBE id:sub-1 destination:/topic/price.stock.* ^@

以下示例显示了发送交易请求的客户端,服务器可以通过@MessageMapping方法处理该请求:

SEND destination:/queue/trade content-type:application/json content-length:44 {"action":"BUY","ticker":"MMM","shares",44}^@

执行后,服务器可以向客户端广播交易确认消息和详细信息。
在 STOMP 规范中,目的地的含义故意不透明。它可以是任何字符串,完全由 STOMP 服务器来定义它们支持的目的地的语义和语法。然而,目的地是类似路径的字符串是很常见的,其中/topic/..意味着发布-订阅(一对多)和/queue/点对点(一对一)消息交换。
STOMP 服务器可以使用该MESSAGE命令向所有订阅者广播消息。以下示例显示了服务器向订阅的客户端发送股票报价:

MESSAGE message-id:nxahklf6-1 subscription:sub-1 destination:/topic/price.stock.MMM {"ticker":"MMM","price":129.45}^@

服务器不能发送未经请求的消息。来自服务器的所有消息都必须响应特定的客户端订阅,并且 subscription-id服务器消息的标头必须与id客户端订阅的标头匹配。
前面的概述旨在提供对 STOMP 协议的最基本理解。我们建议全面审查协议规范。

4.2 好处

使用 STOMP 作为子协议可以让 Spring Framework 和 Spring Security 提供比使用原始 WebSockets 更丰富的编程模型。关于 HTTP 与原始 TCP 以及它如何让 Spring MVC 和其他 Web 框架提供丰富的功能,可以得出同样的观点。以下是福利清单:

  • 无需发明自定义消息传递协议和消息格式。
  • STOMP 客户端,包括 Spring 框架中的Java 客户端,都可用。
  • 您可以(可选)使用消息代理(例如 RabbitMQ、ActiveMQ 等)来管理订阅和广播消息。
  • 应用程序逻辑可以组织在任意数量的@Controller实例中,并且可以根据 STOMP 目标标头将消息路由到它们,而不是使用单个WebSocketHandler给定连接处理原始 WebSocket 消息。
  • 您可以使用 Spring Security 来保护基于 STOMP 目标和消息类型的消息。

    4.3 启用 STOMP

    spring消息和spring WebSocket模块中提供了对STOMP over WebSocket的支持。一旦有了这些依赖项,就可以使用SockJS Fallback在WebSocket上公开STOMP端点,如下例所示: ```java import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

  1. @Override
  2. public void registerStompEndpoints(StompEndpointRegistry registry) {
  3. registry.addEndpoint("/portfolio").withSockJS();
  4. }
  5. @Override
  6. public void configureMessageBroker(MessageBrokerRegistry config) {
  7. config.setApplicationDestinationPrefixes("/app");
  8. config.enableSimpleBroker("/topic", "/queue");
  9. }

}

  1. ①/portfolio WebSocket(或 SockJS)客户端需要连接到 WebSocket 握手的端点的 HTTP URL。<br />②目标标头以开头的 STOMP 消息/app被路由到 类中的@MessageMapping方法@Controller。<br />③使用内置消息代理进行订阅和广播,并将目标标头以该代理开头的消息路由/topic或/queue到代理。<br />以下示例显示了与前面示例等效的 XML 配置:
  2. ```xml
  3. <beans xmlns="http://www.springframework.org/schema/beans"
  4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5. xmlns:websocket="http://www.springframework.org/schema/websocket"
  6. xsi:schemaLocation="
  7. http://www.springframework.org/schema/beans
  8. https://www.springframework.org/schema/beans/spring-beans.xsd
  9. http://www.springframework.org/schema/websocket
  10. https://www.springframework.org/schema/websocket/spring-websocket.xsd">
  11. <websocket:message-broker application-destination-prefix="/app">
  12. <websocket:stomp-endpoint path="/portfolio">
  13. <websocket:sockjs/>
  14. </websocket:stomp-endpoint>
  15. <websocket:simple-broker prefix="/topic, /queue"/>
  16. </websocket:message-broker>
  17. </beans>

对于内置的简单代理,/topic和/queue前缀没有任何特殊含义。它们只是区分 pub-sub 与点对点消息传递(即多个订阅者与一个消费者)的约定。当您使用外部代理时,请查看代理的 STOMP 页面以了解其支持的 STOMP 目的地和前缀类型。

要从浏览器连接,对于 SockJS,您可以使用 sockjs-client. 对于 STOMP,许多应用程序都使用了jmesnil/stomp-websocket库(也称为 stomp.js),该库功能齐全,已在生产中使用多年,但不再维护。目前, JSteunou/webstomp-client是该库最积极维护和发展的继任者。以下示例代码基于它:

  1. var socket = new SockJS("/spring-websocket-portfolio/portfolio");
  2. var stompClient = webstomp.over(socket);
  3. stompClient.connect({}, function(frame) {
  4. }

或者,如果通过 WebSocket(没有 SockJS)连接,您可以使用以下代码:

  1. var socket = new WebSocket("/spring-websocket-portfolio/portfolio");
  2. var stompClient = Stomp.over(socket);
  3. stompClient.connect({}, function(frame) {
  4. }

请注意,stompClient在前面的示例中不需要指定login 和passcode标头。即使这样做了,它们也会在服务器端被忽略(或者更确切地说,被覆盖)。有关身份验证的更多信息,请参阅连接到代理 和身份验证。
有关更多示例代码,请参见:

  • 使用 WebSocket 构建交互式 Web 应用程序 - 入门指南。
  • 股票投资组合 — 一个示例应用程序。

    4.4 WebSocket 服务器

    要配置底层WebSocket服务器,服务器配置中的信息适用。但是,对于Jetty,您需要通过StompEndpointRegistry设置握手器和WebSocketPolicy:

    1. @Configuration
    2. @EnableWebSocketMessageBroker
    3. public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    4. @Override
    5. public void registerStompEndpoints(StompEndpointRegistry registry) {
    6. registry.addEndpoint("/portfolio").setHandshakeHandler(handshakeHandler());
    7. }
    8. @Bean
    9. public DefaultHandshakeHandler handshakeHandler() {
    10. WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
    11. policy.setInputBufferSize(8192);
    12. policy.setIdleTimeout(600000);
    13. return new DefaultHandshakeHandler(
    14. new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)));
    15. }
    16. }

    4.5 消息流

    一旦暴露了 STOMP 端点,Spring 应用程序就会成为连接客户端的 STOMP 代理。本节介绍服务器端的消息流。
    spring消息传递模块包含对消息传递应用程序的基础支持,这些应用程序起源于spring集成,后来被提取并整合到spring框架中,以便在许多spring项目和应用程序场景中更广泛地使用。以下列表简要介绍了一些可用的消息摘要:

  • Message:消息的简单表示,包括标头和有效负载。

  • MessageHandler:处理消息的合同。
  • MessageChannel:发送消息的合约,使生产者和消费者之间松散耦合。
  • SubscribableChannel: MessageChannel与MessageHandler订阅者。
  • ExecutorSubscribableChannel : SubscribableChannel使用 anExecutor来传递消息。

Java 配置(即@EnableWebSocketMessageBroker)和 XML 名称空间配置(即)都使用上述组件来组装消息工作流。下图显示了启用简单内置消息代理时使用的组件:
WebSocket API - 图1
上图显示了三个消息通道:

  • clientInboundChannel:用于传递从 WebSocket 客户端接收到的消息。
  • clientOutboundChannel:用于向 WebSocket 客户端发送服务器消息。
  • brokerChannel:用于从服务器端应用程序代码中向消息代理发送消息。

下图显示了配置外部代理(例如 RabbitMQ)以管理订阅和广播消息时使用的组件:
WebSocket API - 图2
前面两个图之间的主要区别是使用“代理中继”通过TCP向上传递消息到外部STOMP代理,并将消息从代理向下传递到订阅的客户端。
当从WebSocket连接接收到消息时,它们会被解码为跺脚帧,转换为Spring消息表示,并发送到clientInboundChannel进行进一步处理。例如,目标头以/app开头的STOMP消息可以路由到带注释的控制器中的@MessageMapping方法,而/topic和/queue消息可以直接路由到message broker。
处理来自客户端的STOMP消息的带注释的@Controller可以通过brokerChannel向message broker发送消息,并且broker通过clientOutboundChannel将消息广播给匹配的订阅者。同一个控制器也可以对HTTP请求做出同样的响应,因此客户端可以执行HTTP POST,然后@PostMapping方法可以向message broker发送消息,以向订阅的客户端广播。
我们可以通过一个简单的例子来追踪流程。考虑下面的例子,它设置了一个服务器:

  1. @Configuration
  2. @EnableWebSocketMessageBroker
  3. public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
  4. @Override
  5. public void registerStompEndpoints(StompEndpointRegistry registry) {
  6. registry.addEndpoint("/portfolio");
  7. }
  8. @Override
  9. public void configureMessageBroker(MessageBrokerRegistry registry) {
  10. registry.setApplicationDestinationPrefixes("/app");
  11. registry.enableSimpleBroker("/topic");
  12. }
  13. }
  14. @Controller
  15. public class GreetingController {
  16. @MessageMapping("/greeting")
  17. public String handle(String greeting) {
  18. return "[" + getTimestamp() + ": " + greeting;
  19. }
  20. }

前面的示例支持以下流程:

  1. 客户端连接到http://localhost:8080/portfolio,一旦建立 WebSocket 连接,STOMP 帧开始在其上流动。
  2. 客户端发送一个 SUBSCRIBE 帧,其目标标头为/topic/greeting. 一旦接收到并解码,消息就会被发送到,clientInboundChannel然后被路由到存储客户端订阅的消息代理。
  3. 客户端发送一个 SEND 帧到/app/greeting. 前缀有助于将其/app路由到带注释的控制器。/app去除前缀后,/greeting 目标的剩余部分映射@MessageMapping到GreetingController.
  4. 从返回的值GreetingController转换为Message具有基于返回值和默认目标标头的有效负载 的 Spring /topic/greeting(从输入目标派生,/app替换为 /topic)。生成的消息被发送到brokerChannel消息代理并由消息代理处理。
  5. 消息代理找到所有匹配的订阅者并通过 向每个订阅者发送一个 MESSAGE 帧clientOutboundChannel,从那里消息被编码为 STOMP 帧并在 WebSocket 连接上发送。

下一节将提供有关带注释的方法的更多详细信息,包括支持的参数类型和返回值。

4.6 带注释的控制器

应用程序可以使用带注释@Controller的类来处理来自客户端的消息。这些类可以声明@MessageMapping、@SubscribeMapping和@ExceptionHandler 方法,如以下主题中所述:

  • @MessageMapping
  • @SubscribeMapping
  • @MessageExceptionHandler
    @MessageMapping
    您可以使用@MessageMapping注释根据目的地路由消息的方法。它在方法级别和类型级别都受支持。在类型级别,@MessageMapping用于表示控制器中所有方法的共享映射。
    默认情况下,映射值是 Ant 样式的路径模式(例如/thing,/thing/),包括对模板变量的支持(例如,/thing/{id})。这些值可以通过@DestinationVariable方法参数引用。应用程序还可以切换到点分隔的映射目标约定,如 Dots as Separators中所述。
    支持的方法参数*

    下表描述了方法参数:
方法参数 描述
Message 用于访问完整的消息。
MessageHeaders 用于访问Message.
MessageHeaderAccessor, SimpMessageHeaderAccessor, 和StompHeaderAccessor 通过类型化的访问器方法访问标题。
@Payload 用于访问消息的有效负载,由配置的 MessageConverter.
不需要此注释的存在,因为默认情况下,如果没有其他参数匹配,则假定它。
@javax.validation.Valid您可以使用或 Spring注释有效负载参数@Validated,以自动验证有效负载参数。
@Header 用于访问特定标头值 - 以及使用 的类型转换 org.springframework.core.convert.converter.Converter(如有必要)。
@Headers 用于访问消息中的所有标头。此参数必须可分配给 java.util.Map。
@DestinationVariable 用于访问从消息目标中提取的模板变量。必要时将值转换为声明的方法参数类型。
java.security.Principal 反映在 WebSocket HTTP 握手时登录的用户。

返回值
默认情况下,@MessageMapping方法的返回值通过匹配的MessageConverter序列化为有效负载,并作为消息发送到brokerChannel,从那里向订阅者广播。出站消息的目的地与入站消息的目的地相同,但前缀为/topic。
您可以使用@SendTo和@SendToUser注释自定义输出消息的目的地@SendTo用于自定义目标目的地或指定多个目的地@SendToUser用于将输出消息仅定向到与输入消息关联的用户。查看用户目的地。
您可以在同一方法上同时使用@SendTo和@SendToUser,这两种方法在类级别都受支持,在这种情况下,它们充当类中方法的默认值。但是,请记住,任何方法级别的@SendTo或@SendToUser注释都会覆盖类级别的任何此类注释。
消息可以异步处理,@MessageMapping方法可以返回ListenableFuture、CompletableFuture或CompletionStage。
请注意,@SendTo和@SendToUser只是一种方便,相当于使用SimpMessageTemplate发送消息。如果需要,对于更高级的场景,@MessageMapping方法可以直接使用SimpMessageTemplate。这可以代替返回一个值,也可以作为返回值的补充。请参阅发送消息。

@SubscribeMapping

@SubscribeMapping与@MessageMapping类似,但仅将映射范围缩小到订阅消息。它支持与@MessageMapping相同的方法参数。但是,对于返回值,默认情况下,消息直接发送到客户端(通过clientOutboundChannel响应订阅),而不是发送到代理(通过brokerChannel,作为对匹配订阅的广播)。添加@SendTo或@SendToUser将覆盖此行为,并改为发送到代理。
这什么时候有用?假设代理映射到/topic和/queue,而应用程序控制器映射到/app。在这个设置中,代理存储所有对/topic和/queue的订阅,这些订阅都是为了重复广播,应用程序不需要参与。客户机还可以订阅某个/app目的地,控制器可以返回一个值来响应该订阅,而无需涉及代理,无需再次存储或使用该订阅(实际上是一次性请求-应答交换)。其中一个用例是在启动时用初始数据填充UI。
这什么时候没用?不要尝试将代理和控制器映射到同一个目标前缀,除非出于某种原因希望两者都独立处理消息,包括订阅。入站消息是并行处理的。无法保证代理或控制器是否首先处理给定的消息。如果目标是在订阅被存储并准备好进行广播时收到通知,那么如果服务器支持,客户端应该请求接收(simple broker不支持)。例如,使用Java STOMP客户端,可以执行以下操作来添加收据:

  1. @Autowired
  2. private TaskScheduler messageBrokerTaskScheduler;
  3. // During initialization..
  4. stompClient.setTaskScheduler(this.messageBrokerTaskScheduler);
  5. // When subscribing..
  6. StompHeaders headers = new StompHeaders();
  7. headers.setDestination("/topic/...");
  8. headers.setReceipt("r1");
  9. FrameHandler handler = ...;
  10. stompSession.subscribe(headers, handler).addReceiptTask(() -> {
  11. // Subscription ready...
  12. });

服务器端选项是在brokerChannel上注册ExecutorChannelInterceptor,并实现afterMessageHandled方法,该方法在处理消息(包括订阅)后调用。

@MessageExceptionHandler

应用程序可以使用@MessageExceptionHandler方法来处理来自@MessageMapping方法的异常。如果希望访问异常实例,可以在注释本身中或通过方法参数声明异常。以下示例通过方法参数声明异常:

  1. @Controller
  2. public class MyController {
  3. // ...
  4. @MessageExceptionHandler
  5. public ApplicationError handleException(MyException exception) {
  6. // ...
  7. return appError;
  8. }
  9. }

@MessageExceptionHandler方法支持灵活的方法签名,并支持与@MessageMapping方法相同的方法参数类型和返回值。
通常,@MessageExceptionHandler方法在声明它们的@Controller类(或类层次结构)中应用。如果希望这些方法更全局地(跨控制器)应用,可以在标记为@ControllerAdvice的类中声明它们。这与Spring MVC中提供的类似支持相当。

4.7 发送消息

如果要从应用程序的任何部分向连接的客户端发送消息,该怎么办?任何应用程序组件都可以向brokerChannel发送消息。最简单的方法是注入SimpMessageTemplate并使用它发送消息。通常,您会按类型注入它,如下例所示:

  1. @Controller
  2. public class GreetingController {
  3. private SimpMessagingTemplate template;
  4. @Autowired
  5. public GreetingController(SimpMessagingTemplate template) {
  6. this.template = template;
  7. }
  8. @RequestMapping(path="/greetings", method=POST)
  9. public void greet(String greeting) {
  10. String text = "[" + getTimestamp() + "]:" + greeting;
  11. this.template.convertAndSend("/topic/greetings", text);
  12. }
  13. }

但是,如果存在相同类型的另一个bean,也可以通过其名称(BrokerMessageTemplate)对其进行限定。

4.8 简单代理

内置的简单消息代理处理来自客户端的订阅请求,将它们存储在内存中,并将消息广播到具有匹配目的地的连接客户端。代理支持类似路径的目的地,包括对Ant风格的目的地模式的订阅。

应用程序还可以使用点分隔(而不是斜杠分隔)的目的地。将点视为分隔符。

如果配置了任务调度器,则简单代理支持STOMP心跳。要配置调度器,可以声明自己的TaskScheduler bean,并通过MessageBrokerRegistry进行设置。或者,您可以使用内置WebSocket配置中自动声明的配置,但是,您需要@Lazy来避免内置WebSocket配置和WebSocketMessageBrokerConfiguration之间的循环。例如:

  1. @Configuration
  2. @EnableWebSocketMessageBroker
  3. public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
  4. private TaskScheduler messageBrokerTaskScheduler;
  5. @Autowired
  6. public void setMessageBrokerTaskScheduler(@Lazy TaskScheduler taskScheduler) {
  7. this.messageBrokerTaskScheduler = taskScheduler;
  8. }
  9. @Override
  10. public void configureMessageBroker(MessageBrokerRegistry registry) {
  11. registry.enableSimpleBroker("/queue/", "/topic/")
  12. .setHeartbeatValue(new long[] {10000, 20000})
  13. .setTaskScheduler(this.messageBrokerTaskScheduler);
  14. // ...
  15. }
  16. }

4.9 外部代理

简单代理非常适合入门,但只支持STOMP命令的子集(不支持ack、receipts和其他一些功能),依赖于简单的消息发送循环,不适合集群。作为替代方案,您可以升级应用程序以使用功能齐全的message broker。
请参阅所选message broker(如RabbitMQ、ActiveMQ等)的STOMP文档,安装该代理,并在启用STOMP支持的情况下运行它。然后可以在Spring配置中启用STOMP代理中继(而不是简单的代理)。
以下示例配置启用了功能齐全的代理:

  1. @Configuration
  2. @EnableWebSocketMessageBroker
  3. public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
  4. @Override
  5. public void registerStompEndpoints(StompEndpointRegistry registry) {
  6. registry.addEndpoint("/portfolio").withSockJS();
  7. }
  8. @Override
  9. public void configureMessageBroker(MessageBrokerRegistry registry) {
  10. registry.enableStompBrokerRelay("/topic", "/queue");
  11. registry.setApplicationDestinationPrefixes("/app");
  12. }
  13. }

以下示例显示了与前一示例等效的XML配置:

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:websocket="http://www.springframework.org/schema/websocket"
  4. xsi:schemaLocation="
  5. http://www.springframework.org/schema/beans
  6. https://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/websocket
  8. https://www.springframework.org/schema/websocket/spring-websocket.xsd">
  9. <websocket:message-broker application-destination-prefix="/app">
  10. <websocket:stomp-endpoint path="/portfolio" />
  11. <websocket:sockjs/>
  12. </websocket:stomp-endpoint>
  13. <websocket:stomp-broker-relay prefix="/topic,/queue" />
  14. </websocket:message-broker>
  15. </beans>

前面配置中的STOMP broker中继是一个Spring MessageHandler,它通过将消息转发到外部消息代理来处理消息。为此,它建立到代理的TCP连接,将所有消息转发给代理,然后通过客户端的WebSocket会话将从代理接收到的所有消息转发给客户端。本质上,它是一个“中继”,在两个方向上转发消息。

添加io.projectreactor.netty:reactor-netty 和 io.netty:netty-all为TCP连接管理将所有依赖项都设置为netty。

此外,应用程序组件(如HTTP请求处理方法、业务服务等)还可以向代理中继发送消息,如发送消息中所述,以向订阅的WebSocket客户端广播消息。
实际上,代理中继支持健壮且可伸缩的消息广播。

4.10 连接到代理

STOMP代理中继维护到代理的单个“系统”TCP连接。此连接仅用于来自服务器端应用程序的消息,不用于接收消息。您可以为此连接配置STOMP凭据(即STOMP框架登录和密码头)。这在XML名称空间和Java配置中都作为systemLogin和systemPasscode属性公开,默认值为guest和guest。
STOMP broker中继还为每个连接的WebSocket客户端创建单独的TCP连接。您可以配置用于代表客户端创建的所有TCP连接的STOMP凭据。这在XML名称空间和Java配置中都作为clientLogin和clientPasscode属性公开,默认值为guest和guest。

STOMP broker中继始终在代表客户端转发给代理的每个连接帧上设置登录和密码头。因此,WebSocket客户端不需要设置这些头。他们被忽视了。正如身份验证部分所解释的,WebSocket客户端应该转而依赖HTTP身份验证来保护WebSocket端点并建立客户端身份。

STOMP broker中继还通过“系统”TCP连接向message broker发送和接收心跳信号。您可以配置发送和接收心跳的间隔(默认情况下每个间隔10秒)。如果与代理的连接丢失,代理中继将继续尝试每5秒重新连接一次,直到成功。
任何Springbean都可以实现ApplicationListener,以便在与代理的“系统”连接丢失并重新建立时接收通知。例如,当没有活动的“系统”连接时,广播股票报价的股票报价服务可以停止尝试发送消息。
默认情况下,STOMP broker中继始终连接到同一主机和端口,如果连接丢失,则根据需要重新连接。如果希望在每次尝试连接时提供多个地址,可以配置地址供应商,而不是固定主机和端口。以下示例显示了如何执行此操作:

  1. @Configuration
  2. @EnableWebSocketMessageBroker
  3. public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
  4. // ...
  5. @Override
  6. public void configureMessageBroker(MessageBrokerRegistry registry) {
  7. registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
  8. registry.setApplicationDestinationPrefixes("/app");
  9. }
  10. private ReactorNettyTcpClient<byte[]> createTcpClient() {
  11. return new ReactorNettyTcpClient<>(
  12. client -> client.addressSupplier(() -> ... ),
  13. new StompReactorNettyCodec());
  14. }
  15. }

还可以使用virtualHost属性配置STOMP broker中继。此属性的值被设置为每个连接帧的主机头,可能很有用(例如,在云环境中,与TCP连接的实际主机不同于提供基于云的STOMP服务的主机)。

4.11 点作为分隔符

当消息被路由到@MessageMapping方法时,它们会与AntPathMatcher匹配。默认情况下,模式应使用斜杠(/)作为分隔符。这在web应用程序中是一个很好的约定,类似于HTTP URL。但是,如果您更习惯于消息传递约定,可以切换到使用点(.)作为分隔符。
以下示例显示了如何在Java配置中执行此操作:

  1. @Configuration
  2. @EnableWebSocketMessageBroker
  3. public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
  4. // ...
  5. @Override
  6. public void configureMessageBroker(MessageBrokerRegistry registry) {
  7. registry.setPathMatcher(new AntPathMatcher("."));
  8. registry.enableStompBrokerRelay("/queue", "/topic");
  9. registry.setApplicationDestinationPrefixes("/app");
  10. }
  11. }

以下示例显示了与前一示例等效的XML配置:

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:websocket="http://www.springframework.org/schema/websocket"
  4. xsi:schemaLocation="
  5. http://www.springframework.org/schema/beans
  6. https://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/websocket
  8. https://www.springframework.org/schema/websocket/spring-websocket.xsd">
  9. <websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher">
  10. <websocket:stomp-endpoint path="/stomp"/>
  11. <websocket:stomp-broker-relay prefix="/topic,/queue" />
  12. </websocket:message-broker>
  13. <bean id="pathMatcher" class="org.springframework.util.AntPathMatcher">
  14. <constructor-arg index="0" value="."/>
  15. </bean>
  16. </beans>

之后,控制器可以使用点(.)作为@MessageMapping方法中的分隔符,如下例所示:

  1. @Controller
  2. @MessageMapping("red")
  3. public class RedController {
  4. @MessageMapping("blue.{green}")
  5. public void handleGreen(@DestinationVariable String green) {
  6. // ...
  7. }
  8. }

客户端现在可以向/app/red发送消息至/app/red.blue.green123。
在前面的示例中,我们没有更改“代理中继”上的前缀,因为这些前缀完全依赖于外部消息代理。请参阅您使用的代理的STOMP文档页面,以了解它对目标标头支持哪些约定。
另一方面,“简单代理”确实依赖于配置的PathMatcher,因此,如果切换分隔符,该更改也适用于代理,以及代理将消息中的目标匹配到订阅中的模式的方式。

4.12 验证

每个 STOMP over WebSocket 消息会话都以 HTTP 请求开始。这可以是升级到 WebSocket 的请求(即 WebSocket 握手),或者在 SockJS 回退的情况下,是一系列 SockJS HTTP 传输请求。
许多 Web 应用程序已经具备身份验证和授权来保护 HTTP 请求。通常,用户通过 Spring Security 使用某种机制进行身份验证,例如登录页面、HTTP 基本身份验证或其他方式。经过身份验证的用户的安全上下文保存在 HTTP 会话中,并与同一基于 cookie 的会话中的后续请求相关联。
因此,对于 WebSocket 握手或 SockJS HTTP 传输请求,通常已经有一个经过身份验证的用户可以通过 HttpServletRequest#getUserPrincipal(). Spring 自动将该用户与为他们创建的 WebSocket 或 SockJS 会话相关联,随后,通过用户标头与通过该会话传输的所有 STOMP 消息相关联。
简而言之,一个典型的 Web 应用程序除了已经为安全所做的事情之外,什么都不需要做。用户在 HTTP 请求级别使用安全上下文进行身份验证,该安全上下文通过基于 cookie 的 HTTP 会话(然后与为该用户创建的 WebSocket 或 SockJS 会话相关联)维护,并导致在每次Message流过时标记用户标头应用程序。
STOMP协议在连接框架上有登录和密码头。这些最初是为TCP上的STOMP而设计的,也是需要的。然而,对于STOMP over WebSocket,默认情况下,Spring会忽略STOMP协议级别的身份验证头,并假设用户已经在HTTP传输级别进行了身份验证。期望WebSocket或SockJS会话包含经过身份验证的用户。

4.13 令牌认证

Spring Security OAuth支持基于令牌的安全性,包括JSON Web令牌(JWT)。您可以将其用作Web应用程序中的身份验证机制,包括在WebSocket交互上跺脚,如前一节所述(即,通过基于cookie的会话维护身份)。
同时,基于cookie的会话并不总是最合适的(例如,在不维护服务器端会话的应用程序中,或者在通常使用头进行身份验证的移动应用程序中)。
WebSocket协议RFC 6455“没有规定服务器在WebSocket握手期间对客户端进行身份验证的任何特定方式。”然而,在实践中,浏览器客户端只能使用标准身份验证头(即基本HTTP身份验证)或cookie,而不能(例如)提供自定义头。同样,SockJS JavaScript客户端也不提供通过SockJS传输请求发送HTTP头的方法。见sockjs客户端第196期。相反,它确实允许发送查询参数,您可以使用这些参数来发送令牌,但这有其自身的缺点(例如,令牌可能会无意中与服务器日志中的URL一起记录)。

上述限制适用于基于浏览器的客户端,不适用于基于Spring Java的STOMP客户端,该客户端确实支持通过WebSocket和SockJS请求发送头。

因此,希望避免使用cookie的应用程序可能没有在HTTP协议级别进行身份验证的好方法。与其使用cookie,他们可能更喜欢在STOMP消息传递协议级别使用头进行身份验证。这样做需要两个简单的步骤:
1.使用STOMP客户端在连接时传递身份验证头。
2.使用ChannelInterceptor处理身份验证标头。
下一个示例使用服务器端配置注册自定义身份验证拦截器。请注意,拦截器只需要对CONNECT消息进行身份验证并设置用户头。Spring记录并保存经过身份验证的用户,并将其与同一会话上的后续STOMP消息相关联。以下示例显示如何注册自定义身份验证侦听器:

  1. @Configuration
  2. @EnableWebSocketMessageBroker
  3. public class MyConfig implements WebSocketMessageBrokerConfigurer {
  4. @Override
  5. public void configureClientInboundChannel(ChannelRegistration registration) {
  6. registration.interceptors(new ChannelInterceptor() {
  7. @Override
  8. public Message<?> preSend(Message<?> message, MessageChannel channel) {
  9. StompHeaderAccessor accessor =
  10. MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
  11. if (StompCommand.CONNECT.equals(accessor.getCommand())) {
  12. Authentication user = ... ; // access authentication header(s)
  13. accessor.setUser(user);
  14. }
  15. return message;
  16. }
  17. });
  18. }
  19. }

另外,请注意,目前,当您使用Spring Security的消息授权时,您需要确保身份验证ChannelInterceptor配置的顺序早于Spring Security。这最好是通过在自己的WebSocketMessageBrokerConfiger实现中声明自定义拦截器来实现,该拦截器标记为@Order(ordered.HIGHEST_priority+99)。

4.14 批准

Spring Security提供WebSocket子协议授权,它使用ChannelInterceptor根据消息中的用户头对消息进行授权。此外,Spring会话提供WebSocket集成,确保用户的HTTP会话不会在WebSocket会话仍处于活动状态时过期。

4.15 用户目标位置

应用程序可以发送针对特定用户的消息,Spring的STOMP支持可以识别以/user/为前缀的目标位置。例如,客户机可能会订阅/user/queue/position updates目标位置。UserDestinationMessageHandler处理该目标位置,并将其转换为用户会话特有的目标位置(例如/queue/position-updates-user123)。这提供了订阅通用名称目标位置的便利,同时确保与订阅同一目标位置的其他用户不发生冲突,以便每个用户都可以接收唯一的股票头寸更新。

在使用用户目标位置时,必须按照Enable STOMP中所示配置代理和应用程序目标位置前缀,否则代理将处理“/user”前缀的消息,这些消息只应由UserDestinationMessageHandler处理。

在发送端,可以将消息发送到目标位置,例如/user/{username}/queue/position updates,然后由UserDestinationMessageHandler将其转换为一个或多个目标位置,每个与用户关联的会话对应一个目标位置。这使得应用程序中的任何组件都可以发送针对特定用户的消息,而不必知道他们的名字和通用目标位置以外的任何信息。注释和消息传递模板也支持这一点。
消息处理方法可以通过@SendToUser注释(在类级别上也支持共享公共目标位置)向与正在处理的消息相关联的用户发送消息,如下例所示:

  1. @Controller
  2. public class PortfolioController {
  3. @MessageMapping("/trade")
  4. @SendToUser("/queue/position-updates")
  5. public TradeResult executeTrade(Trade trade, Principal principal) {
  6. // ...
  7. return tradeResult;
  8. }
  9. }

如果用户有多个会话,默认情况下,订阅给给定目标的所有会话都是目标会话。但是,有时可能需要只针对发送正在处理的消息的会话。可以通过将“广播”属性设置为false来实现,如下例所示:

  1. @Controller
  2. public class MyController {
  3. @MessageMapping("/action")
  4. public void handleAction() throws Exception{
  5. // raise MyBusinessException here
  6. }
  7. @MessageExceptionHandler
  8. @SendToUser(destinations="/queue/errors", broadcast=false)
  9. public ApplicationError handleException(MyBusinessException exception) {
  10. // ...
  11. return appError;
  12. }
  13. }

4.16 信息的顺序

来自代理的消息将发布到clientOutboundChannel,并从那里写入WebSocket会话。由于通道由ThreadPoolExecutor支持,消息在不同的线程中处理,因此客户端接收到的结果序列可能与发布的确切顺序不匹配。
如果这是一个问题,请启用setPreservePublishOrder标志,如下例所示:

  1. @Configuration
  2. @EnableWebSocketMessageBroker
  3. public class MyConfig implements WebSocketMessageBrokerConfigurer {
  4. @Override
  5. protected void configureMessageBroker(MessageBrokerRegistry registry) {
  6. // ...
  7. registry.setPreservePublishOrder(true);
  8. }
  9. }

以下示例显示了与前一示例等效的XML配置:

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:websocket="http://www.springframework.org/schema/websocket"
  4. xsi:schemaLocation="
  5. http://www.springframework.org/schema/beans
  6. https://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/websocket
  8. https://www.springframework.org/schema/websocket/spring-websocket.xsd">
  9. <websocket:message-broker preserve-publish-order="true">
  10. <!-- ... -->
  11. </websocket:message-broker>
  12. </beans>

设置该标志后,同一客户端会话中的消息将一次发布一条到clientOutboundChannel,以保证发布顺序。请注意,这会带来很小的性能开销,因此只有在需要时才应该启用它。

4.17 事件

发布了几个ApplicationContext事件,可以通过实现 Spring 的ApplicationListener接口来接收:

  • BrokerAvailabilityEvent:指示代理何时可用或不可用。虽然“简单”代理在启动时立即可用并在应用程序运行时保持可用,但 STOMP“代理中继”可能会失去与全功能代理的连接(例如,如果代理重新启动)。代理中继具有重新连接逻辑,并在它返回时重新建立与代理的“系统”连接。因此,只要状态从已连接变为已断开,反之亦然,就会发布此事件。使用 的组件SimpMessagingTemplate应订阅此事件,并避免在代理不可用时发送消息。无论如何,他们应该准备好在MessageDeliveryException 发送消息时进行处理。
  • SessionConnectEvent:在收到新的 STOMP CONNECT 以指示新的客户端会话开始时发布。该事件包含表示连接的消息,包括会话 ID、用户信息(如果有)和客户端发送的任何自定义标头。这对于跟踪客户端会话很有用。订阅此事件的组件可以使用SimpMessageHeaderAccessor或 包装包含的消息StompMessageHeaderAccessor。
  • SessionConnectedEventSessionConnectEvent:在代理发送 STOMP CONNECTED 帧以响应 CONNECT后不久发布。此时,可以认为 STOMP 会话已完全建立。
  • SessionSubscribeEvent:在收到新的 STOMP SUBSCRIBE 时发布。
  • SessionUnsubscribeEvent:在收到新的 STOMP UNSUBSCRIBE 时发布。
  • SessionDisconnectEvent:在 STOMP 会话结束时发布。DISCONNECT 可能是从客户端发送的,也可能是在 WebSocket 会话关闭时自动生成的。在某些情况下,此事件在每个会话中发布不止一次。组件对于多个断开连接事件应该是幂等的。

    当您使用全功能代理时,如果代理暂时不可用,STOMP“代理中继”会自动重新连接“系统”连接。但是,客户端连接不会自动重新连接。假设启用了心跳,客户端通常会在 10 秒内注意到代理没有响应。客户端需要实现自己的重新连接逻辑。

4.18 拦截

事件为 STOMP 连接的生命周期提供通知,但不是为每个客户端消息提供通知。应用程序还可以注册一个 ChannelInterceptor来拦截任何消息和处理链的任何部分。以下示例显示如何拦截来自客户端的入站消息:

  1. @Configuration
  2. @EnableWebSocketMessageBroker
  3. public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
  4. @Override
  5. public void configureClientInboundChannel(ChannelRegistration registration) {
  6. registration.interceptors(new MyChannelInterceptor());
  7. }
  8. }

自定义ChannelInterceptor可以使用StompHeaderAccessor或SimpMessageHeaderAccessor来访问有关消息的信息,如下例所示:

  1. public class MyChannelInterceptor implements ChannelInterceptor {
  2. @Override
  3. public Message<?> preSend(Message<?> message, MessageChannel channel) {
  4. StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
  5. StompCommand command = accessor.getStompCommand();
  6. // ...
  7. return message;
  8. }
  9. }

应用程序还可以实现ExecutorChannelInterceptor,它是ChannelInterceptor的子接口,在处理消息的线程中具有回调。虽然ChannelInterceptor对发送到通道的每条消息调用一次,但ExecutorChannelInterceptor在订阅来自通道的消息的每个MessageHandler的线程中提供挂钩。
请注意,与前面描述的SessionDisconnectEvent一样,断开连接消息可以来自客户端,也可以在WebSocket会话关闭时自动生成。在某些情况下,拦截器可能会在每个会话中多次拦截此消息。对于多个断开连接事件,组件应该是幂等的。

4.19 STOMP 客户端

Spring提供了STOMP-over-WebSocket客户端和STOMP-over-TCP客户端。
首先,您可以创建和配置WebSocketStompClient,如下例所示:

  1. WebSocketClient webSocketClient = new StandardWebSocketClient();
  2. WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
  3. stompClient.setMessageConverter(new StringMessageConverter());
  4. stompClient.setTaskScheduler(taskScheduler); // for heartbeats

在前面的示例中,您可以将StandardWebSocketClient替换为SockJsClient,因为这也是WebSocketClient的一个实现。SockJsClient可以使用WebSocket或基于HTTP的传输作为回退。有关更多详细信息,请参阅SockJsClient。
接下来,您可以建立连接并为STOMP会话提供处理程序,如下例所示:

  1. String url = "ws://127.0.0.1:8080/endpoint";
  2. StompSessionHandler sessionHandler = new MyStompSessionHandler();
  3. stompClient.connect(url, sessionHandler);

当会话准备好使用时,会通知处理程序,如下例所示:

  1. public class MyStompSessionHandler extends StompSessionHandlerAdapter {
  2. @Override
  3. public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
  4. // ...
  5. }
  6. }

一旦建立了会话,就可以发送任何有效负载,并使用已配置的MessageConverter进行序列化,如下例所示:

  1. session.send("/topic/something", "payload");

你也可以订阅目的地。subscribe方法需要订阅消息的处理程序,并返回可用于取消订阅的订阅句柄。对于每个接收到的消息,处理程序可以指定负载应反序列化到的目标对象类型,如下例所示:

  1. session.subscribe("/topic/something", new StompFrameHandler() {
  2. @Override
  3. public Type getPayloadType(StompHeaders headers) {
  4. return String.class;
  5. }
  6. @Override
  7. public void handleFrame(StompHeaders headers, Object payload) {
  8. // ...
  9. }
  10. });

要启用STOMP heartbeat,您可以使用TaskScheduler配置WebSocketStompClient,并可选地自定义心跳间隔(10秒用于写入非活动,这会导致发送心跳,10秒用于读取非活动,这会关闭连接)。
WebSocketStompClient仅在不活动的情况下,即不发送其他消息时,才会发送心跳信号。在使用外部代理时,这可能会带来挑战,因为具有非代理目标的消息代表活动,但实际上并不转发给代理。在这种情况下,您可以在初始化外部代理时配置TaskScheduler,以确保在仅发送具有非代理目标的消息时,心跳也会转发到代理。

当使用WebSoCKStutpCopyEclipse进行性能测试来模拟来自同一台机器的数千个客户端时,考虑关闭心跳,因为每个连接调度自己的心跳任务,而对于在同一台机器上运行的大量客户端来说,这不是优化的。

STOMP协议还支持接收,其中客户端必须添加一个接收头,服务器在处理发送或订阅后用一个接收帧响应该接收头。为了支持这一点,StompSession提供了setAutoReceipt(布尔值),它会在每个后续发送或订阅事件上添加一个接收头。或者,您也可以手动将收据标题添加到StompHeader。send和subscribe都返回Receiptable的实例,您可以使用该实例注册接收成功和失败回调。对于此功能,必须为客户端配置TaskScheduler和收据过期前的时间(默认为15秒)。
请注意,StompSessionHandler本身是一个StompFrameHandler,除了处理消息异常的handleException回调和处理传输级错误(包括Connection罗斯Exception)的HandletTransportError之外,它还可以处理错误帧。

4.20 WebSocket 范围

每个WebSocket会话都有一个属性映射。该映射作为入站客户端消息的头附加,可以从控制器方法访问,如下例所示:

  1. @Controller
  2. public class MyController {
  3. @MessageMapping("/action")
  4. public void handle(SimpMessageHeaderAccessor headerAccessor) {
  5. Map<String, Object> attrs = headerAccessor.getSessionAttributes();
  6. // ...
  7. }
  8. }

您可以在websocket范围内声明Spring管理的bean。您可以将WebSocket范围的bean注入控制器和在clientInboundChannel上注册的任何通道拦截器。这些人通常是单身,比任何单独的WebSocket会话都要长寿。因此,您需要为WebSocket范围的bean使用范围代理模式,如下例所示:

  1. @Component
  2. @Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS)
  3. public class MyBean {
  4. @PostConstruct
  5. public void init() {
  6. // Invoked after dependencies injected
  7. }
  8. // ...
  9. @PreDestroy
  10. public void destroy() {
  11. // Invoked when the WebSocket session ends
  12. }
  13. }
  14. @Controller
  15. public class MyController {
  16. private final MyBean myBean;
  17. @Autowired
  18. public MyController(MyBean myBean) {
  19. this.myBean = myBean;
  20. }
  21. @MessageMapping("/action")
  22. public void handle() {
  23. // this.myBean from the current WebSocket session
  24. }
  25. }

与任何自定义范围一样,SpringMyBean在第一次从控制器访问时初始化一个新实例,并将该实例存储在 WebSocket 会话属性中。随后返回相同的实例,直到会话结束。WebSocket 范围的 bean 调用了所有 Spring 生命周期方法,如前面的示例所示。

4.21 表现

在性能方面没有灵丹妙药。影响它的因素很多,包括消息的大小和数量,应用方法是否执行需要阻塞的工作,以及外部因素(如网络速度和其他问题)。本节的目标是提供可用配置选项的概述以及有关如何推理扩展的一些想法。
在消息传递应用程序中,消息通过由线程池支持的异步执行通道传递。配置这样的应用程序需要对通道和消息流有很好的了解。因此,建议查看Flow of Messages
clientInboundChannel显而易见的起点是配置支持 clientOutboundChannel. 默认情况下,两者都配置为可用处理器数量的两倍。
如果注释方法中的消息处理主要受 CPU 限制,则线程数clientInboundChannel应保持接近处理器数。如果他们所做的工作更受 IO 限制,并且需要在数据库或其他外部系统上阻塞或等待,则可能需要增加线程池大小。

ThreadPoolExecutor具有三个重要属性:核心线程池大小、最大线程池大小和队列存储没有可用线程的任务的容量。 一个常见的混淆点是配置核心池大小(例如,10)和最大池大小(例如,20)会导致线程池具有 10 到 20 个线程。事实上,如果将容量保留为其默认值 Integer.MAX_VALUE,则线程池的大小永远不会超过核心池大小,因为所有额外的任务都会排队。 请参阅 javadocThreadPoolExecutor以了解这些属性如何工作并了解各种排队策略。

另一方面clientOutboundChannel,这完全是关于向 WebSocket 客户端发送消息。如果客户端位于快速网络上,则线程数应保持接近可用处理器的数量。如果它们速度慢或带宽低,它们会花费更长的时间来消费消息并给线程池带来负担。因此,增加线程池大小变得很有必要。
虽然clientInboundChannel可以预测的工作负载 - 毕竟,它是基于应用程序所做的 - 如何配置“clientOutboundChannel”更难,因为它基于应用程序无法控制的因素。出于这个原因,两个附加属性与消息的发送有关:sendTimeLimit 和sendBufferSizeLimit。您可以使用这些方法来配置允许发送多长时间以及在向客户端发送消息时可以缓冲多少数据。
一般的想法是,在任何给定时间,只有一个线程可以用于发送到客户端。同时,所有其他消息都会被缓冲,您可以使用这些属性来决定允许发送消息需要多长时间以及同时可以缓冲多少数据。有关重要的附加详细信息,请参阅 XML 模式的 javadoc 和文档。
以下示例显示了一种可能的配置:

  1. @Configuration
  2. @EnableWebSocketMessageBroker
  3. public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
  4. @Override
  5. public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
  6. registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
  7. }
  8. // ...
  9. }

以下示例显示了与前面示例等效的 XML 配置:

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:websocket="http://www.springframework.org/schema/websocket"
  4. xsi:schemaLocation="
  5. http://www.springframework.org/schema/beans
  6. https://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/websocket
  8. https://www.springframework.org/schema/websocket/spring-websocket.xsd">
  9. <websocket:message-broker>
  10. <websocket:transport send-timeout="15000" send-buffer-size="524288" />
  11. <!-- ... -->
  12. </websocket:message-broker>
  13. </beans>

您还可以使用前面显示的 WebSocket 传输配置来配置传入 STOMP 消息的最大允许大小。理论上,WebSocket 消息的大小几乎可以不受限制。在实践中,WebSocket 服务器施加了限制——例如,Tomcat 为 8K,Jetty 为 64K。出于这个原因,STOMP 客户端(例如 JavaScript webstomp-client 等)在 16K 边界处拆分较大的 STOMP 消息,并将它们作为多个 WebSocket 消息发送,这需要服务器进行缓冲和重新组装。
Spring 的 STOMP-over-WebSocket 支持做到了这一点,因此应用程序可以配置 STOMP 消息的最大大小,而不管 WebSocket 服务器特定的消息大小。请记住,WebSocket 消息大小会在必要时自动调整,以确保它们至少可以承载 16K 的 WebSocket 消息。
以下示例显示了一种可能的配置:

  1. @Configuration
  2. @EnableWebSocketMessageBroker
  3. public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
  4. @Override
  5. public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
  6. registration.setMessageSizeLimit(128 * 1024);
  7. }
  8. // ...
  9. }

以下示例显示了与前面示例等效的 XML 配置:

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:websocket="http://www.springframework.org/schema/websocket"
  4. xsi:schemaLocation="
  5. http://www.springframework.org/schema/beans
  6. https://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/websocket
  8. https://www.springframework.org/schema/websocket/spring-websocket.xsd">
  9. <websocket:message-broker>
  10. <websocket:transport message-size="131072" />
  11. <!-- ... -->
  12. </websocket:message-broker>
  13. </beans>

关于扩展的一个重点涉及使用多个应用程序实例。目前,您无法使用简单的代理来做到这一点。但是,当您使用功能齐全的代理(例如 RabbitMQ)时,每个应用程序实例都连接到代理程序,并且从一个应用程序实例广播的消息可以通过代理程序广播到通过任何其他应用程序实例连接的 WebSocket 客户端。

4.22 监控

当您使用@EnableWebSocketMessageBrokeror时,关键基础架构组件会自动收集统计数据和计数器,这些数据和计数器可提供对应用程序内部状态的重要洞察。该配置还声明了一个 bean 类型WebSocketMessageBrokerStats,该 bean 将所有可用信息收集在一个地方,默认情况下INFO每 30 分钟在该级别记录一次。这个 bean 可以通过 Spring 导出到 JMX MBeanExporter以便在运行时查看(例如,通过 JDK jconsole)。以下列表总结了可用信息:
客户端 WebSocket 会话

  • Current

指示当前有多少客户端会话,计数进一步细分为 WebSocket 与 HTTP 流和轮询 SockJS 会话。

  • Total

指示已建立的总会话数。

  • Abnormally Closed
    • Connect Failures

已建立但在 60 秒内未收到任何消息后关闭的会话。这通常表示代理或网络问题。

  • Send Limit Exceeded

会话在超过配置的发送超时或发送缓冲区限制后关闭,这可能发生在慢速客户端(请参阅上一节)。

  • Transport Errors

传输错误后关闭会话,例如无法读取或写入 WebSocket 连接或 HTTP 请求或响应。

  • STOMP 框架

处理的 CONNECT、CONNECTED 和 DISCONNECT 帧的总数,表示在 STOMP 级别上连接了多少客户端。请注意,当会话异常关闭或客户端在未发送 DISCONNECT 帧的情况下关闭时,DISCONNECT 计数可能会较低。
STOMP 代理转发

  • TCP 连接

指示代表客户端 WebSocket 会话建立到代理的 TCP 连接数。这应该等于客户端 WebSocket 会话的数量 + 1 个额外的共享“系统”连接,用于从应用程序内部发送消息。

  • STOMP 框架

代表客户端转发到代理或从代理接收的 CONNECT、CONNECTED 和 DISCONNECT 帧的总数。请注意,无论客户端 WebSocket 会话如何关闭,都会向代理发送 DISCONNECT 帧。因此,较低的 DISCONNECT 帧计数表明代理正在主动关闭连接(可能是由于心跳未及时到达、输入帧无效或其他问题)。
客户入站渠道
来自支持 的线程池的统计信息clientInboundChannel ,可以深入了解传入消息处理的健康状况。在这里排队的任务表明应用程序可能太慢而无法处理消息。如果存在 I/O 绑定任务(例如,慢速数据库查询、对第三方 REST API 的 HTTP 请求等),请考虑增加线程池大小。
客户外呼渠道
来自支持 的线程池的统计信息clientOutboundChannel ,可以深入了解向客户端广播消息的健康状况。在这里排队的任务表明客户端太慢而无法消费消息。解决此问题的一种方法是增加线程池大小以适应预期的并发慢客户端数量。另一种选择是减少发送超时和发送缓冲区大小限制(参见上一节)。
SockJS 任务调度器
来自用于发送心跳的 SockJS 任务调度程序的线程池的统计信息。请注意,当在 STOMP 级别协商心跳时,SockJS 心跳被禁用。

4.23 测试

当您使用 Spring 的 STOMP-over-WebSocket 支持时,有两种主要方法可以测试应用程序。首先是编写服务器端测试来验证控制器的功能及其带注释的消息处理方法。第二个是编写涉及运行客户端和服务器的完整端到端测试。
这两种方法并不相互排斥。相反,每一个都在整体测试策略中占有一席之地。服务器端测试更专注,更容易编写和维护。另一方面,端到端集成测试更完整,测试更多,但它们也更多地涉及编写和维护。
服务器端测试的最简单形式是编写控制器单元测试。然而,这还不够有用,因为控制器所做的大部分工作都取决于它的注释。纯单元测试根本无法测试。
理想情况下,被测控制器应该在运行时被调用,就像使用 Spring MVC 测试框架来测试处理 HTTP 请求的控制器的方法一样——也就是说,不运行 Servlet 容器,而是依赖 Spring 框架来调用带注释的控制器。与 Spring MVC 测试一样,这里有两种可能的选择,或者使用“基于上下文”或使用“独立”设置:

  • 借助 Spring TestContext 框架加载实际的 Spring 配置,clientInboundChannel作为测试字段注入,并使用它发送消息以由控制器方法处理。
  • 手动设置调用控制器(即SimpAnnotationMethodMessageHandler)所需的最小 Spring 框架基础结构并将控制器的消息直接传递给它。

在股票投资组合示例应用程序的测试中演示了这两种设置场景 。
第二种方法是创建端到端集成测试。为此,您需要以嵌入式模式运行 WebSocket 服务器并作为 WebSocket 客户端连接到它,该客户端发送包含 STOMP 帧的 WebSocket 消息。股票投资组合示例应用程序的测试还通过使用 Tomcat 作为嵌入式 WebSocket 服务器和用于测试目的的简单 STOMP 客户端来演示这种方法。

参考资料

spring官网(springMVC)
webflux版本