一、发送数据携带用户ID
携带的用户ID可以直接注入到带有@MessageMapping注解的方法中,后端可以使用这个ID进行双向通信
需要定义一个实体实现Principal,实现getName()方法
/**
 * User entity that implements {@link Principal} so that an instance can be
 * attached to a STOMP session as the session user.
 *
 * <p>Getters and setters for all fields are generated by Lombok.
 */
@Getter @Setter
public class User implements Principal {

    /** Value reported as the principal name by {@link #getName()}. */
    private String username;
    private String password;
    private String role;
    private List<Url> urls;

    /**
     * Returns the principal name.
     *
     * @return the current {@code username}, or {@code null} if none has been set
     */
    @Override
    public String getName() {
        return this.username;
    }
}
定义用户拦截器做认证,并生成User,注入StompHeaderAccessor
/**
 * Channel interceptor that authenticates STOMP {@code CONNECT} frames.
 *
 * <p>The JWT is read from the native header named {@code Constants.TOKEN_KEY},
 * verified with {@code JWTUtils.verifyToken}, and the {@code username} claim is
 * attached to the session as a {@link User} principal.
 */
public class UserInterceptor implements ChannelInterceptor {

    /** Message of the exception raised when the token cannot be verified. */
    private static final String AUTH_FAILED = "websocket认证失败";

    /**
     * Attaches the authenticated user to the session when a CONNECT frame carries a token.
     *
     * @param message the inbound message
     * @param channel the channel the message is being sent to
     * @return the same message, so processing continues
     * @throws RuntimeException if a token is present but verification fails or it
     *                          carries no {@code username} claim
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        // getAccessor may return null when the message carries no STOMP accessor.
        if (accessor == null || !StompCommand.CONNECT.equals(accessor.getCommand())) {
            return message;
        }

        // Read the header through the accessor instead of casting the raw header map:
        // this does not depend on the concrete List implementation and tolerates an empty list.
        String token = accessor.getFirstNativeHeader(Constants.TOKEN_KEY);
        if (token == null) {
            // NOTE(review): as before, a CONNECT without a token is let through with no user
            // attached; handlers then receive a null Principal. Confirm this is intended.
            return message;
        }

        User user = new User();
        user.setUsername(resolveUsername(token));
        accessor.setUser(user);
        return message;
    }

    /** Verifies the token and returns its {@code username} claim, failing if it is absent. */
    private String resolveUsername(String token) {
        try {
            Map<String, Claim> claimMap = JWTUtils.verifyToken(token);
            Claim claim = claimMap == null ? null : claimMap.get("username");
            String username = claim == null ? null : claim.asString();
            if (username == null) {
                throw new RuntimeException(AUTH_FAILED);
            }
            return username;
        } catch (UnsupportedEncodingException | ValidTokenException e) {
            // The cause is preserved on the rethrown exception; no separate stack-trace dump.
            throw new RuntimeException(AUTH_FAILED, e);
        }
    }

    @Override
    public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
        // Nothing to do.
    }

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        // Nothing to do.
    }

    /** Returns {@code true} so that receiving is never vetoed by this interceptor. */
    @Override
    public boolean preReceive(MessageChannel channel) {
        return true;
    }

    /** Returns the message unchanged so that it is not dropped by this interceptor. */
    @Override
    public Message<?> postReceive(Message<?> message, MessageChannel channel) {
        return message;
    }

    @Override
    public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {
        // Nothing to do.
    }
}
/**
 * Registers the client channel interceptor as a bean in the Spring IoC container.
 *
 * @return a new {@code UserInterceptor}
 */
@Bean
public UserInterceptor createUserInterceptor() {
    UserInterceptor interceptor = new UserInterceptor();
    return interceptor;
}
服务端
/**
 * Handles messages sent to {@code /principal}: logs the online users and pushes a
 * message to the sender's {@code /queue/message} user destination.
 *
 * @param principal the user attached to the session; {@code null} if no user was
 *                  attached when the session connected
 */
@MessageMapping(value = "/principal")
public void test(Principal principal) {
    log.info("当前在线人数:{}", userRegistry.getUserCount());
    int index = 1;
    for (SimpUser user : userRegistry.getUsers()) {
        log.info("用户{}---{}", index++, user);
    }
    // Without a user there is no destination to push to; avoid the NullPointerException.
    if (principal == null) {
        log.warn("No user is attached to this session; skipping the push");
        return;
    }
    // Send the message to the specific user.
    messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message", "服务器主动推的数据");
}
客户端:
/**
 * Sends a frame with no payload to /app/principal.
 */
