User Destinations

应用程序可以发送针对特定用户的消息,Spring 的 STOMP 支持为此目的识别以 /user/为前缀的目标。例如,一个客户端可能会订阅 /user/queue/position-updates目的地。UserDestinationMessageHandler 处理这个目的地,并将其转换为用户会话的唯一目的地(如 /queue/position-updates-user123)。这为订阅一个通用命名的目的地提供了便利,同时,确保不会与订阅同一目的地的其他用户发生冲突,以便每个用户都能收到独特的股票位置更新。

:::warning 当使用用户目的地时,重要的是配置消息代理(broker)和应用程序的目的地前缀,如 启用 STOMP 中所示,否则消息代理将处理 /user前缀的消息,而这些消息只应由 UserDestinationMessageHandler 处理。 :::

在发送方面,消息可以被发送到一个目的地,如 /user/{username}/queue/position-updates,这又被UserDestinationMessageHandler 翻译成一个或多个目的地,一个用于与用户相关的会话。这让应用程序中的任何组件都可以发送针对特定用户的消息,而不一定要知道他们的名字和通用目的地。这也是通过一个注解和一个消息模板来支持的。

一个消息处理方法可以通过 @SendToUser注解(也支持在类级别上共享一个共同的目的地)向与被处理的消息相关的用户发送消息,如下例所示:

  1. @Controller
  2. public class PortfolioController {
  3. @MessageMapping("/trade")
  4. @SendToUser("/queue/position-updates")
  5. public TradeResult executeTrade(Trade trade, Principal principal) {
  6. // ...
  7. return tradeResult;
  8. }
  9. }

如果用户有一个以上的会话,默认情况下,所有订阅给定目的地的会话都是目标。然而,有时可能需要只针对发送被处理消息的会话。你可以通过将广播属性设置为 false 来做到这一点,正如下面的例子所示:

  1. @Controller
  2. public class MyController {
  3. @MessageMapping("/action")
  4. public void handleAction() throws Exception{
  5. // 在此引发 MyBusinessException
  6. }
  7. @MessageExceptionHandler
  8. @SendToUser(destinations="/queue/errors", broadcast=false)
  9. public ApplicationError handleException(MyBusinessException exception) {
  10. // ...
  11. return appError;
  12. }
  13. }

:::info 虽然用户目的地通常意味着有一个 经过认证的用户,但这并不是严格的要求。未与认证用户相关联的 WebSocket 会话可以订阅用户目的地。在这种情况下,@SendToUser注解的行为与 broadcast=false 时完全相同(即只针对发送被处理消息的会话)。 :::

你可以从任何应用组件向用户目的地发送消息,例如,通过注入由 Java 配置或 XML 命名空间创建的 SimpMessagingTemplate。(如果需要用 @Qualifier来限定,bean 的名字是 brokerMessagingTemplate)。下面的例子显示了如何做到这一点:

  1. @Service
  2. public class TradeServiceImpl implements TradeService {
  3. private final SimpMessagingTemplate messagingTemplate;
  4. @Autowired
  5. public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) {
  6. this.messagingTemplate = messagingTemplate;
  7. }
  8. // ...
  9. public void afterTradeExecuted(Trade trade) {
  10. this.messagingTemplate.convertAndSendToUser(
  11. trade.getUserName(), "/queue/position-updates", trade.getResult());
  12. }
  13. }

:::info 当您将 用户目的地 外部消息代理 一起使用时,您应该查看关于如何管理非活动队列的代理文档,以便在用户会话结束时,所有独特的用户队列都被删除。例如,当您使用诸如 /exchange/amq.direct/position-updates 等目的地时,RabbitMQ 会创建自动删除队列。因此,在这种情况下,客户端可以订阅到 /user/exchange/amq.direct/position-updates。同样地,ActiveMQ 也有清除非活动目的地的 配置选项。 :::

在一个多应用服务器的情况下,一个用户目的地可能会因为用户连接到一个不同的服务器而保持未解决。在这种情况下,你可以配置一个目的地来广播未解决的消息,以便其他服务器有机会尝试。这可以通过 Java 配置中 MessageBrokerRegistry 的userDestinationBroadcast 属性和 XML 中 message-broker 元素的 user-destination-broadcast 属性完成。

一个例子

前面的例子 基础上进行

用户队列持久化方式

在配置中开启 /user 用户目的地处理配置。

  1. package cn.mrcode.study.springdocsread.websocket;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.messaging.Message;
  5. import org.springframework.messaging.MessageChannel;
  6. import org.springframework.messaging.simp.config.ChannelRegistration;
  7. import org.springframework.messaging.simp.config.MessageBrokerRegistry;
  8. import org.springframework.messaging.simp.stomp.StompCommand;
  9. import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
  10. import org.springframework.messaging.support.ChannelInterceptor;
  11. import org.springframework.messaging.support.MessageHeaderAccessor;
  12. import org.springframework.util.StringUtils;
  13. import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
  14. import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
  15. import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
  16. import org.springframework.web.socket.server.standard.TomcatRequestUpgradeStrategy;
  17. import java.security.Principal;
  18. /**
  19. * @author mrcode
  20. */
  21. @Configuration
  22. @EnableWebSocketMessageBroker
  23. public class MyWebSocketConfig implements WebSocketMessageBrokerConfigurer {
  24. @Override
  25. public void registerStompEndpoints(StompEndpointRegistry registry) {
  26. // portfolio 是 WebSocket(或 SockJS)的端点的 HTTP URL。客户端需要连接以进行 WebSocket 握手。
  27. registry.addEndpoint("/portfolio")
  28. .setAllowedOriginPatterns("*")
  29. .withSockJS();
  30. }
  31. @Override
  32. public void configureMessageBroker(MessageBrokerRegistry config) {
  33. // 目标标头以 /app 开头的 STOMP 消息会被路由到 @Controller 类中的 @MessageMapping 方法。
  34. config.setApplicationDestinationPrefixes("/app");
  35. /*
  36. 启用用户目的地,它的原理如下(这个在源码的注释上有说明):
  37. 配置用于标识用户目的地的前缀。用户目的地为用户提供了订阅其会话唯一的队列名称以及其他人将消息发送到那些唯一的、特定于用户的队列的能力。
  38. 例如,当用户尝试订阅“/user/queue/position-updates”时,目的地可能会被转换为“/queue/position-updatesi9oqdfzo”,从而产生一个唯一的队列名称,不会与任何其他尝试订阅的用户发生冲突相同。随后,当消息发送到“/user/{username}/queue/position-updates”时,目的地被转换为“/queue/position-updatesi9oqdfzo”。
  39. 用于标识此类目的地的默认前缀是“/user/”。
  40. */
  41. config.setUserDestinationPrefix("/user");
  42. // 使用内置的消息代理进行订阅和广播,并且 将目标标头以 /topic 或 /queue 开头的消息路由到代理。
  43. config.enableStompBrokerRelay("/topic", "/queue")
  44. // 默认使用 ReactorNettyTcpClient
  45. // .setTcpClient()
  46. // 所以可以直接设置相关的属性
  47. .setRelayHost("127.0.0.1")
  48. .setRelayPort(61613)
  49. ;
  50. }
  51. /**
  52. * 入栈 通道配置
  53. *
  54. * @param registration
  55. */
  56. @Override
  57. public void configureClientInboundChannel(ChannelRegistration registration) {
  58. // 添加拦截器
  59. registration.interceptors(new ChannelInterceptor() {
  60. @Override
  61. public Message<?> preSend(Message<?> message, MessageChannel channel) {
  62. // 从消息中获取与 stomp 相关的请求头
  63. StompHeaderAccessor accessor =
  64. MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
  65. System.out.println(accessor.getCommand());
  66. // 如果当前的消息命令类型是 链接 类型,则处理 认证相关的信息
  67. if (StompCommand.CONNECT.equals(accessor.getCommand())) {
  68. // 获取登录用户 和 密码 header
  69. final String login = accessor.getLogin();
  70. final String passcode = accessor.getPasscode();
  71. // 如果没有设置 login 头就抛出异常
  72. if (!StringUtils.hasLength(login)) {
  73. throw new RuntimeException("必须设置用户名");
  74. }
  75. // 在这里你可以查询数据库之类的方式验证
  76. // 注意:这个 Authentication 是 security 里面的类
  77. // Authentication user = ... ; // access authentication header(s)
  78. // 但是这里要求的是 java.security.Principal 就可以,所以我们可以自己实现一个
  79. accessor.setUser(new Principal() {
  80. @Override
  81. public String getName() {
  82. return login;
  83. }
  84. });
  85. }
  86. return message;
  87. }
  88. });
  89. }
  90. /**
  91. * spring 为了支持每种容器自己的 websocket 升级策略,抽象了 RequestUpgradeStrategy,
  92. * <p>对 tomcat 提供了 TomcatRequestUpgradeStrategy 策略</p>
  93. * 如果不申明这个,就会在启动的时候抛出异常:No suitable default RequestUpgradeStrategy found
  94. */
  95. @Bean
  96. public TomcatRequestUpgradeStrategy tomcatRequestUpgradeStrategy() {
  97. return new TomcatRequestUpgradeStrategy();
  98. }
  99. }

