STOMP是一个更高级的协议,它使用一个基于帧(frame)的格式来定义消息,与HTTP的request和response类似。本人主要讲述stomp 在websocket中的配置,以及与session的集合做点对点通信,以及微信小程序等需要ws/wss协议时候前后端的配置。

基本配置以及与SESSION的结合

1.依赖

springboot 中引入websocket依赖,这里用gradle构建,用maven对应过去就是了
compile group: ‘org.springframework.boot’, name: ‘spring-boot-starter-websocket’

2.定义WebSocketStompConfig 配置类

  1. @Configuration
  2. @EnableWebSocketMessageBroker
  3. public class WebSocketStompConfig extends AbstractWebSocketMessageBrokerConfigurer {
  4. /**
  5. * 注册stomp的端点
  6. */
  7. @Override
  8. public void registerStompEndpoints(StompEndpointRegistry registry) {
  9. // 允许使用socketJs方式访问,访问点为webSocketServer,允许跨域
  10. // 在网页上我们就可以通过这个链接
  11. // http://localhost:8080/webSocketServer
  12. // 来和服务器的WebSocket连接
  13. registry.addEndpoint("/webSocketServer")
  14. .setAllowedOrigins("*")
  15. .addInterceptors(new SessionAuthHandshakeInterceptor())
  16. .withSockJS();
  17. }
  18. /**
  19. * 配置信息代理
  20. */
  21. @Override
  22. public void configureMessageBroker(MessageBrokerRegistry registry) {
  23. // 订阅Broker名称
  24. registry.enableSimpleBroker("/queue", "/topic");
  25. // 全局使用的消息前缀(客户端订阅路径上会体现出来)
  26. registry.setApplicationDestinationPrefixes("/app");
  27. // 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
  28. registry.setUserDestinationPrefix("/user/");
  29. }
  30. /**
  31. * 配置客户端入站通道拦截器
  32. */
  33. @Override
  34. public void configureClientInboundChannel(ChannelRegistration registration) {
  35. registration.setInterceptors(createUserInterceptor());
  36. }
  37. /*将客户端渠道拦截器加入spring ioc容器*/
  38. @Bean
  39. public UserInterceptor createUserInterceptor() {
  40. return new UserInterceptor();
  41. }
  42. @Override
  43. public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
  44. registration.setMessageSizeLimit(500 * 1024 * 1024);
  45. registration.setSendBufferSizeLimit(1024 * 1024 * 1024);
  46. registration.setSendTimeLimit(200000);
  47. }
  48. }

3.定义用户拦截器

这里preSend里当连接方式为CONNECT的时候获取session里的用户信息,注入stompHeaderAccessor。注意一点的是用户类需要实现java.security.Principal。preSend有很多连接方式,包括DISCONNECT,SUBSCRIBE, DISSUBSCRIBE,可以用这么连接方式监控用户的上线下线,统计每个订阅的在线人数等等,大家可以自行想象。

  1. public class UserInterceptor implements ChannelInterceptor {
  2. @Override
  3. public Message<?> preSend(Message<?> message, MessageChannel channel) {
  4. StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
  5. if (StompCommand.CONNECT.equals(accessor.getCommand())) {
  6. User user = (User)accessor.getSessionAttributes().get(Constants.WEBSOCKET_USER_KEY);
  7. accessor.setUser(user);
  8. }
  9. return message;
  10. }
  11. @Override
  12. public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
  13. }
  14. @Override
  15. public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
  16. }
  17. @Override
  18. public boolean preReceive(MessageChannel channel) {
  19. return false;
  20. }
  21. @Override
  22. public Message<?> postReceive(Message<?> message, MessageChannel channel) {
  23. return null;
  24. }
  25. @Override
  26. public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {
  27. }
  28. }

4.定义handshake握手拦截器

这里有个逻辑就是当webscoket建立连接的时候被拦截,获取当前应用的session,将用户登录信息获取出来,如果用户未登录,那么不好意思拒绝连接,如果已经登陆了,那么将用户绑定到stomp的session中,第3步的时候就调用了这个用户信息。

  1. public class SessionAuthHandshakeInterceptor implements HandshakeInterceptor {
  2. private Logger logger = LoggerFactory.getLogger(SessionAuthHandshakeInterceptor.class);
  3. @Override
  4. public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
  5. HttpSession session = getSession(request);
  6. if(session == null || session.getAttribute(Constants.SESSION_USER) == null){
  7. logger.error("websocket权限拒绝");
  8. // return false;
  9. throw new CmiException("websocket权限拒绝");
  10. }
  11. attributes.put(Constants.WEBSOCKET_USER_KEY,session.getAttribute(Constants.SESSION_USER));
  12. return true;
  13. }
  14. @Override
  15. public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
  16. }
  17. private HttpSession getSession(ServerHttpRequest request) {
  18. if (request instanceof ServletServerHttpRequest) {
  19. ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) request;
  20. return serverRequest.getServletRequest().getSession(false);
  21. }
  22. return null;
  23. }
  24. }

5.广播通信例子

服务端 添加MessageMapping注解,@SendoTo对应前台的订阅地址。这里也可以用更灵活的方式,使用spring的SimpMessagingTemplate模板,messagingTemplate.convertAndSend方法广播式通信。

  1. /*广播*/
  2. @MessageMapping("/broadcast")
  3. @SendTo("/topic/getResponse")
  4. public ResponseEntity topic() throws Exception {
  5. return new ResponseEntity(200,"success");
  6. }