function send0() {
    // The body must be a string: an object literal would be coerced to "[object Object]".
    stompClient.send("/app/principal", {}, "");
}
二、发送JSON数据体
服务端可以直接在函数中注入JavaBean或者Map,List或者String接收
服务端:
/**
 * Point-to-point example: handles messages sent to {@code /P2P}, logs the online
 * users and pushes a message to the sender's {@code /queue/message} user destination.
 *
 * @param principal the user attached to the session; {@code null} if no user was
 *                  attached when the session connected
 * @param data      the message payload bound to a map; not used by this handler
 */
@MessageMapping(value = "/P2P")
public void templateTest(Principal principal, Map<String,String> data) {
    log.info("当前在线人数:{}", userRegistry.getUserCount());
    int index = 1;
    for (SimpUser user : userRegistry.getUsers()) {
        log.info("用户{}---{}", index++, user);
    }
    // Without a user there is no destination to push to; avoid the NullPointerException.
    if (principal == null) {
        log.warn("No user is attached to this session; skipping the push");
        return;
    }
    // Send the message to the specific user.
    messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message", "服务器主动推的数据");
}
客户端:
/**
 * Sends a JSON payload to /app/P2P.
 */
function send() {
    var payload = JSON.stringify({ 'name': 'test' });
    stompClient.send("/app/P2P", {}, payload);
}
三、将参数携带到发送请求的URL路径中
使用@DestinationVariable注解,类似SpringMVC的@PathVariable
服务端:
/**
 * Handles messages sent to {@code /path/{name}/{company}}: logs the online users
 * and pushes a message to the sender's {@code /queue/message} user destination.
 *
 * @param principal the user attached to the session; {@code null} if no user was
 *                  attached when the session connected
 * @param name      the {@code name} segment of the destination; not used by this handler
 * @param company   the {@code company} segment of the destination; not used by this handler
 */
@MessageMapping(value = "/path/{name}/{company}")
public void pathTest(Principal principal, @DestinationVariable String name, @DestinationVariable String company) {
    log.info("当前在线人数:{}", userRegistry.getUserCount());
    int index = 1;
    for (SimpUser user : userRegistry.getUsers()) {
        log.info("用户{}---{}", index++, user);
    }
    // Without a user there is no destination to push to; avoid the NullPointerException.
    if (principal == null) {
        log.warn("No user is attached to this session; skipping the push");
        return;
    }
    // Send the message to the specific user.
    messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message", "服务器主动推的数据");
}
客户端:
/**
 * Sends a frame with no payload whose destination carries two path parameters.
 */
function send2() {
    // The body must be a string: an object literal would be coerced to "[object Object]".
    stompClient.send("/app/path/zhangsan/XXX公司", {}, "");
}
四、发送header
使用@Header注解
服务端:
/**
 * Handles messages sent to {@code /header}: logs the online users and pushes a
 * message to the sender's {@code /queue/message} user destination.
 *
 * @param principal the user attached to the session; {@code null} if no user was
 *                  attached when the session connected
 * @param one       value of the {@code one} header; not used by this handler
 * @param two       value of the {@code two} header; not used by this handler
 */
@MessageMapping(value = "/header")
public void headerTest(Principal principal, @Header String one, @Header String two) {
    log.info("当前在线人数:{}", userRegistry.getUserCount());
    int index = 1;
    for (SimpUser user : userRegistry.getUsers()) {
        log.info("用户{}---{}", index++, user);
    }
    // Without a user there is no destination to push to; avoid the NullPointerException.
    if (principal == null) {
        log.warn("No user is attached to this session; skipping the push");
        return;
    }
    // Send the message to the specific user.
    messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message", "服务器主动推的数据");
}
客户端:
/**
 * Sends a frame with no payload to /app/header, carrying the custom headers
 * "one" and "two".
 */
