原文: https://howtodoinjava.com/spring-webflux/reactive-websockets/

在这个 spring webflux websocket 示例中,学习使用 spring webflux 创建支持客户端和服务器之间的 websocket 连接的响应式应用程序。

websocket 是 Web 浏览器和服务器之间的双向全双工持久连接。 建立连接后,它将保持打开状态,直到客户端或服务器决定关闭此连接。 Websocket 在具有多个用户相互连接并发送和接收消息的应用程序中具有实际用途,例如聊天应用程序。

1. Maven 依赖

我们需要具有spring-boot-starter-webfluxjavax.websocket-api依赖项。

Spring WebFlux 期望使用 WebSockets 版本 1.1 。 使用 1.0 时,代码将无法运行。

pom.xml

  1. <project xmlns="http://maven.apache.org/POM/4.0.0"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
  4. http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>com.howtodoinjava.demo</groupId>
  7. <artifactId>spring-webflux-example</artifactId>
  8. <version>0.0.1-SNAPSHOT</version>
  9. <packaging>jar</packaging>
  10. <parent>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-parent</artifactId>
  13. <version>2.1.2.RELEASE</version>
  14. <relativePath /> <!-- lookup parent from repository -->
  15. </parent>
  16. <name>spring-webflux-example</name>
  17. <url>http://maven.apache.org</url>
  18. <properties>
  19. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  20. <java.version>1.8</java.version>
  21. </properties>
  22. <dependencies>
  23. <dependency>
  24. <groupId>org.springframework.boot</groupId>
  25. <artifactId>spring-boot-starter-webflux</artifactId>
  26. </dependency>
  27. <dependency>
  28. <groupId>javax.websocket</groupId>
  29. <artifactId>javax.websocket-api</artifactId>
  30. <version>1.1</version>
  31. </dependency>
  32. </dependencies>
  33. </project>

2. WebSocketHandler – 消息处理器

在应用程序的中心,我们将有一个WebSocketHandler,它将处理 WebSocket 消息和生命周期事件。 给定的EchoHandler将收到一条消息,并以RECEIVED ON SERVER ::为前缀返回。

EchoHandler.java

  1. package com.howtodoinjava.demo.handler;
  2. import org.springframework.web.reactive.socket.WebSocketHandler;
  3. import org.springframework.web.reactive.socket.WebSocketSession;
  4. import reactor.core.publisher.Mono;
  5. public class EchoHandler implements WebSocketHandler
  6. {
  7. @Override
  8. public Mono<Void> handle(WebSocketSession session)
  9. {
  10. return session
  11. .send( session.receive()
  12. .map(msg -> "RECEIVED ON SERVER :: " + msg.getPayloadAsText())
  13. .map(session::textMessage)
  14. );
  15. }
  16. }

3. 配置 WebSocketHandler

首先,需要使用SimpleUrlHandlerMappingWebSocketHandler映射到 URL。 然后我们需要一个WebSocketHandlerAdapter来调用WebSocketHandler

最后,为了让WebSocketHandlerAdapter了解传入的响应式运行时请求,我们需要使用ReactorNettyRequestUpgradeStrategy配置WebSocketService(因为我们正在使用默认的 Netty 服务器)。

EchoApplication.java

  1. package com.howtodoinjava.demo;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import org.springframework.boot.SpringApplication;
  5. import org.springframework.boot.autoconfigure.SpringBootApplication;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.core.Ordered;
  8. import org.springframework.web.reactive.HandlerMapping;
  9. import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
  10. import org.springframework.web.reactive.socket.WebSocketHandler;
  11. import org.springframework.web.reactive.socket.server.WebSocketService;
  12. import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
  13. import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
  14. import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy;
  15. import com.howtodoinjava.demo.handler.EchoHandler;
  16. @SpringBootApplication
  17. public class EchoApplication {
  18. public static void main(String[] args) {
  19. SpringApplication.run(EchoApplication.class, args);
  20. }
  21. @Bean
  22. public EchoHandler echoHandler() {
  23. return new EchoHandler();
  24. }
  25. @Bean
  26. public HandlerMapping handlerMapping() {
  27. Map<String, WebSocketHandler> map = new HashMap<>();
  28. map.put("/echo", echoHandler());
  29. SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
  30. mapping.setUrlMap(map);
  31. mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
  32. return mapping;
  33. }
  34. @Bean
  35. public WebSocketHandlerAdapter handlerAdapter() {
  36. return new WebSocketHandlerAdapter(webSocketService());
  37. }
  38. @Bean
  39. public WebSocketService webSocketService() {
  40. return new HandshakeWebSocketService(new ReactorNettyRequestUpgradeStrategy());
  41. }
  42. }

