原文: https://howtodoinjava.com/spring-webflux/spring-webflux-tutorial/

Spring 5.0 添加了反应堆 Web 框架 Spring WebFlux 。 它是完全无阻塞的,支持响应式背压,并在 Netty,Undertow 和 Servlet 3.1+ 容器等服务器上运行。 在这个 spring webflux 教程中,我们将学习响应式编程,webflux api 和功能齐全的 hello world 示例背后的基本概念。

1. 响应式编程

响应式编程是一种编程示例,可促进异步,非阻塞,事件驱动的数据处理方法。 响应式编程涉及将数据和事件建模为可观察的数据流,并实现数据处理例程以对这些流中的更改做出反应。

在深入研究响应式世界之前,首先要了解阻塞与非阻塞请求处理之间的区别。

1.1. 阻塞与非阻塞(异步)请求处理

1.1.1. 阻塞请求处理

在传统的 MVC 应用程序中,当请求到达服务器时,将创建一个 servlet 线程。 它将请求委托给工作线程进行 I/O 操作(例如数据库访问等)。在工作线程忙时,servlet 线程(请求线程)保持等待状态,因此被阻塞。 也称为同步请求处理

Spring WebFlux 教程 - 图1

阻塞请求处理

由于服务器可以有一定数量的请求线程,因此限制了服务器在最大服务器负载下处理该数量请求的能力。 它可能会影响性能并限制服务器功能的完全利用。

1.1.2. 非阻塞请求处理

在非阻塞或异步请求处理中,没有线程处于等待状态。 通常只有一个请求线程接收该请求。

所有传入的请求都带有事件处理器和回调信息。 请求线程将传入的请求委托给线程池(通常为少量线程),该线程池将请求委托给其处理函数,并立即开始处理来自请求线程的其他传入请求。

处理器功能完成后,池中的线程之一将收集响应并将其传递给回调函数。

Spring WebFlux 教程 - 图2

非阻塞请求处理

线程的非阻塞性质有助于扩展应用程序的性能。 线程数量少意味着内存利用率较低,上下文切换也较少。

1.2. 什么是响应式编程?

术语“响应式”是指围绕响应变化而构建的编程模型。 它是基于发布者-订阅者模式(观察者模式)构建的。 在响应式编程中,我们请求资源并开始执行其他事情。 当数据可用时,我们会收到通知以及回调函数的数据通知。 在回调函数中,我们根据应用程序/用户需求处理响应。

要记住的重要一件事是背压。 在非阻塞代码中,控制事件的速率变得很重要,这样快速生成器就不会淹没其目的地。

响应式 Web 编程非常适合具有流数据的应用程序以及使用该数据并将其流传输给用户的客户端。 这对开发传统的 CRUD 应用程序不是很好。 如果您要开发具有大量数据的下一个 Facebook 或 Twitter ,那么响应式 API 可能正是您想要的。

2. 响应式流 API

新的响应式流式 API 由 Netflix,Pivotal,Lightbend,RedHat,Twitter 和 Oracle 等工程师创建,现已成为 Java 9 的一部分。它定义了四个接口:

  • Publisher: 根据从订阅者收到的需求向订阅者发出一系列事件。 一个发布者可以为多个订阅者提供服务。
    它只有一个方法:
    Publisher.java
    1. public interface Publisher<T>
    2. {
    3. public void subscribe(Subscriber<? super T> s);
    4. }
  • Subscriber: 接收和处理发布者发出的事件。 请注意,在调用Subscription#request(long)发出信号表示需求之前,不会收到任何通知。
    它有四种方法来处理收到的各种响应。
    Subscriber.java
    1. public interface Subscriber<T>
    2. {
    3. public void onSubscribe(Subscription s);
    4. public void onNext(T t);
    5. public void onError(Throwable t);
    6. public void onComplete();
    7. }
  • Subscription: 定义“发布者”和“订阅者”之间的一对一关系。 只能由单个“订阅者”使用一次。 它既用于表示对数据的需求,又用于取消需求(并允许清除资源)。
    Subscription.java
    1. public interface Subscription<T>
    2. {
    3. public void request(long n);
    4. public void cancel();
    5. }
  • Processor: 表示由“订阅者”和“发布者”组成的处理阶段,并服从两者的契约。
    Processor.java
    1. public interface Processor<T, R> extends Subscriber<T>, Publisher<R>
    2. {
    3. }

响应式的两种流行实现是 RxJavahttps://github.com/ReactiveX/RxJava)和 Project Reactorhttps://projectreactor.io/)。

3. 什么是 Spring WebFlux?

Spring WebFlux 是 Spring MVC 的并行版本,并支持完全无阻塞的响应式。 它支持背压概念,并使用 Netty 作为内置服务器来运行响应式应用程序。 如果您熟悉 Spring MVC 编程风格,则也可以轻松地使用 webflux。

