Spring Boot 2.0 WebFlux编程 一节我们大致了解了WebFlux的用法,这节我们将结合Mongo DB在WebFlux的架构下实现增删改查样例。

Spring Boot整合Mongo DB不同的是,我们使用的是Reactive Mongo DB依赖,所有增删改查方法返回值类型为Flux或者Mono。

项目准备

新建一个Spring Boot项目,版本为2.1.3.RELEASE,并引入webfluxreactive mongodb依赖:

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-webflux</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
  8. </dependency>

要开启Reactive Mongo DB的相关配置,需要在Spring Boot启动类上添加@EnableReactiveMongoRepositories注解:

  1. @SpringBootApplication
  2. @EnableReactiveMongoRepositories
  3. public class WebfluxApplication {
  4. public static void main(String[] args) {
  5. SpringApplication.run(WebfluxApplication.class, args);
  6. }
  7. }

接着在配置文件application.yml里配置Mongo DB连接:

  1. spring:
  2. data:
  3. mongodb:
  4. host: localhost
  5. port: 27017
  6. database: webflux

使用的是webflux数据库,所以需要在Mongo DB里新建一个webflux数据库(并创建user文档/表,以供待会使用):

Spring Boot WebFlux增删改查样例 - 图1

创建User实体类:

  1. @Document(collection = "user")
  2. public class User {
  3. @Id
  4. private String id;
  5. private String name;
  6. private Integer age;
  7. private String description;
  8. // get set 略
  9. }

简单增删改查

创建UserDao接口,继承自ReactiveMongoRepository

  1. @Repository
  2. public interface UserDao extends ReactiveMongoRepository<User, String> {
  3. }

Spring Boot整合Mongo DB 不同的是,我们继承的是ReactiveMongoRepository而非MongoRepository,它所提供的方法都是响应式的:

Spring Boot WebFlux增删改查样例 - 图2

UserService里通过UserDao定义简单增删改查方法:

  1. @Service
  2. public class UserService {
  3. @Autowired
  4. private UserDao userDao;
  5. public Flux<User> getUsers() {
  6. return userDao.findAll();
  7. }
  8. public Mono<User> getUser(String id) {
  9. return this.userDao.findById(id);
  10. }
  11. public Mono<User> createUser(User user) {
  12. return userDao.save(user);
  13. }
  14. public Mono<Void> deleteUser(String id) {
  15. return this.userDao.findById(id)
  16. .flatMap(user -> this.userDao.delete(user));
  17. }
  18. public Mono<User> updateUser(String id, User user) {
  19. return this.userDao.findById(id)
  20. .flatMap(u -> {
  21. u.setName(user.getName());
  22. u.setAge(user.getAge());
  23. u.setDescription(user.getDescription());
  24. return this.userDao.save(u);
  25. });
  26. }
  27. }

大致上和Spring Boot整合Mongo DB中的UserService差不多,不同的是返回值类型为Flux或者Mono,即它们是响应式非阻塞的方法。

编写RESTfulUserController

  1. @RestController
  2. @RequestMapping("user")
  3. public class UserController {
  4. @Autowired
  5. private UserService userService;
  6. /**
  7. * 以数组的形式一次性返回所有数据
  8. */
  9. @GetMapping
  10. public Flux<User> getUsers() {
  11. return userService.getUsers();
  12. }
  13. /**
  14. * 以 Server sent events形式多次返回数据
  15. */
  16. @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  17. public Flux<User> getUsersStream() {
  18. return userService.getUsers();
  19. }
  20. @PostMapping
  21. public Mono<User> createUser(User user) {
  22. return userService.createUser(user);
  23. }
  24. /**
  25. * 存在返回 200,不存在返回 404
  26. */
  27. @DeleteMapping("/{id}")
  28. public Mono<ResponseEntity<Void>> deleteUser(@PathVariable String id) {
  29. return userService.deleteUser(id)
  30. .then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK)))
  31. .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
  32. }
  33. /**
  34. * 存在返回修改后的 User
  35. * 不存在返回 404
  36. */
  37. @PutMapping("/{id}")
  38. public Mono<ResponseEntity<User>> updateUser(@PathVariable String id, User user) {
  39. return userService.updateUser(id, user)
  40. .map(u -> new ResponseEntity<>(u, HttpStatus.OK))
  41. .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
  42. }
  43. /**
  44. * 根据用户 id查找
  45. * 存在返回,不存在返回 404
  46. */
  47. @GetMapping("/{id}")
  48. public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {
  49. return userService.getUser(id)
  50. .map(user -> new ResponseEntity<>(user, HttpStatus.OK))
  51. .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
  52. }
  53. }

对于返回值为Flux<T>类型的方法,推荐定义两个一样的方法,一个以普通形式返回,一个以Server Sent Event的形式返回。对于修改和删除,如果需要修改和删除的用户不存在,我们返回404。

对于Flux和Mono的操作,在 Spring Boot 2.0 WebFlux编程 一节中已经介绍过了,这里就不再赘述了。

排序与分页

Spring Boot整合Mongo DB 一节中,我们通过MongoTemplate实现了排序与分页。与MongoTemplate对于的响应式的对象为ReactiveMongoTemplate,所以我们照葫芦画瓢,仿照MongoTemplate的写法来实现:

  1. /**
  2. * 分页查询,只返回分页后的数据,count值需要通过 getUserByConditionCount
  3. * 方法获取
  4. */
  5. public Flux<User> getUserByCondition(int size, int page, User user) {
  6. Query query = getQuery(user);
  7. Sort sort = new Sort(Sort.Direction.DESC, "age");
  8. Pageable pageable = PageRequest.of(page, size, sort);
  9. return template.find(query.with(pageable), User.class);
  10. }
  11. /**
  12. * 返回 count,配合 getUserByCondition使用
  13. */
  14. public Mono<Long> getUserByConditionCount(User user) {
  15. Query query = getQuery(user);
  16. return template.count(query, User.class);
  17. }
  18. private Query getQuery(User user) {
  19. Query query = new Query();
  20. Criteria criteria = new Criteria();
  21. if (!StringUtils.isEmpty(user.getName())) {
  22. criteria.and("name").is(user.getName());
  23. }
  24. if (!StringUtils.isEmpty(user.getDescription())) {
  25. criteria.and("description").regex(user.getDescription());
  26. }
  27. query.addCriteria(criteria);
  28. return query;
  29. }

之所以拆分是因为没找到与PageableExecutionUtils类的getPage方法类似的方法,如果是响应式的话,返回值类型应该是Mono<Page<User>>

更好的实现方法可以试试用zip:

  1. public Mono<Page<ChatUser>> findByChannelIdPageable(String channelId, Integer page, Integer size) {
  2. Pageable pageable = PageRequest.of(page, size, Sort.by(Sort.Direction.DESC, "chatChannels.joinedTime"));
  3. Criteria criteria = new Criteria("chatChannels.chatChannelId").is(channelId);
  4. Query query = new Query().with(pageable);
  5. query.addCriteria(criteria);
  6. Flux<ChatUser> chatUserFlux = reactiveMongoTemplate.find(query, ChatUser.class, "chatUser");
  7. Mono<Long> countMono = reactiveMongoTemplate.count(Query.of(query).limit(-1).skip(-1), ChatUser.class);
  8. return Mono.zip(chatUserFlux.collectList(),countMono).map(tuple2 -> {
  9. return PageableExecutionUtils.getPage(
  10. tuple2.getT1(),
  11. pageable,
  12. () -> tuple2.getT2());
  13. });
  14. }

源码和PostMan测试样例链接:https://github.com/wuyouzhuguli/SpringAll/tree/master/58.Spring-Boot-WebFlux-crud