function send3() {
    // The body must be a string: an object literal would be coerced to "[object Object]".
    stompClient.send("/app/header", {"one":"lalala", "two":"中国"}, "");
}
五、发送Httpsession中的数据
这里有一点儿小问题,我理解的是只能发送握手连接时的HttpSession中的数据
注册HttpSessionHandshakeInterceptor
/**
 * Registers the STOMP endpoint.
 *
 * <p>The endpoint is {@code /webSocketServer}; it is exposed with SockJS, allows
 * every origin, and installs an {@code HttpSessionHandshakeInterceptor}.
 * Pages connect through a URL such as
 * {@code http://localhost:8080/webSocketServer}.
 *
 * @param registry the registry the endpoint is added to
 */
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
    HttpSessionHandshakeInterceptor handshakeInterceptor = new HttpSessionHandshakeInterceptor();
    registry
            .addEndpoint("/webSocketServer")
            .addInterceptors(handshakeInterceptor)
            .setAllowedOrigins("*")
            .withSockJS();
}
服务端:
/**
 * Handles messages sent to {@code /httpsession}: reads the {@code name} entry from
 * the session attributes of the STOMP session and logs it.
 *
 * @param accessor header accessor of the incoming message
 */
@MessageMapping(value = "/httpsession")
public void httpsession(StompHeaderAccessor accessor) {
    // getSessionAttributes() may return null, so guard before reading from it.
    Map<String, Object> attributes = accessor.getSessionAttributes();
    Object name = attributes == null ? null : attributes.get("name");
    // Log the value that was read instead of printing a debug marker to stdout.
    log.info("HttpSession attribute name: {}", name);
}
客户端:
/**
 * Sends a frame with no payload to /app/httpsession.
 */
function send4() {
    // The body must be a string: an object literal would be coerced to "[object Object]".
    stompClient.send("/app/httpsession", {}, "");
}
所有代码
前端JS
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <!-- Both libraries are loaded over https so the page also works when served over https. -->
    <script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
    <script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
    <script>
        var socket = new SockJS("http://192.168.100.88:7601/demo/webSocketServer");
        var stompClient = Stomp.over(socket);

        // Connect as soon as the page has loaded.
        window.onload = function () {
            connect();
        }

        // Subscribes to the per-user message queue.
        function subscribe() {
            stompClient.subscribe('/user/queue/message', function (response) {
                console.log("/user/queue/message 你接收到的消息为:" + response);
            });
        }

        // NOTE: the senders below that carry no payload pass "" as the body;
        // an object literal would be coerced to the string "[object Object]".

        /** Sends a frame with no payload to /app/principal. */
        function send0() {
            stompClient.send("/app/principal", {}, "");
        }

        /** Sends a JSON payload. */
        function send() {
            stompClient.send("/app/P2P", {},
                JSON.stringify({ 'name': 'test' }));
        }

        /** Sends parameters as part of the destination path. */
        function send2() {
            stompClient.send("/app/path/zhangsan/XXX公司", {}, "");
        }

        /** Sends custom headers. */
        function send3() {
            stompClient.send("/app/header", {"one":"lalala", "two":"中国"}, "");
        }

        /** Sends a frame with no payload to /app/httpsession. */
        function send4() {
            stompClient.send("/app/httpsession", {}, "");
        }

        /** Opens the STOMP connection, passing the token as a CONNECT header. */
        function connect() {
            stompClient.connect({
                    // Demo token; replace it with the token of the current user.
                    Authorization:"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjIxOTg1NjQxNjAsImlhdCI6MTUzMTg5NzUwMCwidXNlcm5hbWUiOiJ6cXcxMSJ9.VFR2EKUx5BTYLDkDogiLA9LfNVoPjOzQ3rTWoEy7He4"
                },
                function connectCallback(frame) {
                    // Called when the connection succeeds (the server answers with a CONNECTED frame).
                    alert("success");
                    subscribe();
                },
                function errorCallBack(error) {
                    // Called when the connection fails (the server answers with an ERROR frame).
                    alert("error");
                });
        }

        /** Closes the STOMP connection if a client exists. */
        function disconnect() {
            if (stompClient != null) {
                stompClient.disconnect();
            }
            console.log("Disconnected");
        }
    </script>
</head>
<body>
<!-- The button used to call send5(), which is not defined; it now calls the JSON sender. -->
<input type="text" id="info"/><button onclick="send();">发送</button>
</body>
</html>
后端MessageMapping处
package com.iscas.biz.test.controller;
import com.iscas.templet.common.ResponseEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.*;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.web.bind.annotation.RestController;
import java.security.Principal;
import java.util.Map;
/**
 * Demo STOMP message handlers showing how a handler method can receive the
 * session user, a message body, destination variables, headers and session attributes.
 */
@RestController
@Slf4j
public class WebSoketDemoController {

    /** User-relative destination the demo handlers push to. */
    private static final String USER_QUEUE = "/queue/message";