4. Websocket 客户端

首先创建一个响应式网络客户端。 为了在浏览器中进行测试,我们有以下两个文件app.jsindex.html。 JS 文件具有用于连接/断开连接,发送消息并显示从服务器接收的消息的代码。

app.js

  1. var ws = null;
  2. var url = "ws://localhost:8080/echo";
  3. function setConnected(connected)
  4. {
  5. document.getElementById('connect').disabled = connected;
  6. document.getElementById('disconnect').disabled = !connected;
  7. document.getElementById('echo').disabled = !connected;
  8. }
  9. function connect()
  10. {
  11. ws = new WebSocket(url);
  12. ws.onopen = function() {
  13. setConnected(true);
  14. log('Info: Connection Established.');
  15. };
  16. ws.onmessage = function(event) {
  17. log(event.data);
  18. };
  19. ws.onclose = function(event) {
  20. setConnected(false);
  21. log('Info: Closing Connection.');
  22. };
  23. }
  24. function disconnect()
  25. {
  26. if (ws != null) {
  27. ws.close();
  28. ws = null;
  29. }
  30. setConnected(false);
  31. }
  32. function echo()
  33. {
  34. if (ws != null)
  35. {
  36. var message = document.getElementById('message').value;
  37. log('Sent to server :: ' + message);
  38. ws.send(message);
  39. } else {
  40. alert('connection not established, please connect.');
  41. }
  42. }
  43. function log(message)
  44. {
  45. var console = document.getElementById('logging');
  46. var p = document.createElement('p');
  47. p.appendChild(document.createTextNode(message));
  48. console.appendChild(p);
  49. }

index.html

  1. <!DOCTYPE html>
  2. <html>
  3. <head>
  4. <link type="text/css" rel="stylesheet"
  5. href="https://cdnjs.cloudflare.com/ajax/libs/semantic-ui/2.2.10/semantic.min.css" />
  6. <script type="text/javascript" src="app.js"></script>
  7. </head>
  8. <body>
  9. <div>
  10. <div id="connect-container" class="ui centered grid">
  11. <div class="row">
  12. <button id="connect" onclick="connect();" class="ui green button ">Connect</button>
  13. <button id="disconnect" disabled="disabled" onclick="disconnect();"
  14. class="ui red button">Disconnect</button>
  15. </div>
  16. <div class="row">
  17. <textarea id="message" style="width: 350px" class="ui input"
  18. placeholder="Message to Echo"></textarea>
  19. </div>
  20. <div class="row">
  21. <button id="echo" onclick="echo();" disabled="disabled"
  22. class="ui button">Echo message</button>
  23. </div>
  24. </div>
  25. <div id="console-container">
  26. <h3>Logging</h3>
  27. <div id="logging"></div>
  28. </div>
  29. </div>
  30. </body>
  31. </html>

5. 测试 Spring webflux websocket 示例

在浏览器中输入 URL:http://localhost:8080/index.html

测试 websocket 的连接,断开功能,然后尝试发送一些消息。

Spring Boot WebFlux WebSocket 示例 - 图1

Spring webflux + websocket 示例

请问您有关使用 spring webflux 与服务器建立响应式 Websocket 连接的问题。

学习愉快!

下载源码