Spring Webflux 使用项目反应器作为反应库。 Reactor 是 Reactive Streams 库,因此,它的所有运算符都支持无阻塞背压。 它是与 Spring 紧密合作开发的。

Spring WebFlux 大量使用两个发布者:

  • Mono:返回 0 或 1 个元素。
    1. Mono<String> mono = Mono.just("Alex");
    2. Mono<String> mono = Mono.empty();
  • Flux:返回0…N个元素。 Flux可以是无穷无尽的,这意味着它可以永远保持发光。 它还可以返回一系列元素,然后在返回所有元素后发送完成通知。 ```java Flux flux = Flux.just(“A”, “B”, “C”); Flux flux = Flux.fromArray(new String[]{“A”, “B”, “C”}); Flux flux = Flux.fromIterable(Arrays.asList(“A”, “B”, “C”));

//To subscribe call method

flux.subscribe();

  1. Spring WebFlux 中,我们称为响应式 API /函数,它们返回 mono flux,而您的控制器将返回 mono flux 当您调用返回单声道或流量的 API 时,它将立即返回。 当它们可用时,函数调用的结果将通过单声道或磁通传递给您。
  2. > 要构建真正的非阻塞应用程序,我们必须致力于将其所有组件创建/使用为非阻塞程序,即客户端,控制器,中间服务甚至数据库。 如果其中之一阻止了请求,我们的目标将会失败。
  3. <a name="dcb14c5b"></a>
  4. ## 4. Spring Boot WebFlux 示例
  5. 在此 [**Spring Boot 2**](https://howtodoinjava.com/spring-boot-tutorials/) 应用程序中,我正在创建员工管理系统。 我选择它是因为在学习时,您可以将其与传统的 MVC 风格的应用程序进行比较。 为了使其完全无阻塞,我使用 **mongodb** 作为后端数据库。
  6. <a name="1c1399cb"></a>
  7. #### 4.1. Maven 依赖
  8. 包括`spring-boot-starter-webflux``spring-boot-starter-data-mongodb-reactive``spring-boot-starter-test``reactor-test`依赖项。
  9. `pom.xml`
  10. ```java
  11. <project xmlns="http://maven.apache.org/POM/4.0.0"
  12. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  13. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  14. <modelVersion>4.0.0</modelVersion>
  15. <parent>
  16. <groupId>org.springframework.boot</groupId>
  17. <artifactId>spring-boot-starter-parent</artifactId>
  18. <version>2.1.1.RELEASE</version>
  19. <relativePath /> <!-- lookup parent from repository -->
  20. </parent>
  21. <groupId>com.howtodoinjava</groupId>
  22. <artifactId>spring-webflux-demo</artifactId>
  23. <version>0.0.1-SNAPSHOT</version>
  24. <packaging>jar</packaging>
  25. <name>spring-webflux-demo</name>
  26. <url>http://maven.apache.org</url>
  27. <properties>
  28. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  29. <java.version>1.8</java.version>
  30. </properties>
  31. <dependencies>
  32. <dependency>
  33. <groupId>org.springframework.boot</groupId>
  34. <artifactId>spring-boot-starter-webflux</artifactId>
  35. </dependency>
  36. <dependency>
  37. <groupId>org.springframework.boot</groupId>
  38. <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
  39. </dependency>
  40. <dependency>
  41. <groupId>org.springframework.boot</groupId>
  42. <artifactId>spring-boot-starter-test</artifactId>
  43. <scope>test</scope>
  44. </dependency>
  45. <dependency>
  46. <groupId>io.projectreactor</groupId>
  47. <artifactId>reactor-test</artifactId>
  48. <scope>test</scope>
  49. </dependency>
  50. <dependency>
  51. <groupId>javax.xml.bind</groupId>
  52. <artifactId>jaxb-api</artifactId>
  53. <version>2.3.0</version>
  54. </dependency>
  55. <dependency>
  56. <groupId>javax.servlet</groupId>
  57. <artifactId>javax.servlet-api</artifactId>
  58. <version>3.1.0</version>
  59. <scope>provided</scope>
  60. </dependency>
  61. </dependencies>
  62. </project>

4.2. 配置

Webflux 配置

WebFluxConfig.java

  1. import org.springframework.context.annotation.Configuration;
  2. @Configuration
  3. @EnableWebFlux
  4. public class WebFluxConfig implements WebFluxConfigurer
  5. {
  6. }

MongoDb 配置