然后编写一个接受请求更新,并将结果转发给发起请求用户的 @MessageMapping 方法

  1. package cn.mrcode.study.springdocsread.websocket;
  2. import org.springframework.messaging.handler.annotation.MessageMapping;
  3. import org.springframework.messaging.simp.annotation.SendToUser;
  4. import org.springframework.messaging.simp.annotation.SubscribeMapping;
  5. import org.springframework.stereotype.Controller;
  6. import java.security.Principal;
  7. /**
  8. * @author mrcode
  9. */
  10. @Controller
  11. public class StompController {
  12. /**
  13. * @param greeting
  14. * @return 返回值是广播给所有人
  15. */
  16. // 需要注意的是:客户端需要发送消息到 /app/greeting
  17. // 响应的消息,会默认广播到 /topic/greeting 上,只要订阅了 /topic/greeting 的订阅者都能收到
  18. @MessageMapping("/greeting")
  19. public String handle(String greeting) {
  20. return "[" + getTimestamp() + ": " + greeting;
  21. }
  22. private String getTimestamp() {
  23. return System.currentTimeMillis() + "";
  24. }
  25. /**
  26. * @return 返回值只返回给订阅的人;
  27. */
  28. // 需要注意的是:前端需要订阅 /app/greeting2
  29. // 也就是说,只要有订阅 /app/greeting2,订阅成功后,该订阅者就会收到这里返回的消息
  30. @SubscribeMapping("/greeting2")
  31. public String handle2() {
  32. return "[ 单个消息" + getTimestamp() + ": ";
  33. }
  34. // 每个用户执行更新操作,然后将更新的结果反馈给该用户
  35. // 首先客户端需要订阅 /user/queue/trade
  36. // 客户端发送消息到到 /app/trade 更新消息
  37. // 服务器随后会将更新后的结果推送给 /user/{username}/queue/trade , 该用户就能获取到消息了
  38. @MessageMapping("/trade")
  39. @SendToUser("/queue/trade")
  40. public String executeTrade(String mesg, Principal principal) {
  41. // ...
  42. return "消息已经处理完成:" + mesg;
  43. }
  44. }