    /** Payload pushed back to the sender by the demo handlers. */
    private static final String PUSH_PAYLOAD = "服务器主动推的数据";

    // Message template provided by Spring for sending messages.
    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    /**
     * Receives the session user.
     *
     * @param principal the user attached to the session; may be {@code null}
     */
    @MessageMapping(value = "/principal")
    public void test(Principal principal) {
        logOnlineUsersAndReply(principal);
    }

    /**
     * Receives a message body.
     *
     * @param principal the user attached to the session; may be {@code null}
     * @param data      the message payload bound to a map; not used by this handler
     */
    @MessageMapping(value = "/P2P")
    public void templateTest(Principal principal, Map<String,String> data) {
        logOnlineUsersAndReply(principal);
    }

    /**
     * Receives parameters embedded in the destination path.
     *
     * @param principal the user attached to the session; may be {@code null}
     * @param name      the {@code name} segment of the destination; not used by this handler
     * @param company   the {@code company} segment of the destination; not used by this handler
     */
    @MessageMapping(value = "/path/{name}/{company}")
    public void pathTest(Principal principal, @DestinationVariable String name, @DestinationVariable String company) {
        logOnlineUsersAndReply(principal);
    }

    /**
     * Receives message headers.
     *
     * @param principal the user attached to the session; may be {@code null}
     * @param one       value of the {@code one} header; not used by this handler
     * @param two       value of the {@code two} header; not used by this handler
     */
    @MessageMapping(value = "/header")
    public void headerTest(Principal principal, @Header String one, @Header String two) {
        logOnlineUsersAndReply(principal);
    }

    /**
     * Reads the {@code name} entry from the session attributes and logs it.
     *
     * @param accessor header accessor of the incoming message
     */
    @MessageMapping(value = "/httpsession")
    public void httpsession(StompHeaderAccessor accessor) {
        // getSessionAttributes() may return null, so guard before reading from it.
        Map<String, Object> attributes = accessor.getSessionAttributes();
        Object name = attributes == null ? null : attributes.get("name");
        // Log the value that was read instead of printing a debug marker to stdout.
        log.info("HttpSession attribute name: {}", name);
    }

    /**
     * Broadcast example: the returned value is sent to {@code /topic/getResponse}.
     *
     * @return a response with code 200 and the text {@code success}
     */
    @MessageMapping("/broadcast")
    @SendTo("/topic/getResponse")
    public ResponseEntity topic() throws Exception {
        return new ResponseEntity(200,"success");
    }

    /**
     * Logs the number of online users and each user, then pushes the demo payload
     * to the given user's queue. Nothing is pushed when {@code principal} is null.
     */
    private void logOnlineUsersAndReply(Principal principal) {
        log.info("当前在线人数:{}", userRegistry.getUserCount());
        int index = 1;
        for (SimpUser user : userRegistry.getUsers()) {
            log.info("用户{}---{}", index++, user);
        }
        // Without a user there is no destination to push to; avoid the NullPointerException.
        if (principal == null) {
            log.warn("No user is attached to this session; skipping the push to {}", USER_QUEUE);
            return;
        }
        // Send the message to the specific user.
        messagingTemplate.convertAndSendToUser(principal.getName(), USER_QUEUE, PUSH_PAYLOAD);
    }
}
Websocket配置类
package com.iscas.base.biz.config.stomp;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
/**
 * WebSocket (STOMP) configuration.
 *
 * @author zhuquanwen
 */
