在Spring Boot 2.0 WebFlux编程 一节我们大致了解了WebFlux的用法,这节我们将结合Mongo DB在WebFlux的架构下实现增删改查样例。
和Spring Boot整合Mongo DB不同的是,我们使用的是Reactive Mongo DB依赖,所有增删改查方法返回值类型为Flux或者Mono。
项目准备
新建一个Spring Boot项目,版本为2.1.3.RELEASE,并引入webflux和reactive mongodb依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb-reactive</artifactId></dependency>
要开启Reactive Mongo DB的相关配置,需要在Spring Boot启动类上添加@EnableReactiveMongoRepositories注解:
@SpringBootApplication@EnableReactiveMongoRepositoriespublic class WebfluxApplication {public static void main(String[] args) {SpringApplication.run(WebfluxApplication.class, args);}}
接着在配置文件application.yml里配置Mongo DB连接:
spring:data:mongodb:host: localhostport: 27017database: webflux
使用的是webflux数据库,所以需要在Mongo DB里新建一个webflux数据库(并创建user文档/表,以供待会使用):

创建User实体类:
@Document(collection = "user")public class User {@Idprivate String id;private String name;private Integer age;private String description;// get set 略}
简单增删改查
创建UserDao接口,继承自ReactiveMongoRepository:
@Repositorypublic interface UserDao extends ReactiveMongoRepository<User, String> {}
和 Spring Boot整合Mongo DB 不同的是,我们继承的是ReactiveMongoRepository而非MongoRepository,它所提供的方法都是响应式的:

在UserService里通过UserDao定义简单增删改查方法:
@Servicepublic class UserService {@Autowiredprivate UserDao userDao;public Flux<User> getUsers() {return userDao.findAll();}public Mono<User> getUser(String id) {return this.userDao.findById(id);}public Mono<User> createUser(User user) {return userDao.save(user);}public Mono<Void> deleteUser(String id) {return this.userDao.findById(id).flatMap(user -> this.userDao.delete(user));}public Mono<User> updateUser(String id, User user) {return this.userDao.findById(id).flatMap(u -> {u.setName(user.getName());u.setAge(user.getAge());u.setDescription(user.getDescription());return this.userDao.save(u);});}}
大致上和Spring Boot整合Mongo DB中的UserService差不多,不同的是返回值类型为Flux或者Mono,即它们是响应式非阻塞的方法。
编写RESTfulUserController:
@RestController@RequestMapping("user")public class UserController {@Autowiredprivate UserService userService;/*** 以数组的形式一次性返回所有数据*/@GetMappingpublic Flux<User> getUsers() {return userService.getUsers();}/*** 以 Server sent events形式多次返回数据*/@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<User> getUsersStream() {return userService.getUsers();}@PostMappingpublic Mono<User> createUser(User user) {return userService.createUser(user);}/*** 存在返回 200,不存在返回 404*/@DeleteMapping("/{id}")public Mono<ResponseEntity<Void>> deleteUser(@PathVariable String id) {return userService.deleteUser(id).then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK))).defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));}/*** 存在返回修改后的 User* 不存在返回 404*/@PutMapping("/{id}")public Mono<ResponseEntity<User>> updateUser(@PathVariable String id, User user) {return userService.updateUser(id, user).map(u -> new ResponseEntity<>(u, HttpStatus.OK)).defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));}/*** 根据用户 id查找* 存在返回,不存在返回 404*/@GetMapping("/{id}")public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {return userService.getUser(id).map(user -> new ResponseEntity<>(user, HttpStatus.OK)).defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));}}
对于返回值为Flux<T>类型的方法,推荐定义两个一样的方法,一个以普通形式返回,一个以Server Sent Event的形式返回。对于修改和删除,如果需要修改和删除的用户不存在,我们返回404。
对于Flux和Mono的操作,在 Spring Boot 2.0 WebFlux编程 一节中已经介绍过了,这里就不再赘述了。
排序与分页
在 Spring Boot整合Mongo DB 一节中,我们通过MongoTemplate实现了排序与分页。与MongoTemplate对于的响应式的对象为ReactiveMongoTemplate,所以我们照葫芦画瓢,仿照MongoTemplate的写法来实现:
/*** 分页查询,只返回分页后的数据,count值需要通过 getUserByConditionCount* 方法获取*/public Flux<User> getUserByCondition(int size, int page, User user) {Query query = getQuery(user);Sort sort = new Sort(Sort.Direction.DESC, "age");Pageable pageable = PageRequest.of(page, size, sort);return template.find(query.with(pageable), User.class);}/*** 返回 count,配合 getUserByCondition使用*/public Mono<Long> getUserByConditionCount(User user) {Query query = getQuery(user);return template.count(query, User.class);}private Query getQuery(User user) {Query query = new Query();Criteria criteria = new Criteria();if (!StringUtils.isEmpty(user.getName())) {criteria.and("name").is(user.getName());}if (!StringUtils.isEmpty(user.getDescription())) {criteria.and("description").regex(user.getDescription());}query.addCriteria(criteria);return query;}
之所以拆分是因为没找到与PageableExecutionUtils类的getPage方法类似的方法,如果是响应式的话,返回值类型应该是Mono<Page<User>>。
更好的实现方法可以试试用zip:
public Mono<Page<ChatUser>> findByChannelIdPageable(String channelId, Integer page, Integer size) {Pageable pageable = PageRequest.of(page, size, Sort.by(Sort.Direction.DESC, "chatChannels.joinedTime"));Criteria criteria = new Criteria("chatChannels.chatChannelId").is(channelId);Query query = new Query().with(pageable);query.addCriteria(criteria);Flux<ChatUser> chatUserFlux = reactiveMongoTemplate.find(query, ChatUser.class, "chatUser");Mono<Long> countMono = reactiveMongoTemplate.count(Query.of(query).limit(-1).skip(-1), ChatUser.class);return Mono.zip(chatUserFlux.collectList(),countMono).map(tuple2 -> {return PageableExecutionUtils.getPage(tuple2.getT1(),pageable,() -> tuple2.getT2());});}
源码和PostMan测试样例链接:https://github.com/wuyouzhuguli/SpringAll/tree/master/58.Spring-Boot-WebFlux-crud
