简介

Reactive(反应式)与Imperative(命令式)相对应, Reactive一般都关联着back-pressure, monads, or event-driven architecture等词汇,也往往表现出一些特点。

  • Responsive - they must respond in a timely fashion(及时响应)
  • Elastic - they adapt themselves to the fluctuating load(弹性伸缩)
  • Resilient - they handle failures gracefully(优雅的处理失败)
  • Asynchronous message passing - the component of a reactive system interact using messages(消息交互)

更多内容:https://principles.reactive.foundation

Reactive 反应式并不是Java所特有,是一种应对高并发、快速响应编程的一系列原则,Spring 5提供了全新的Spring WebFlux框架,相比于传统的Spring MVC,实现了Reactive Streams规范,不需要Servlet API的支持,创建基于事件循环执行模型的完全异步且非阻塞的应用程序。
Quarkus同样提供了对Reactive编程的支持。这些原则在使用资源(CPU和内存)的同时更有效地处理比传统方法更多的负载,同时也会优雅地反应失败。Quarkus相比于Spring WebFlux的优势在于你不用纠结如何选择,可以在同一个应用程序中同时使用反应式和非反应式编程,不需要依赖第三方应用或者技术栈。

Quarkus可以作为反应式和非反应式应用的桥梁。

Quarkus如何实现反应式,借用官方描述的一张图片,通过Eclipse Vert.x and Netty等构建了一个特有的引擎,来处理non-blocking I/O交互。Quarkus或者应用可以通过代码来编排数据库、消息队列等I/O事件交互。
quarkus-reactive-core.png

创建应用

同创建Quarkus Web应用类似,创建反应式应用只需要选择相应的反应式组件即可。

  • Hibernate Reactive with Panache
  • RESTEasy Reactive
  • Reactive MySQL client

pom.xml对应的依赖

  1. <dependency>
  2. <groupId>io.quarkus</groupId>
  3. <artifactId>quarkus-resteasy-reactive-jackson</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>io.quarkus</groupId>
  7. <artifactId>quarkus-hibernate-reactive-panache</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>io.quarkus</groupId>
  11. <artifactId>quarkus-reactive-mysql-client</artifactId>
  12. </dependency>

创建对应的实体类

  1. package com.starsray.entity;
  2. import io.quarkus.hibernate.reactive.panache.PanacheEntity;
  3. import javax.persistence.Column;
  4. import javax.persistence.Entity;
  5. import javax.persistence.Lob;
  6. import javax.persistence.Table;
  7. import java.time.LocalDate;
  8. @Entity
  9. @Table(name = "employee")
  10. public class Employee extends PanacheEntity {
  11. @Column(name = "emp_no", nullable = false)
  12. private Integer empNo;
  13. @Column(name = "birth_date", nullable = false)
  14. private LocalDate birthDate;
  15. @Column(name = "first_name", nullable = false, length = 14)
  16. private String firstName;
  17. @Column(name = "last_name", nullable = false, length = 16)
  18. private String lastName;
  19. @Lob
  20. @Column(name = "gender", nullable = false)
  21. private String gender;
  22. @Column(name = "hire_date", nullable = false)
  23. private LocalDate hireDate;
  24. public LocalDate getHireDate() {
  25. return hireDate;
  26. }
  27. public void setHireDate(LocalDate hireDate) {
  28. this.hireDate = hireDate;
  29. }
  30. public String getGender() {
  31. return gender;
  32. }
  33. public void setGender(String gender) {
  34. this.gender = gender;
  35. }
  36. public String getLastName() {
  37. return lastName;
  38. }
  39. public void setLastName(String lastName) {
  40. this.lastName = lastName;
  41. }
  42. public String getFirstName() {
  43. return firstName;
  44. }
  45. public void setFirstName(String firstName) {
  46. this.firstName = firstName;
  47. }
  48. public LocalDate getBirthDate() {
  49. return birthDate;
  50. }
  51. public void setBirthDate(LocalDate birthDate) {
  52. this.birthDate = birthDate;
  53. }
  54. public Integer getEmpNo() {
  55. return empNo;
  56. }
  57. public void setEmpNo(Integer empNo) {
  58. this.empNo = empNo;
  59. }
  60. }

创建实体类对应的Resource,里面包含一个Reactive API和普通API。

  1. package com.starsray;
  2. import com.starsray.entity.Employee;
  3. import io.quarkus.hibernate.reactive.panache.Panache;
  4. import io.smallrye.mutiny.Uni;
  5. import javax.enterprise.context.ApplicationScoped;
  6. import javax.ws.rs.*;
  7. import javax.ws.rs.core.Response;
  8. import java.net.URI;
  9. import java.util.List;
  10. @ApplicationScoped
  11. @Path("employee")
  12. public class EmployeeReactiveResource {
  13. @GET
  14. @Path("/{empNo}")
  15. @Produces
  16. public Uni<List<Employee>> get(@PathParam("empNo")int empNo) {
  17. return Employee.list("id", 1L);
  18. }
  19. @POST
  20. @Path("create")
  21. @Produces
  22. @Consumes
  23. public Uni<Response> create(Employee Employee) {
  24. return Panache.<Employee>withTransaction(Employee::persist)
  25. .onItem()
  26. .transform(inserted -> Response.created(URI.create("/Employees/" + inserted.id))
  27. .build());
  28. }
  29. @GET
  30. @Path("/{id}")
  31. @Produces
  32. public Uni<Employee> getSingle(@PathParam("id") Long id) {
  33. return Employee.findById(id);
  34. }
  35. }

配置数据库连接

  1. quarkus.datasource.db-kind=mysql
  2. quarkus.datasource.username=root
  3. quarkus.datasource.password=root
  4. quarkus.datasource.reactive.url=mysql://localhost:3306/employees
  5. quarkus.hibernate-orm.log.format-sql=true
  6. quarkus.hibernate-orm.log.sql=true

