Spring 提供了一个 STOMP over WebSocket 客户端和一个STOMP over TCP客户端。
首先,你可以创建并配置 WebSocketStompClient,如下例所示:
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats
在前面的例子中,你可以用 SockJsClient 取代 StandardWebSocketClient,因为它也是 WebSocketClient 的一个实现。SockJsClient 可以使用 WebSocket 或基于 HTTP 的传输作为退路。更多详情,请参见 SockJsClient。
接下来,你可以建立一个连接并为 STOMP 会话提供一个处理程序,如下例所示:
String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);
当会话可以使用时,处理程序会被通知,如下例所示:
public class MyStompSessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
// ...
}
}
一旦会话建立,任何有效载荷都可以被发送,并通过配置的 MessageConverter 进行序列化,如下例所示:
session.send("/topic/something", "payload");
你也可以对目的地进行订阅。订阅方法需要一个处理程序来处理订阅上的消息,并返回一个订阅句柄,你可以用它来取消订阅。对于每个收到的消息,处理程序可以指定有效载荷应该被反序列化的目标对象类型,如下面的例子所示:
session.subscribe("/topic/something", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
// ...
}
});
为了启用 STOMP 心跳,你可以用一个 TaskScheduler 来配置 WebSocketStompClient,并可选择自定义心跳间隔(10 秒为写入不活动,会导致发送心跳,10 秒为读取不活动,会关闭连接)。
WebSocketStompClient 只在不活动的情况下发送心跳,即在没有其他消息被发送的情况下发送。当使用 外部代理 时,这可能是一个挑战,因为具有非代理目的地的消息代表活动,但实际上并没有转发到代理。在这种情况下,你可以在初始化外部代理时配置一个 TaskScheduler,确保在只有非代理目的地的消息被发送时,心跳也被转发到代理。
:::info 当你使用 WebSocketStompClient 进行性能测试以模拟来自同一台机器的数千个客户端时,请考虑关闭心跳,因为每个连接都会安排自己的心跳任务,而这对于在同一台机器上运行的大量客户端来说并不优化。 :::
STOMP 协议还支持收据(receipts),客户端必须添加一个收据头(receipt ),服务器在处理完发送或订阅后会用一个 RECEIPT 帧来回应。为了支持这一点,StompSession 提供了 setAutoReceipt(boolean)
功能,使每一个后续的发送或订阅事件都会添加一个 receipt
头。另外,你也可以手动添加一个接收头到 StompHeaders 中。发送和订阅都返回一个 Receiptable 的实例,你可以用它来注册接收成功和失败的回调。对于这个功能,你必须用一个 TaskScheduler 和收据过期前的时间量(默认为 15 秒)来配置客户端。
请注意,StompSessionHandler 本身就是一个 StompFrameHandler,这让它除了处理信息处理中的异常的 handleException 回调和处理包括 ConnectionLostException 在内的传输级错误的 handleTransportError 之外,还能处理 ERROR 帧。
一个例子
这里使用 Java 客户端来实现 这个前端 html 中的功能
package cn.mrcode.study.springdocsread;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.RestTemplateXhrTransport;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author mrcode
*/
public class WebSocketClientTest {
public static void main(String[] args) throws InterruptedException {
// WebSocketClient webSocketClient = new StandardWebSocketClient();
List<Transport> transports = new ArrayList<>(2);
transports.add(new WebSocketTransport(new StandardWebSocketClient()));
transports.add(new RestTemplateXhrTransport());
SockJsClient webSocketClient = new SockJsClient(transports);
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
String url = "ws://127.0.0.1:8080/portfolio";
// 添加头
final WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
// 链接 heaer
final StompHeaders connectHeaders = new StompHeaders();
connectHeaders.setLogin("user1");
connectHeaders.setPasscode("123456");
// 链接到服务端
stompClient.connect(url, headers, connectHeaders,new StompSessionHandlerAdapter() {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
// 订阅用户消息
session.subscribe("/user/exchange/amq.direct/trade", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
System.out.println("收到订阅的 /user/exchange/amq.direct/trade 消息:" + payload);
}
});
// 发送更新消息
session.send("/app/trade", "更新消息");
}
@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
exception.printStackTrace();
}
@Override
public void handleTransportError(StompSession session, Throwable exception) {
exception.printStackTrace();
}
});
TimeUnit.SECONDS.sleep(1000);
}
}
执行测试后,就能看到控制台输出的
收到订阅的 /user/exchange/amq.direct/trade 消息:消息已经处理完成:更新消息