STOMP是一个更高级的协议,它使用一个基于帧(frame)的格式来定义消息,与HTTP的request和response类似。本人主要讲述stomp 在websocket中的配置,以及与session的集合做点对点通信,以及微信小程序等需要ws/wss协议时候前后端的配置。
基本配置以及与SESSION的结合
1.依赖
springboot 中引入websocket依赖,这里用gradle构建,用maven对应过去就是了
compile group: ‘org.springframework.boot’, name: ‘spring-boot-starter-websocket’
2.定义WebSocketStompConfig 配置类
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig extends AbstractWebSocketMessageBrokerConfigurer {
/**
* 注册stomp的端点
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 允许使用socketJs方式访问,访问点为webSocketServer,允许跨域
// 在网页上我们就可以通过这个链接
// http://localhost:8080/webSocketServer
// 来和服务器的WebSocket连接
registry.addEndpoint("/webSocketServer")
.setAllowedOrigins("*")
.addInterceptors(new SessionAuthHandshakeInterceptor())
.withSockJS();
}
/**
* 配置信息代理
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 订阅Broker名称
registry.enableSimpleBroker("/queue", "/topic");
// 全局使用的消息前缀(客户端订阅路径上会体现出来)
registry.setApplicationDestinationPrefixes("/app");
// 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
registry.setUserDestinationPrefix("/user/");
}
/**
* 配置客户端入站通道拦截器
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.setInterceptors(createUserInterceptor());
}
/*将客户端渠道拦截器加入spring ioc容器*/
@Bean
public UserInterceptor createUserInterceptor() {
return new UserInterceptor();
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setMessageSizeLimit(500 * 1024 * 1024);
registration.setSendBufferSizeLimit(1024 * 1024 * 1024);
registration.setSendTimeLimit(200000);
}
}
3.定义用户拦截器
这里preSend里当连接方式为CONNECT的时候获取session里的用户信息,注入stompHeaderAccessor。注意一点的是用户类需要实现java.security.Principal。preSend有很多连接方式,包括DISCONNECT,SUBSCRIBE, DISSUBSCRIBE,可以用这么连接方式监控用户的上线下线,统计每个订阅的在线人数等等,大家可以自行想象。
public class UserInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
User user = (User)accessor.getSessionAttributes().get(Constants.WEBSOCKET_USER_KEY);
accessor.setUser(user);
}
return message;
}
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
}
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
}
@Override
public boolean preReceive(MessageChannel channel) {
return false;
}
@Override
public Message<?> postReceive(Message<?> message, MessageChannel channel) {
return null;
}
@Override
public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {
}
}
4.定义handshake握手拦截器
这里有个逻辑就是当webscoket建立连接的时候被拦截,获取当前应用的session,将用户登录信息获取出来,如果用户未登录,那么不好意思拒绝连接,如果已经登陆了,那么将用户绑定到stomp的session中,第3步的时候就调用了这个用户信息。
public class SessionAuthHandshakeInterceptor implements HandshakeInterceptor {
private Logger logger = LoggerFactory.getLogger(SessionAuthHandshakeInterceptor.class);
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
HttpSession session = getSession(request);
if(session == null || session.getAttribute(Constants.SESSION_USER) == null){
logger.error("websocket权限拒绝");
// return false;
throw new CmiException("websocket权限拒绝");
}
attributes.put(Constants.WEBSOCKET_USER_KEY,session.getAttribute(Constants.SESSION_USER));
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
private HttpSession getSession(ServerHttpRequest request) {
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) request;
return serverRequest.getServletRequest().getSession(false);
}
return null;
}
}
5.广播通信例子
服务端 添加MessageMapping注解,@SendoTo对应前台的订阅地址。这里也可以用更灵活的方式,使用spring的SimpMessagingTemplate模板,messagingTemplate.convertAndSend方法广播式通信。
/*广播*/
@MessageMapping("/broadcast")
@SendTo("/topic/getResponse")
public ResponseEntity topic() throws Exception {
return new ResponseEntity(200,"success");
}
客户端 var socket = new SockJS(“http://172.16.10.156:80/cmi/webSocketServer“),var stompClient = Stomp.over(socket); 创建客户端连接对象。connect(),建立连接,connect的成功回调函数里执行subscribe()订阅,订阅的地址/topic/getResponse对应服务端的@SendTo地址。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
<script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
<script>
var socket = new SockJS("http://ip:port/context-path/webSocketServer");
var stompClient = Stomp.over(socket);
window.onload = function () {
connect();
}
//订阅消息
function subscribe() {
stompClient.subscribe('/topic/getResponse', function(response){
alert("/topic/getResponse 你接收到的消息为:" + response);
});
}
function send() {
stompClient.send("/app/broadcast", {},
JSON.stringify({ 'name': 'test' }));
}
function connect() {
stompClient.connect({
//这里可以改成token
name: 'test' // 携带客户端信息
},
function connectCallback(frame) {
// 连接成功时(服务器响应 CONNECTED 帧)的回调方法
alert("success");
subscribe();
},
function errorCallBack(error) {
// 连接失败时(服务器响应 ERROR 帧)的回调方法
alert("error");
});
}
function disconnect() {
if (stompClient != null) {
stompClient.disconnect();
}
// setConnected(false);
console.log("Disconnected");
}
</script>
</head>
<body>
<input type="text" id="info"/><button onclick="send();">发送</button>
</body>
</html>
6.点对点通信
服务端 注入SimpUserRegistry 对象(不是必须的),这个对象里存储了第3步里的user对象。
messagingTemplate.convertAndSendToUser(“test”, “/queue/message1”,”服务器主动推的数据”);这句是重点,第一个参数 是注册的用户名,第二个参数/queue/message对应着前台的订阅/user/queue/message1,第三个参数就是推送的实质内容。/p1的mapping对应的前台send的地址/app/p1
//spring提供的发送消息模板
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private SimpUserRegistry userRegistry;
/*点对点通信*/
@MessageMapping(value = "/p1")
public void templateTest1(Principal principal) {
logger.info("当前在线人数:" + userRegistry.getUserCount());
int i = 1;
for (SimpUser user : userRegistry.getUsers()) {
logger.info("用户" + i++ + "---" + user);
}
//发送消息给指定用户
messagingTemplate.convertAndSendToUser("test", "/queue/message1","服务器主动推的数据");
}
客户端 这里注意与后台定制对应的时候 发送时候对了/app前缀,订阅地址多了/user前缀。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
<script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
<script>
var socket = new SockJS("http://ip:port/context-path/webSocketServer");
var stompClient = Stomp.over(
);
window.onload = function () {
connect();
}
//订阅消息
function subscribe() {
stompClient.subscribe('/user/queue/message1', function (response) {
console.log("/user/queue/message1 你接收到的消息为:" + response);
});
}
function send() {
stompClient.send("/app/p1", {},
JSON.stringify({ 'name': 'test' }));
}
function connect() {
stompClient.connect({
// name: 'admin' // 携带客户端信息
},
function connectCallback(frame) {
// 连接成功时(服务器响应 CONNECTED 帧)的回调方法
alert("success");
subscribe();
},
function errorCallBack(error) {
// 连接失败时(服务器响应 ERROR 帧)的回调方法
alert("error");
});
}
function disconnect() {
if (stompClient != null) {
stompClient.disconnect();
}
// setConnected(false);
console.log("Disconnected");
}
</script>
</head>
<body>
<input type="text" id="info"/><button onclick="send();">发送</button>
</body>
</html>
ws/wss协议的使用
上面介绍的方式前台建立连接还是使用的http或者https的方式,如果要使用ws/wss协议,前后台稍微作一下改造
后台 将withSokJS()注释掉
registry.addEndpoint("/webSocketServer")
.setAllowedOrigins("*")
.addInterceptors(new SessionAuthHandshakeInterceptor());
//.withSockJS();
前台 构建客户端对象处作一点改动
// var socket = new WebSocket("http://ip:port/context-path/webSocketServer");
// var stompClient = Stomp.over(socket);
var url = "ws://192.168.100.90:80/cmi/webSocketServer";
var stompClient = Stomp.client(url);