前端页面订阅和发起消息

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>STOMP</title>
  6. </head>
  7. <body>
  8. </body>
  9. <script type="text/javascript" src="/node_modules/webstomp-client/dist/webstomp.min.js"></script>
  10. <script type="text/javascript"
  11. src="https://cdnjs.loli.net/ajax/libs/sockjs-client/1.6.0/sockjs.js"></script>
  12. <script>
  13. // 这里使用 sockJs 库链接
  14. var socket = new SockJS("http://localhost:8080/portfolio");
  15. // 文章上说可以使用 WebSocket 链接。 实际上我这里测试不可以,会报错
  16. // var socket = new WebSocket("ws://localhost:8080/portfolio");
  17. var stompClient = webstomp.over(socket);
  18. // 链接上 服务器时
  19. stompClient.connect(
  20. // 头信息
  21. {
  22. login: 'user1',
  23. passcode: '123456',
  24. },
  25. // 链接成功回调函数
  26. function (frame) {
  27. console.log(frame)
  28. // // 订阅消息
  29. // stompClient.subscribe("/topic/greeting", msg => {
  30. // console.log("收到订阅的消息广播:" + msg.body)
  31. // })
  32. //
  33. // // 订阅消息
  34. // stompClient.subscribe("/app/greeting2", msg => {
  35. // console.log("收到初始化的订阅消息:" + msg.body)
  36. // })
  37. //
  38. // // 链接上服务器时,像服务器发送一个消息
  39. // stompClient.send("/app/greeting", "我的第一个消息")
  40. // 订阅消息
  41. // stompClient.subscribe("/user/exchange/amq.direct/position-updates", msg => {
  42. // console.log("收到订阅的 /user/queue/trade 消息:" + msg.body)
  43. // })
  44. // 这里为了更好的看到控制台的信息,将上面的测试都注释掉了
  45. stompClient.subscribe("/user/queue/trade", msg => {
  46. console.log("收到订阅的 /user/queue/trade 消息:" + msg.body)
  47. })
  48. // 发送更新消息
  49. stompClient.send("/app/trade", "更新消息")
  50. },
  51. // 链接失败函数
  52. function (frame) {
  53. console.log("链接失败:", frame)
  54. // 如果是一个 stomp 帧,这判定 command ,如果没有这可能是一个 CloseEvent 对象
  55. if (frame.command && frame.command == 'ERROR') {
  56. console.log("链接失败原因:", frame.headers.message)
  57. }
  58. }
  59. )
  60. </script>
  61. </html>

然后启动测试,打开该 html 页面,控制台可以看到有如下的输出
image.png
如果你将该 html 复制一份,并将 login 修改成 user2 打开后,就能看到只有当前的用户能接受到这个对应的订阅消息,其他用户是看不到的。

然后再来看看 RabbitMQ 里面发生了什么
image.png
和本章文档说的一样,生成了一个单独的队列,不过它不是按照用户名生成的,而是按照 simpSessionId(可以在后端的用户登录认证地方,也就是 StompHeaderAccessor 中的 headers 中看到这个值)

所以这里我就没有明白为什么是 session 还需要搞一个持久化的队列呢?难道可以自定义 sessionId 的生成吗?不然这个持久化的队列貌似没有什么作用,每次刷新页面生成的 sessionId 都是不一样的

用户临时队列

前面看到了,使用 /user/queue 方式会生成一个持久化的队列。目前没有搞明白 sessionId 的情况下,持久化队列不会自动删除,刷新一次页面就会生成一次,这就严重的不太方便。