MongoConfig.java

  1. import org.springframework.beans.factory.annotation.Value;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
  5. import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
  6. import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
  7. import com.mongodb.reactivestreams.client.MongoClient;
  8. import com.mongodb.reactivestreams.client.MongoClients;
  9. @Configuration
  10. @EnableReactiveMongoRepositories(basePackages = "com.howtodoinjava.demo.dao")
  11. public class MongoConfig extends AbstractReactiveMongoConfiguration
  12. {
  13. @Value("${port}")
  14. private String port;
  15. @Value("${dbname}")
  16. private String dbName;
  17. @Override
  18. public MongoClient reactiveMongoClient() {
  19. return MongoClients.create();
  20. }
  21. @Override
  22. protected String getDatabaseName() {
  23. return dbName;
  24. }
  25. @Bean
  26. public ReactiveMongoTemplate reactiveMongoTemplate() {
  27. return new ReactiveMongoTemplate(reactiveMongoClient(), getDatabaseName());
  28. }
  29. }

应用程序配置

AppConfig.java

  1. import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.core.io.ClassPathResource;
  5. @Configuration
  6. public class AppConfig
  7. {
  8. @Bean
  9. public static PropertyPlaceholderConfigurer getPropertyPlaceholderConfigurer()
  10. {
  11. PropertyPlaceholderConfigurer ppc = new PropertyPlaceholderConfigurer();
  12. ppc.setLocation(new ClassPathResource("application.properties"));
  13. ppc.setIgnoreUnresolvablePlaceholders(true);
  14. return ppc;
  15. }
  16. }

属性文件

application.properties

  1. port=27017
  2. dbname=testdb

日志配置

logback.xml

  1. <configuration>
  2. <appender name="STDOUT"
  3. class="ch.qos.logback.core.ConsoleAppender">
  4. <encoder>
  5. <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n
  6. </pattern>
  7. </encoder>
  8. </appender>
  9. <logger name="org.springframework" level="DEBUG"
  10. additivity="false">
  11. <appender-ref ref="STDOUT" />
  12. </logger>
  13. <root level="ERROR">
  14. <appender-ref ref="STDOUT" />
  15. </root>
  16. </configuration>

Spring Boot 应用程序

WebfluxFunctionalApp.java

  1. import org.springframework.boot.SpringApplication;
  2. import org.springframework.boot.autoconfigure.SpringBootApplication;
  3. @SpringBootApplication
  4. public class WebfluxFunctionalApp {
  5. public static void main(String[] args) {
  6. SpringApplication.run(WebfluxFunctionalApp.class, args);
  7. }
  8. }

4.3. REST 控制器