客户端 var socket = new SockJS(“http://172.16.10.156:80/cmi/webSocketServer“),var stompClient = Stomp.over(socket); 创建客户端连接对象。connect(),建立连接,connect的成功回调函数里执行subscribe()订阅,订阅的地址/topic/getResponse对应服务端的@SendTo地址。

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>Title</title>
  6. <script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
  7. <script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
  8. <script>
  9. var socket = new SockJS("http://ip:port/context-path/webSocketServer");
  10. var stompClient = Stomp.over(socket);
  11. window.onload = function () {
  12. connect();
  13. }
  14. //订阅消息
  15. function subscribe() {
  16. stompClient.subscribe('/topic/getResponse', function(response){
  17. alert("/topic/getResponse 你接收到的消息为:" + response);
  18. });
  19. }
  20. function send() {
  21. stompClient.send("/app/broadcast", {},
  22. JSON.stringify({ 'name': 'test' }));
  23. }
  24. function connect() {
  25. stompClient.connect({
  26. //这里可以改成token
  27. name: 'test' // 携带客户端信息
  28. },
  29. function connectCallback(frame) {
  30. // 连接成功时(服务器响应 CONNECTED 帧)的回调方法
  31. alert("success");
  32. subscribe();
  33. },
  34. function errorCallBack(error) {
  35. // 连接失败时(服务器响应 ERROR 帧)的回调方法
  36. alert("error");
  37. });
  38. }
  39. function disconnect() {
  40. if (stompClient != null) {
  41. stompClient.disconnect();
  42. }
  43. // setConnected(false);
  44. console.log("Disconnected");
  45. }
  46. </script>
  47. </head>
  48. <body>
  49. <input type="text" id="info"/><button onclick="send();">发送</button>
  50. </body>
  51. </html>

6.点对点通信

服务端 注入SimpUserRegistry 对象(不是必须的),这个对象里存储了第3步里的user对象。
messagingTemplate.convertAndSendToUser(“test”, “/queue/message1”,”服务器主动推的数据”);这句是重点,第一个参数 是注册的用户名,第二个参数/queue/message对应着前台的订阅/user/queue/message1,第三个参数就是推送的实质内容。/p1的mapping对应的前台send的地址/app/p1

  1. //spring提供的发送消息模板
  2. @Autowired
  3. private SimpMessagingTemplate messagingTemplate;
  4. @Autowired
  5. private SimpUserRegistry userRegistry;
  6. /*点对点通信*/
  7. @MessageMapping(value = "/p1")
  8. public void templateTest1(Principal principal) {
  9. logger.info("当前在线人数:" + userRegistry.getUserCount());
  10. int i = 1;
  11. for (SimpUser user : userRegistry.getUsers()) {
  12. logger.info("用户" + i++ + "---" + user);
  13. }
  14. //发送消息给指定用户
  15. messagingTemplate.convertAndSendToUser("test", "/queue/message1","服务器主动推的数据");
  16. }

客户端 这里注意与后台定制对应的时候 发送时候对了/app前缀,订阅地址多了/user前缀。

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>Title</title>
  6. <script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
  7. <script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
  8. <script>
  9. var socket = new SockJS("http://ip:port/context-path/webSocketServer");
  10. var stompClient = Stomp.over(
  11. );
  12. window.onload = function () {
  13. connect();
  14. }
  15. //订阅消息
  16. function subscribe() {
  17. stompClient.subscribe('/user/queue/message1', function (response) {
  18. console.log("/user/queue/message1 你接收到的消息为:" + response);
  19. });
  20. }
  21. function send() {
  22. stompClient.send("/app/p1", {},
  23. JSON.stringify({ 'name': 'test' }));
  24. }
  25. function connect() {
  26. stompClient.connect({
  27. // name: 'admin' // 携带客户端信息
  28. },
  29. function connectCallback(frame) {
  30. // 连接成功时(服务器响应 CONNECTED 帧)的回调方法
  31. alert("success");
  32. subscribe();
  33. },
  34. function errorCallBack(error) {
  35. // 连接失败时(服务器响应 ERROR 帧)的回调方法
  36. alert("error");
  37. });
  38. }
  39. function disconnect() {
  40. if (stompClient != null) {
  41. stompClient.disconnect();
  42. }
  43. // setConnected(false);
  44. console.log("Disconnected");
  45. }
  46. </script>
  47. </head>
  48. <body>
  49. <input type="text" id="info"/><button onclick="send();">发送</button>
  50. </body>
  51. </html>

ws/wss协议的使用
上面介绍的方式前台建立连接还是使用的http或者https的方式,如果要使用ws/wss协议,前后台稍微作一下改造
后台 将withSokJS()注释掉

  1. registry.addEndpoint("/webSocketServer")
  2. .setAllowedOrigins("*")
  3. .addInterceptors(new SessionAuthHandshakeInterceptor());
  4. //.withSockJS();

前台 构建客户端对象处作一点改动

  1. // var socket = new WebSocket("http://ip:port/context-path/webSocketServer");
  2. // var stompClient = Stomp.over(socket);
  3. var url = "ws://192.168.100.90:80/cmi/webSocketServer";
  4. var stompClient = Stomp.client(url);