// NOTE(review): @Configuration is commented out in the original; presumably it has to be
// re-enabled for this configuration to take effect. Confirm before relying on it.
//@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {

    /** Maximum size of an incoming message, in bytes. */
    private static final int MESSAGE_SIZE_LIMIT = 500 * 1024 * 1024;

    /** Send buffer size limit, in bytes. */
    private static final int SEND_BUFFER_SIZE_LIMIT = 1024 * 1024 * 1024;

    /** Send time limit passed to the transport registration. */
    private static final int SEND_TIME_LIMIT = 200000;

    /**
     * Registers the STOMP endpoint.
     *
     * <p>The endpoint is {@code /webSocketServer}; it is exposed with SockJS, allows
     * every origin, and installs an {@code HttpSessionHandshakeInterceptor}.
     * Pages connect through a URL such as
     * {@code http://localhost:8080/webSocketServer}.
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        HttpSessionHandshakeInterceptor handshakeInterceptor = new HttpSessionHandshakeInterceptor();
        registry
                .addEndpoint("/webSocketServer")
                .addInterceptors(handshakeInterceptor)
                .setAllowedOrigins("*")
                .withSockJS();
    }

    /**
     * Configures the message broker.
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Destination prefixes served by the simple broker.
        registry.enableSimpleBroker("/queue", "/topic");
        // "/app" is the prefix for application destinations; "/user/" is the prefix for
        // point-to-point destinations (it shows up in the client's subscription paths).
        registry
                .setApplicationDestinationPrefixes("/app")
                .setUserDestinationPrefix("/user/");
    }

    /**
     * Installs the user interceptor on the client inbound channel.
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        UserInterceptor interceptor = createUserInterceptor();
        registration.interceptors(interceptor);
    }

    /**
     * Registers the client channel interceptor as a bean in the Spring IoC container.
     */
    @Bean
    public UserInterceptor createUserInterceptor() {
        return new UserInterceptor();
    }

    /**
     * Applies the message size, send buffer size and send time limits to the transport.
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration
                .setMessageSizeLimit(MESSAGE_SIZE_LIMIT)
                .setSendBufferSizeLimit(SEND_BUFFER_SIZE_LIMIT)
                .setSendTimeLimit(SEND_TIME_LIMIT);
    }
}
用户拦截器
package com.iscas.base.biz.config.stomp;
import com.auth0.jwt.interfaces.Claim;
import com.iscas.base.biz.config.Constants;
import com.iscas.base.biz.util.SpringUtils;
import com.iscas.templet.exception.ValidTokenException;
import com.iscas.base.biz.model.auth.User;
import com.iscas.base.biz.util.JWTUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import javax.servlet.http.HttpSession;
import java.io.UnsupportedEncodingException;
import java.util.LinkedList;
import java.util.Map;
/**
 * Channel interceptor that authenticates STOMP {@code CONNECT} frames.
 *
 * <p>The JWT is read from the native header named {@code Constants.TOKEN_KEY},
 * verified with {@code JWTUtils.verifyToken}, and the {@code username} claim is
 * attached to the session as a {@link User} principal.
 */
public class UserInterceptor implements ChannelInterceptor {

    /** Message of the exception raised when the token cannot be verified. */
    private static final String AUTH_FAILED = "websocket认证失败";

    /**
     * Attaches the authenticated user to the session when a CONNECT frame carries a token.
     *
     * @param message the inbound message
     * @param channel the channel the message is being sent to
     * @return the same message, so processing continues
     * @throws RuntimeException if a token is present but verification fails or it
     *                          carries no {@code username} claim
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        // getAccessor may return null when the message carries no STOMP accessor.
        // Frames other than CONNECT (for example SEND) pass through untouched.
        if (accessor == null || !StompCommand.CONNECT.equals(accessor.getCommand())) {
            return message;
        }

        // Read the header through the accessor instead of casting the raw header map:
        // this does not depend on the concrete List implementation and tolerates an empty list.
        String token = accessor.getFirstNativeHeader(Constants.TOKEN_KEY);
        if (token == null) {
            // NOTE(review): as before, a CONNECT without a token is let through with no user
            // attached; handlers then receive a null Principal. Confirm this is intended.
            return message;
        }

        // The token is verified again; previously every token was mapped to the fixed
        // test user "lalala", which let any client through as that user.
        User user = new User();
        user.setUsername(resolveUsername(token));
        accessor.setUser(user);
        return message;
    }

    /** Verifies the token and returns its {@code username} claim, failing if it is absent. */
    private String resolveUsername(String token) {
        try {
            Map<String, Claim> claimMap = JWTUtils.verifyToken(token);
            Claim claim = claimMap == null ? null : claimMap.get("username");
            String username = claim == null ? null : claim.asString();
            if (username == null) {
                throw new RuntimeException(AUTH_FAILED);
            }
            return username;
        } catch (UnsupportedEncodingException | ValidTokenException e) {
            // The cause is preserved on the rethrown exception.
            throw new RuntimeException(AUTH_FAILED, e);
        }
    }

    @Override
    public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
        // Nothing to do.
    }

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        // Nothing to do.
    }

    /** Returns {@code true} so that receiving is never vetoed by this interceptor. */
    @Override
    public boolean preReceive(MessageChannel channel) {
        return true;
    }

    /** Returns the message unchanged so that it is not dropped by this interceptor. */
    @Override
    public Message<?> postReceive(Message<?> message, MessageChannel channel) {
        return message;
    }

    @Override
    public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {
        // Nothing to do.
    }
}