可以使用 /exchange方式生成临时队列,先来看如何配置,再讲原理

  1. package cn.mrcode.study.springdocsread.websocket;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.messaging.Message;
  5. import org.springframework.messaging.MessageChannel;
  6. import org.springframework.messaging.simp.config.ChannelRegistration;
  7. import org.springframework.messaging.simp.config.MessageBrokerRegistry;
  8. import org.springframework.messaging.simp.stomp.StompCommand;
  9. import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
  10. import org.springframework.messaging.support.ChannelInterceptor;
  11. import org.springframework.messaging.support.MessageHeaderAccessor;
  12. import org.springframework.util.StringUtils;
  13. import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
  14. import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
  15. import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
  16. import org.springframework.web.socket.server.standard.TomcatRequestUpgradeStrategy;
  17. import java.security.Principal;
  18. /**
  19. * @author mrcode
  20. */
  21. @Configuration
  22. @EnableWebSocketMessageBroker
  23. public class MyWebSocketConfig implements WebSocketMessageBrokerConfigurer {
  24. @Override
  25. public void registerStompEndpoints(StompEndpointRegistry registry) {
  26. // portfolio 是 WebSocket(或 SockJS)的端点的 HTTP URL。客户端需要连接以进行 WebSocket 握手。
  27. registry.addEndpoint("/portfolio")
  28. .setAllowedOriginPatterns("*")
  29. .withSockJS();
  30. }
  31. @Override
  32. public void configureMessageBroker(MessageBrokerRegistry config) {
  33. // 目标标头以 /app 开头的 STOMP 消息会被路由到 @Controller 类中的 @MessageMapping 方法。
  34. config.setApplicationDestinationPrefixes("/app");
  35. /*
  36. 启用用户目的地,它的原理如下(这个在源码的注释上有说明):
  37. 配置用于标识用户目的地的前缀。用户目的地为用户提供了订阅其会话唯一的队列名称以及其他人将消息发送到那些唯一的、特定于用户的队列的能力。
  38. 例如,当用户尝试订阅“/user/queue/position-updates”时,目的地可能会被转换为“/queue/position-updatesi9oqdfzo”,从而产生一个唯一的队列名称,不会与任何其他尝试订阅的用户发生冲突相同。随后,当消息发送到“/user/{username}/queue/position-updates”时,目的地被转换为“/queue/position-updatesi9oqdfzo”。
  39. 用于标识此类目的地的默认前缀是“/user/”。
  40. */
  41. config.setUserDestinationPrefix("/user");
  42. // 使用内置的消息代理进行订阅和广播,并且 将目标标头以 /topic 或 /queue 开头的消息路由到代理。
  43. // 这里增加 /exchange 的前缀
  44. config.enableStompBrokerRelay("/topic", "/queue", "/exchange")
  45. // 默认使用 ReactorNettyTcpClient
  46. // .setTcpClient()
  47. // 所以可以直接设置相关的属性
  48. .setRelayHost("127.0.0.1")
  49. .setRelayPort(61613)
  50. ;
  51. }
  52. /**
  53. * 入栈 通道配置
  54. *
  55. * @param registration
  56. */
  57. @Override
  58. public void configureClientInboundChannel(ChannelRegistration registration) {
  59. // 添加拦截器
  60. registration.interceptors(new ChannelInterceptor() {
  61. @Override
  62. public Message<?> preSend(Message<?> message, MessageChannel channel) {
  63. // 从消息中获取与 stomp 相关的请求头
  64. StompHeaderAccessor accessor =
  65. MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
  66. System.out.println(accessor.getCommand());
  67. // 如果当前的消息命令类型是 链接 类型,则处理 认证相关的信息
  68. if (StompCommand.CONNECT.equals(accessor.getCommand())) {
  69. // 获取登录用户 和 密码 header
  70. final String login = accessor.getLogin();
  71. final String passcode = accessor.getPasscode();
  72. // 如果没有设置 login 头就抛出异常
  73. if (!StringUtils.hasLength(login)) {
  74. throw new RuntimeException("必须设置用户名");
  75. }
  76. // 在这里你可以查询数据库之类的方式验证
  77. // 注意:这个 Authentication 是 security 里面的类
  78. // Authentication user = ... ; // access authentication header(s)
  79. // 但是这里要求的是 java.security.Principal 就可以,所以我们可以自己实现一个
  80. accessor.setUser(new Principal() {
  81. @Override
  82. public String getName() {
  83. return login;
  84. }
  85. });
  86. }
  87. return message;
  88. }
  89. });
  90. }
  91. /**
  92. * spring 为了支持每种容器自己的 websocket 升级策略,抽象了 RequestUpgradeStrategy,
  93. * <p>对 tomcat 提供了 TomcatRequestUpgradeStrategy 策略</p>
  94. * 如果不申明这个,就会在启动的时候抛出异常:No suitable default RequestUpgradeStrategy found
  95. */
  96. @Bean
  97. public TomcatRequestUpgradeStrategy tomcatRequestUpgradeStrategy() {
  98. return new TomcatRequestUpgradeStrategy();
  99. }
  100. }

前端页面

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>STOMP</title>
  6. </head>
  7. <body>
  8. </body>
  9. <script type="text/javascript" src="/node_modules/webstomp-client/dist/webstomp.min.js"></script>
  10. <script type="text/javascript"
  11. src="https://cdnjs.loli.net/ajax/libs/sockjs-client/1.6.0/sockjs.js"></script>
  12. <script>
  13. // 这里使用 sockJs 库链接
  14. var socket = new SockJS("http://localhost:8080/portfolio");
  15. // 文章上说可以使用 WebSocket 链接。 实际上我这里测试不可以,会报错
  16. // var socket = new WebSocket("ws://localhost:8080/portfolio");
  17. var stompClient = webstomp.over(socket);
  18. // 链接上 服务器时
  19. stompClient.connect(
  20. // 头信息
  21. {
  22. login: 'user1',
  23. passcode: '123456',
  24. },
  25. // 链接成功回调函数
  26. function (frame) {
  27. console.log(frame)
  28. // // 订阅消息
  29. // stompClient.subscribe("/topic/greeting", msg => {
  30. // console.log("收到订阅的消息广播:" + msg.body)
  31. // })
  32. //
  33. // // 订阅消息
  34. // stompClient.subscribe("/app/greeting2", msg => {
  35. // console.log("收到初始化的订阅消息:" + msg.body)
  36. // })
  37. //
  38. // // 链接上服务器时,像服务器发送一个消息
  39. // stompClient.send("/app/greeting", "我的第一个消息")
  40. // 订阅消息
  41. // stompClient.subscribe("/user/exchange/amq.direct/position-updates", msg => {
  42. // console.log("收到订阅的 /user/queue/trade 消息:" + msg.body)
  43. // })
  44. // stompClient.subscribe("/user/queue/trade", msg => {
  45. // console.log("收到订阅的 /user/queue/trade 消息:" + msg.body)
  46. // })
  47. // // 发送更新消息
  48. // stompClient.send("/app/trade", "更新消息")
  49. stompClient.subscribe("/user/exchange/amq.direct/trade", msg => {
  50. console.log("收到订阅的 /user/exchange/amq.direct/trade 消息:" + msg.body)
  51. })
  52. // 这里不延迟一会发送消息,可能会导致订阅的临时队列还没有创建完成,从而表现出来是发送了消息,但是没有接受到消息的情况
  53. setTimeout(()=>{
  54. // 发送更新消息
  55. stompClient.send("/app/trade", "更新消息")
  56. },500)
  57. },
  58. // 链接失败函数
  59. function (frame) {
  60. console.log("链接失败:", frame)
  61. // 如果是一个 stomp 帧,这判定 command ,如果没有这可能是一个 CloseEvent 对象
  62. if (frame.command && frame.command == 'ERROR') {
  63. console.log("链接失败原因:", frame.headers.message)
  64. }
  65. }
  66. )
  67. </script>
  68. </html>

看看测试的 html 控制台
image.png
然后再看看 RabbitMQ 中的信息
image.png
与前面的持久化队列方式相差太多了。 这里队列名称里面都没有 trade 了。而且是临时队列了。

解密用户临时队列的原理

RabbitMQ STOMP 官方文档 中有如下的描述
image.png
可以看到,目的地前缀支持上面的好多种,所以我们需要在 enableStompBrokerRelay 中开启对这个前缀的支持,否则就不会转发到对应的消息代理上

  1. config.enableStompBrokerRelay("/topic", "/queue", "/exchange")

然后再来看 exchange/amq.direct/trade这个地址的含义, AMQP 的语法如下

  1. /exchange/exchange_name [/routing_key]

所以对于我们的地址来说就很明显了:

  • /exchange 消息代理固定前缀
  • amq.direct这个是 RabbitMQ 的默认 Exchange ,可以在 exchanges 页面看到,如下图所示

image.png

  • trad :就是 routing_key 了

明白了路径的含义之后,你一定在想,这也看不出来和是如何与一个用户的会话 ID 对应起来的。这个只需要点开这个临时队列就清楚了
image.png
在这个临时队列绑定到 amq.direct exchange 的时候,将 routing_key 设置成了之前持久化队列的名称。 这下你就真的明白了他们的关系是如何对应的了