启动项目

  1. ./mvnw quarkus:dev

在体验上几乎看不出来区别。Quarkus底层都已经实现了。

Reactive

由于Reactive通过一种异步非阻塞式I/O和数据库进行交互,因此需要一种异步HTTP实现结构体,Quarkus使用Mutiny 作为其核心的反应式编程模块,因此HTTP请求支持返回两种Mutiny类型(Uni and Multi),在引入的Hibernate Reactive with Panache模块中只需要实体类继承PanacheEntity类,就可以暴漏并使用这两种返回类型。没有特殊指定。RESTEasy Reactive会默认返回List对象为JSONArray。

  • GET

注意查看这一段代码,返回类型是通过Uni来包装的实体Value,没有直接返回结果集。当数据库读取到相关数据后,Uni会获取并返回结果。

  1. @GET
  2. @Path("/{empNo}")
  3. @Produces
  4. public Uni<List<Employee>> get(@PathParam("empNo") int empNo) {
  5. return Employee.list("id", 1L);
  6. }

Uni<?>是一种异步返回类型,有点类似future,Uni作为一个占位符等待结果(暂称为Item)返回,当接收到Mutiny分发的Item时,开发人员可以进行一些业务逻辑的处理,也可以表达为Reactive的延续性(相对于传统命令式阻塞I/O的顺序性),体现在获取一个Uni,当Uni占位符得到分发的Item时,执行其他过程。

为什么返回类型是通过Uni来返回Item,通过关系型数据库查询,直接返回List结果集并不是一种好的方式,传统关系型数据库并不能很好的处理流式结果,也并没有相关设计协议。通过流来返回数据库结果,需要保持连接或者开启一个事务直到所有的查询行被消费,如果有一个消费者消费的比较慢就会违背数据库操作的黄金法则(不要保持一个太久的数据库连接)。一般来说,过低的数据库连接数和较长的数据库连接会明显的降低应用的并发性和效应效率,所以这里推荐使用Uni<?>来包装返回结果,如果数据量过大,可以考虑使用分页来解决问题。

  • POST

查看添加的POST请求代码,根据JAX-RS的规范没有提供一种类似于Spring @RequestBody的注解,默认会以JSON类型接收POST请求。

  1. @POST
  2. @Path("create")
  3. public Uni<Response> create(Employee Employee) {
  4. return Panache.<Employee>withTransaction(Employee::persist)
  5. .onItem()
  6. .transform(inserted -> Response.created(URI.create("/Employees/" + inserted.id))
  7. .build());
  8. }

为了对数据库进行写操作,需要开启一个数据库事务,Panache.withTransaction会异步的去获取一个事务,当接收到transaction时会调用persist方法,同样persist会返回一个异步Uni结果,Uni会分发Employee在数据库的插入结果,当插入动作完成时(我们编码的延续),我们创建一个201 CREATED响应。在等待接收事务的过程中,RESTEasy Reactive会自动的读取请求体为JSON并且创建一个Employee实体对象。

命令式与反应式

参考官方文档上面的例子简单的示例了反应式编程,在使用上几乎和传统命令式编程没有区别,你可能会疑惑反应式编程和命令式编程有哪些不同或者有哪些好处。为了更好的理解和对比,我们需要先了解反应式和命令式在执行模型上的不同,理解执行模型的是理解Reactive的必要前提。

  • blocking I/O

在传统的命令式编程中,依赖阻塞式I/O模型,框架会分配一个线程去处理一个请求,请求的整个过程都在这个线程上进行,这种模型非常不适合大规模扩展,为了处理大量的请求就需要大量的线程,应用的并发性能会受限于工作线程的数量,当需要与远程服务进行交互调用时候,这些线程就会被阻塞。并且每个线程都映射到 OS 线程,因此在内存和 CPU 方面都有成本,大大降低了资源的利用率。
blocking-threads.png
blocking I/O

  • non-blocking I/O

反应式编程依赖非阻塞式I/O并且使用不同的执行模型,non-blocking I/O提供了一种高效的方式来处理并发I/O,很小一部分I/O线程可以处理大量的并发I/O,通过这种模型,处理请求不会委托给工作线程,而是直接使用这些 I/O 线程。这种模型节省了内存和 CPU,因为不需要创建工作线程来处理请求,提高了并发性,并且消除了对线程数量的限制,由于减少了线程切换的数量,它还提高了响应时间。
reactive-thread.png

  • 顺序性到延续性

命令式体现在顺序性,反应式体现在延续性,是两种本质风格的转变,二者在模型上最大的差别体现在,反应式编程的请求是由I/O线程来处理的,少量的线程即可处理大量的并发请求。
延续性风格编码在处理请求过程需要与远程服务(如 HTTP API 或数据库)交互时,它不会阻塞执行等待响应结果返回,相反,它会调度 I/O 操作并延续处理请求的剩余代码。 这种延续可以作为回调(使用 I/O 结果调用的函数)传递,或者使用更高级的结构体,例如反应式编程或协程。 不管延续如何表达,重要的是对I/O 线程的释放,因此该线程可用于处理另一个请求。 当调度的 I/O 完成时,I/O 线程执行继续,并且继续处理挂起的请求。
因此,与 I/O 阻塞执行的命令式模型不同,反应式切换到基于延续的设计,其中 I/O 线程被释放,并在 I/O 完成时调用延续。 因此,I/O 线程可以处理多个并发请求,从而提高应用程序的整体并发性。

Quarkus提供了不同的反应式编程库:

  • Mutiny - an intuitive and event-driven reactive programming library
  • Kotlin co-routines - a way to write asynchronous code in a sequential manner

参考文档:

更多资料: