一、发送数据携带用户ID

携带的用户ID可以直接注入到带有@MessageMapping注解的方法参数中,后端可以使用这个ID进行双向通信
需要定义一个实体实现Principal,实现getName()方法

  1. @Getter
  2. @Setter
  3. public class User implements Principal {
  4. private String username;
  5. private String password;
  6. private String role;
  7. private List<Url> urls;
  8. @Override
  9. public String getName() {
  10. return username;
  11. }
  12. }

定义用户拦截器做认证,并生成User,注入StompHeaderAccessor

  1. /**
  2. *用户拦截器
  3. **/
  4. public class UserInterceptor implements ChannelInterceptor {
  5. @Override
  6. public Message<?> preSend(Message<?> message, MessageChannel channel) {
  7. StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
  8. if (StompCommand.CONNECT.equals(accessor.getCommand())) {
  9. Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
  10. if (raw instanceof Map) {
  11. //这里就是token
  12. Object name = ((Map) raw).get(Constants.TOKEN_KEY);
  13. if (name instanceof LinkedList) {
  14. // 设置当前访问器的认证用户
  15. String token = ((LinkedList) name).get(0).toString();
  16. String username = null;
  17. try {
  18. Map<String, Claim> claimMap = JWTUtils.verifyToken(token);
  19. username = claimMap.get("username").asString();
  20. if(username == null){
  21. throw new RuntimeException("websocket认证失败");
  22. }
  23. } catch (UnsupportedEncodingException e) {
  24. e.printStackTrace();
  25. throw new RuntimeException("websocket认证失败", e);
  26. } catch (ValidTokenException e) {
  27. e.printStackTrace();
  28. throw new RuntimeException("websocket认证失败", e);
  29. }
  30. User user = new User();
  31. user.setUsername(username);
  32. accessor.setUser(user);
  33. // User user = new User();
  34. // user.setUsername("lalala");
  35. // accessor.setUser(user);
  36. }
  37. }
  38. }
  39. return message;
  40. }
  41. @Override
  42. public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
  43. }
  44. @Override
  45. public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
  46. }
  47. @Override
  48. public boolean preReceive(MessageChannel channel) {
  49. return false;
  50. }
  51. @Override
  52. public Message<?> postReceive(Message<?> message, MessageChannel channel) {
  53. return null;
  54. }
  55. @Override
  56. public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {
  57. }
  58. }
  1. /*将客户端渠道拦截器加入spring ioc容器*/
  2. @Bean
  3. public UserInterceptor createUserInterceptor() {
  4. return new UserInterceptor();
  5. }

服务端

  1. /**
  2. * 接收用户信息
  3. * */
  4. @MessageMapping(value = "/principal")
  5. public void test(Principal principal) {
  6. log.info("当前在线人数:" + userRegistry.getUserCount());
  7. int i = 1;
  8. for (SimpUser user : userRegistry.getUsers()) {
  9. log.info("用户" + i++ + "---" + user);
  10. }
  11. //发送消息给指定用户
  12. messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");
  13. }

客户端:

  /*
   * Sends a frame with no payload to "/app/principal".
   */
  function send0() {
      // The body must be a string: an object literal such as {} would be
      // coerced to the text "[object Object]" and sent as the payload.
      stompClient.send("/app/principal", {}, "");
  }

二、发送JSON数据体

服务端可以直接在函数中注入JavaBean或者Map,List或者String接收
服务端:

  1. /*点对点通信*/
  2. @MessageMapping(value = "/P2P")
  3. public void templateTest(Principal principal, Map<String,String> data) {
  4. log.info("当前在线人数:" + userRegistry.getUserCount());
  5. int i = 1;
  6. for (SimpUser user : userRegistry.getUsers()) {
  7. log.info("用户" + i++ + "---" + user);
  8. }
  9. //发送消息给指定用户
  10. messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");
  11. }

客户端:

  /**
   * Sends a JSON body to "/app/P2P".
   */
  function send() {
      var payload = { 'name': 'test' };
      var body = JSON.stringify(payload);
      stompClient.send("/app/P2P", {}, body);
  }

三、将参数携带到发送请求的URL路径中

使用@DestinationVariable注解,类似SpringMVC的@PathVariable
服务端:

  1. /**
  2. * 接收路径参数
  3. * */
  4. @MessageMapping(value = "/path/{name}/{company}")
  5. public void pathTest(Principal principal, @DestinationVariable String name, @DestinationVariable String company) {
  6. log.info("当前在线人数:" + userRegistry.getUserCount());
  7. int i = 1;
  8. for (SimpUser user : userRegistry.getUsers()) {
  9. log.info("用户" + i++ + "---" + user);
  10. }
  11. //发送消息给指定用户
  12. messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");
  13. }

客户端:

  /*
   * Sends a frame whose parameters are carried in the destination path.
   */
  function send2() {
      // The body must be a string: an object literal such as {} would be
      // coerced to the text "[object Object]" and sent as the payload.
      stompClient.send("/app/path/zhangsan/XXX公司", {}, "");
  }

四、发送header

使用@Header注解
服务端:

  1. /**
  2. * 接收header参数
  3. * */
  4. @MessageMapping(value = "/header")
  5. public void headerTest(Principal principal, @Header String one, @Header String two) {
  6. log.info("当前在线人数:" + userRegistry.getUserCount());
  7. int i = 1;
  8. for (SimpUser user : userRegistry.getUsers()) {
  9. log.info("用户" + i++ + "---" + user);
  10. }
  11. //发送消息给指定用户
  12. messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");
  13. }

客户端:

  /**
   * Sends a frame that carries its parameters as the headers "one" and "two".
   */
  function send3() {
      var headers = {"one": "lalala", "two": "中国"};
      // The body must be a string: an object literal such as {} would be
      // coerced to the text "[object Object]" and sent as the payload.
      stompClient.send("/app/header", headers, "");
  }

五、发送Httpsession中的数据

这里有一点需要注意:按我的理解,服务端只能拿到握手连接时HttpSession中已有的数据
注册HttpSessionHandshakeInterceptor

  1. /**
  2. * 注册stomp的端点
  3. */
  4. @Override
  5. public void registerStompEndpoints(StompEndpointRegistry registry) {
  6. // 允许使用socketJs方式访问,访问点为webSocketServer,允许跨域
  7. // 在网页上我们就可以通过这个链接
  8. // http://localhost:8080/webSocketServer
  9. // 来和服务器的WebSocket连接
  10. registry.addEndpoint("/webSocketServer")
  11. .addInterceptors(new HttpSessionHandshakeInterceptor())
  12. .setAllowedOrigins("*")
  13. .withSockJS();
  14. }

服务端:

  1. /**
  2. * 接收HttpSession数据
  3. * */
  4. @MessageMapping(value = "/httpsession")
  5. public void httpsession( StompHeaderAccessor accessor) {
  6. String name = (String) accessor.getSessionAttributes().get("name");
  7. System.out.println(1111);
  8. }

客户端:

  /**
   * Sends a frame with no payload to "/app/httpsession".
   */
  function send4() {
      // The body must be a string: an object literal such as {} would be
      // coerced to the text "[object Object]" and sent as the payload.
      stompClient.send("/app/httpsession", {}, "");
  }

所有代码

前端JS

  <!DOCTYPE html>
  <html lang="en">
  <head>
      <meta charset="UTF-8">
      <title>Title</title>
      <!-- Both libraries are loaded over https so the page also works when served over https. -->
      <script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
      <script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
      <script>
          var socket = new SockJS("http://192.168.100.88:7601/demo/webSocketServer");
          var stompClient = Stomp.over(socket);

          window.onload = function () {
              connect();
          };

          // Subscribe to the per-user queue.
          function subscribe() {
              stompClient.subscribe('/user/queue/message', function (response) {
                  // Log the payload of the frame rather than the whole frame object.
                  console.log("/user/queue/message 你接收到的消息为:" + response.body);
              });
          }

          // NOTE: every send below passes a string body. An object literal such
          // as {} would be coerced to the text "[object Object]".

          /**
           * Sends a frame with no payload to "/app/principal".
           */
          function send0() {
              stompClient.send("/app/principal", {}, "");
          }

          /**
           * Sends a JSON body.
           */
          function send() {
              stompClient.send("/app/P2P", {}, JSON.stringify({ 'name': 'test' }));
          }

          /**
           * Sends parameters inside the destination path.
           */
          function send2() {
              stompClient.send("/app/path/zhangsan/XXX公司", {}, "");
          }

          /**
           * Sends parameters as headers.
           */
          function send3() {
              stompClient.send("/app/header", {"one": "lalala", "two": "中国"}, "");
          }

          /**
           * Sends a frame with no payload to "/app/httpsession".
           */
          function send4() {
              stompClient.send("/app/httpsession", {}, "");
          }

          // Disabled experiment: sending "?name=..." query parameters in the destination.
          // function send5() {
          //     stompClient.send("/app/param?name=张三", {}, "");
          // }

          function connect() {
              stompClient.connect({
                      // Demo token; replace with the token of the current user.
                      Authorization: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjIxOTg1NjQxNjAsImlhdCI6MTUzMTg5NzUwMCwidXNlcm5hbWUiOiJ6cXcxMSJ9.VFR2EKUx5BTYLDkDogiLA9LfNVoPjOzQ3rTWoEy7He4"
                      // name: 'admin' // client information could be carried here as well
                  },
                  function connectCallback(frame) {
                      // Called when the connection succeeds (server answers with a CONNECTED frame).
                      alert("success");
                      subscribe();
                  },
                  function errorCallBack(error) {
                      // Called when the connection fails (server answers with an ERROR frame).
                      alert("error");
                  });
          }

          function disconnect() {
              if (stompClient != null) {
                  stompClient.disconnect();
              }
              console.log("Disconnected");
          }
      </script>
  </head>
  <body>
      <input type="text" id="info"/>
      <!-- The original single button called send5(), which is commented out above. -->
      <button onclick="send0();">发送用户信息</button>
      <button onclick="send();">发送JSON数据体</button>
      <button onclick="send2();">发送路径参数</button>
      <button onclick="send3();">发送header参数</button>
      <button onclick="send4();">发送httpsession</button>
  </body>
  </html>

后端MessageMapping处

  1. package com.iscas.biz.test.controller;
  2. import com.iscas.templet.common.ResponseEntity;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.messaging.handler.annotation.*;
  6. import org.springframework.messaging.simp.SimpMessagingTemplate;
  7. import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
  8. import org.springframework.messaging.simp.user.SimpUser;
  9. import org.springframework.messaging.simp.user.SimpUserRegistry;
  10. import org.springframework.web.bind.annotation.RestController;
  11. import java.security.Principal;
  12. import java.util.Map;
  13. /**
  14. * 如有要看例子,请打开注释
  15. *
  16. **/
  17. @RestController
  18. @Slf4j
  19. public class WebSoketDemoController {
  20. //spring提供的发送消息模板
  21. @Autowired
  22. private SimpMessagingTemplate messagingTemplate;
  23. @Autowired
  24. private SimpUserRegistry userRegistry;
  25. /**
  26. * 接收用户信息
  27. * */
  28. @MessageMapping(value = "/principal")
  29. public void test(Principal principal) {
  30. log.info("当前在线人数:" + userRegistry.getUserCount());
  31. int i = 1;
  32. for (SimpUser user : userRegistry.getUsers()) {
  33. log.info("用户" + i++ + "---" + user);
  34. }
  35. //发送消息给指定用户
  36. messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");
  37. }
  38. /**
  39. * 接收数据体
  40. * */
  41. @MessageMapping(value = "/P2P")
  42. public void templateTest(Principal principal, Map<String,String> data) {
  43. log.info("当前在线人数:" + userRegistry.getUserCount());
  44. int i = 1;
  45. for (SimpUser user : userRegistry.getUsers()) {
  46. log.info("用户" + i++ + "---" + user);
  47. }
  48. //发送消息给指定用户
  49. messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");
  50. }
  51. /**
  52. * 接收路径参数
  53. * */
  54. @MessageMapping(value = "/path/{name}/{company}")
  55. public void pathTest(Principal principal, @DestinationVariable String name, @DestinationVariable String company) {
  56. log.info("当前在线人数:" + userRegistry.getUserCount());
  57. int i = 1;
  58. for (SimpUser user : userRegistry.getUsers()) {
  59. log.info("用户" + i++ + "---" + user);
  60. }
  61. //发送消息给指定用户
  62. messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");
  63. }
  64. /**
  65. * 接收header参数
  66. * */
  67. @MessageMapping(value = "/header")
  68. public void headerTest(Principal principal, @Header String one, @Header String two) {
  69. log.info("当前在线人数:" + userRegistry.getUserCount());
  70. int i = 1;
  71. for (SimpUser user : userRegistry.getUsers()) {
  72. log.info("用户" + i++ + "---" + user);
  73. }
  74. //发送消息给指定用户
  75. messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");
  76. }
  77. /**
  78. * 接收HttpSession数据
  79. * */
  80. @MessageMapping(value = "/httpsession")
  81. public void httpsession( StompHeaderAccessor accessor) {
  82. String name = (String) accessor.getSessionAttributes().get("name");
  83. System.out.println(1111);
  84. }
  85. // /**
  86. // * 接收param数据
  87. // * */
  88. // @MessageMapping(value = "/param")
  89. // public void param(String name) {
  90. // System.out.println(1111);
  91. // }
  92. /*广播*/
  93. @MessageMapping("/broadcast")
  94. @SendTo("/topic/getResponse")
  95. public ResponseEntity topic() throws Exception {
  96. return new ResponseEntity(200,"success");
  97. }
  98. }