EmployeeController.java

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.http.HttpStatus;
  3. import org.springframework.http.MediaType;
  4. import org.springframework.http.ResponseEntity;
  5. import org.springframework.web.bind.annotation.PathVariable;
  6. import org.springframework.web.bind.annotation.RequestBody;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RequestMethod;
  9. import org.springframework.web.bind.annotation.ResponseBody;
  10. import org.springframework.web.bind.annotation.ResponseStatus;
  11. import org.springframework.web.bind.annotation.RestController;
  12. import com.howtodoinjava.demo.model.Employee;
  13. import com.howtodoinjava.demo.service.EmployeeService;
  14. import reactor.core.publisher.Flux;
  15. import reactor.core.publisher.Mono;
  16. @RestController
  17. public class EmployeeController {
  18. @Autowired
  19. private EmployeeService employeeService;
  20. @RequestMapping(value = { "/create", "/" }, method = RequestMethod.POST)
  21. @ResponseStatus(HttpStatus.CREATED)
  22. @ResponseBody
  23. public void create(@RequestBody Employee e) {
  24. employeeService.create(e);
  25. }
  26. @RequestMapping(value = "/{id}", method = RequestMethod.GET)
  27. @ResponseBody
  28. public ResponseEntity<Mono<Employee>> findById(@PathVariable("id") Integer id) {
  29. Mono<Employee> e = employeeService.findById(id);
  30. HttpStatus status = e != null ? HttpStatus.OK : HttpStatus.NOT_FOUND;
  31. return new ResponseEntity<Mono<Employee>>(e, status);
  32. }
  33. @RequestMapping(value = "/name/{name}", method = RequestMethod.GET)
  34. @ResponseBody
  35. public Flux<Employee> findByName(@PathVariable("name") String name) {
  36. return employeeService.findByName(name);
  37. }
  38. @RequestMapping(method = RequestMethod.GET, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  39. @ResponseBody
  40. public Flux<Employee> findAll() {
  41. Flux<Employee> emps = employeeService.findAll();
  42. return emps;
  43. }
  44. @RequestMapping(value = "/update", method = RequestMethod.PUT)
  45. @ResponseStatus(HttpStatus.OK)
  46. public Mono<Employee> update(@RequestBody Employee e) {
  47. return employeeService.update(e);
  48. }
  49. @RequestMapping(value = "/delete/{id}", method = RequestMethod.DELETE)
  50. @ResponseStatus(HttpStatus.OK)
  51. public void delete(@PathVariable("id") Integer id) {
  52. employeeService.delete(id).subscribe();
  53. }
  54. }

4.4. 服务类

IEmployeeService.java

  1. import com.howtodoinjava.demo.model.Employee;
  2. import reactor.core.publisher.Flux;
  3. import reactor.core.publisher.Mono;
  4. public interface IEmployeeService
  5. {
  6. void create(Employee e);
  7. Mono<Employee> findById(Integer id);
  8. Flux<Employee> findByName(String name);
  9. Flux<Employee> findAll();
  10. Mono<Employee> update(Employee e);
  11. Mono<Void> delete(Integer id);
  12. }

EmployeeService.java

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.stereotype.Service;
  3. import com.howtodoinjava.demo.dao.EmployeeRepository;
  4. import com.howtodoinjava.demo.model.Employee;
  5. import reactor.core.publisher.Flux;
  6. import reactor.core.publisher.Mono;
  7. @Service
  8. public class EmployeeService implements IEmployeeService {
  9. @Autowired
  10. EmployeeRepository employeeRepo;
  11. public void create(Employee e) {
  12. employeeRepo.save(e).subscribe();
  13. }
  14. public Mono<Employee> findById(Integer id) {
  15. return employeeRepo.findById(id);
  16. }
  17. public Flux<Employee> findByName(String name) {
  18. return employeeRepo.findByName(name);
  19. }
  20. public Flux<Employee> findAll() {
  21. return employeeRepo.findAll();
  22. }
  23. public Mono<Employee> update(Employee e) {
  24. return employeeRepo.save(e);
  25. }
  26. public Mono<Void> delete(Integer id) {
  27. return employeeRepo.deleteById(id);
  28. }
  29. }

4.5. DAO 存储库

EmployeeRepository.java

  1. import org.springframework.data.mongodb.repository.Query;
  2. import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
  3. import com.howtodoinjava.demo.model.Employee;
  4. import reactor.core.publisher.Flux;
  5. public interface EmployeeRepository extends ReactiveMongoRepository<Employee, Integer> {
  6. @Query("{ 'name': ?0 }")
  7. Flux<Employee> findByName(final String name);
  8. }

4.6. 模型

Employee.java

  1. import org.springframework.context.annotation.Scope;
  2. import org.springframework.context.annotation.ScopedProxyMode;
  3. import org.springframework.data.annotation.Id;
  4. import org.springframework.data.mongodb.core.mapping.Document;
  5. @Scope(scopeName = "request", proxyMode = ScopedProxyMode.TARGET_CLASS)
  6. @Document
  7. public class Employee {
  8. @Id
  9. int id;
  10. String name;
  11. long salary;
  12. //Getters and setters
  13. @Override
  14. public String toString() {
  15. return "Employee [id=" + id + ", name=" + name + ", salary=" + salary + "]";
  16. }
  17. }

5. 演示

启动应用程序并检查请求和响应。

  • HTTP POST http://localhost:8080/create

API 请求 1

  1. {
  2. "id":1,
  3. "name":"user_1",
  4. "salary":101
  5. }

API 请求 2

  1. {
  2. "id":2,
  3. "name":"user_2",
  4. "salary":102
  5. }
  • HTTP PUT http://localhost:8080/update

API 请求

  1. {
  2. "id":2,
  3. "name":"user_2",
  4. "salary":103
  5. }
  • HTTP GET http://localhost:8080/

API 响应

  1. data:{"id":1,"name":"user_1","salary":101}
  2. data:{"id":2,"name":"user_2","salary":102}

Spring WebFlux 教程 - 图3

Spring WebFlux 演示

请注意,我正在使用 Postman chrome 浏览器扩展(是阻止客户端)测试 API。 仅当它已经收集了两个员工的响应时,才会显示结果。

要验证非阻塞响应功能,请直接在 chrome 浏览器中点击 URL。 结果将以事件的形式(文本/事件流)一一出现。 为了更好地查看结果,请考虑向控制器 API 添加延迟。

Spring WebFlux 教程 - 图4

Spring WebFlux 示例 – 事件流

6. Spring WebFlux 教程 – 总结

Spring MVC 和 Spring WebFlux 都支持客户端-服务器体系结构,但是并发模型和用于阻止自然和线程的默认行为存在关键差异。 在 Spring MVC 中,假定应用程序可以阻止当前线程,而在 webflux 中,默认情况下线程是非阻止的。 这是 spring webflux 与 mvc 之间的主要区别。

反应和非阻塞通常不会使应用程序运行得更快。 响应式和非阻塞性的预期好处是能够以较少的固定数量的线程和较少的内存需求来扩展应用程序。 它使应用程序在负载下更具弹性,因为它们以更可预测的方式扩展。

请把关于这个 spring boot webflux 教程的问题交给我。

学习愉快!

下载源码