Websocket配置类

  1. package com.iscas.base.biz.config.stomp;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.messaging.simp.config.ChannelRegistration;
  4. import org.springframework.messaging.simp.config.MessageBrokerRegistry;
  5. import org.springframework.web.socket.config.annotation.*;
  6. import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
  7. /**
  8. * webscoket配置
  9. *
  10. * @auth zhuquanwen
  11. *
  12. **/
  13. //@Configuration
  14. @EnableWebSocketMessageBroker
  15. public class WebSocketStompConfig /*extends AbstractWebSocketMessageBrokerConfigurer*/ implements WebSocketMessageBrokerConfigurer {
  16. /**
  17. * 注册stomp的端点
  18. */
  19. @Override
  20. public void registerStompEndpoints(StompEndpointRegistry registry) {
  21. // 允许使用socketJs方式访问,访问点为webSocketServer,允许跨域
  22. // 在网页上我们就可以通过这个链接
  23. // http://localhost:8080/webSocketServer
  24. // 来和服务器的WebSocket连接
  25. registry.addEndpoint("/webSocketServer")
  26. .addInterceptors(new HttpSessionHandshakeInterceptor())
  27. .setAllowedOrigins("*")
  28. .withSockJS();
  29. }
  30. /**
  31. * 配置信息代理
  32. */
  33. @Override
  34. public void configureMessageBroker(MessageBrokerRegistry registry) {
  35. // 订阅Broker名称
  36. registry.enableSimpleBroker("/queue", "/topic");
  37. // 全局使用的消息前缀(客户端订阅路径上会体现出来)
  38. registry.setApplicationDestinationPrefixes("/app");
  39. // 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
  40. registry.setUserDestinationPrefix("/user/");
  41. }
  42. /**
  43. * 配置客户端入站通道拦截器
  44. */
  45. @Override
  46. public void configureClientInboundChannel(ChannelRegistration registration) {
  47. registration.interceptors(createUserInterceptor());
  48. }
  49. /*将客户端渠道拦截器加入spring ioc容器*/
  50. @Bean
  51. public UserInterceptor createUserInterceptor() {
  52. return new UserInterceptor();
  53. }
  54. @Override
  55. public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
  56. registration.setMessageSizeLimit(500 * 1024 * 1024);
  57. registration.setSendBufferSizeLimit(1024 * 1024 * 1024);
  58. registration.setSendTimeLimit(200000);
  59. }
  60. }

用户拦截器

  1. package com.iscas.base.biz.config.stomp;
  2. import com.auth0.jwt.interfaces.Claim;
  3. import com.iscas.base.biz.config.Constants;
  4. import com.iscas.base.biz.util.SpringUtils;
  5. import com.iscas.templet.exception.ValidTokenException;
  6. import com.iscas.base.biz.model.auth.User;
  7. import com.iscas.base.biz.util.JWTUtils;
  8. import org.springframework.messaging.Message;
  9. import org.springframework.messaging.MessageChannel;
  10. import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
  11. import org.springframework.messaging.simp.stomp.StompCommand;
  12. import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
  13. import org.springframework.messaging.support.ChannelInterceptor;
  14. import org.springframework.messaging.support.MessageHeaderAccessor;
  15. import javax.servlet.http.HttpSession;
  16. import java.io.UnsupportedEncodingException;
  17. import java.util.LinkedList;
  18. import java.util.Map;
  19. /**
  20. *用户拦截器
  21. **/
  22. public class UserInterceptor implements ChannelInterceptor {
  23. @Override
  24. public Message<?> preSend(Message<?> message, MessageChannel channel) {
  25. StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
  26. if (StompCommand.CONNECT.equals(accessor.getCommand())) {
  27. Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
  28. if (raw instanceof Map) {
  29. //这里就是token
  30. Object name = ((Map) raw).get(Constants.TOKEN_KEY);
  31. if (name instanceof LinkedList) {
  32. // 设置当前访问器的认证用户
  33. // String token = ((LinkedList) name).get(0).toString();
  34. // String username = null;
  35. // try {
  36. // Map<String, Claim> claimMap = JWTUtils.verifyToken(token);
  37. // username = claimMap.get("username").asString();
  38. // if(username == null){
  39. // throw new RuntimeException("websocket认证失败");
  40. // }
  41. // } catch (UnsupportedEncodingException e) {
  42. // e.printStackTrace();
  43. // throw new RuntimeException("websocket认证失败", e);
  44. // } catch (ValidTokenException e) {
  45. // e.printStackTrace();
  46. // throw new RuntimeException("websocket认证失败", e);
  47. // }
  48. // User user = new User();
  49. // user.setUsername(username);
  50. // accessor.setUser(user);
  51. User user = new User();
  52. user.setUsername("lalala");
  53. accessor.setUser(user);
  54. }
  55. }
  56. } else if (StompCommand.SEND.equals(accessor.getCommand())) {
  57. //发送数据
  58. }
  59. return message;
  60. }
  61. @Override
  62. public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
  63. }
  64. @Override
  65. public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
  66. }
  67. @Override
  68. public boolean preReceive(MessageChannel channel) {
  69. return false;
  70. }
  71. @Override
  72. public Message<?> postReceive(Message<?> message, MessageChannel channel) {
  73. return null;
  74. }
  75. @Override
  76. public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {
  77. }
  78. }