https://cloud.tencent.com/developer/article/1073888 Java 平台反应式编程(Reactive Programming)入门

流式编程

个人理解,流式编程就是 lambda 函数式编程的使用。其中函数式编程,supplier、consumer、function、predicate,派生出的 BiFunction,unaryOperator 等;
stream 流,流的产生,中间计算,结束。
函数式接口 @FunctionalInterface
流式编程、反应式编程 - 图1

  1. package com.demo;
  2. import java.util.Arrays;
  3. import java.util.function.BiConsumer;
  4. import java.util.function.BiFunction;
  5. import java.util.function.BiPredicate;
  6. import java.util.function.BinaryOperator;
  7. import java.util.function.Consumer;
  8. import java.util.function.Function;
  9. import java.util.function.Predicate;
  10. import java.util.function.Supplier;
  11. import java.util.stream.Stream;
  12. public class TestStreamCode {
  13. private static void testFunction() {
  14. Function<String, String> function = i -> "testFunction: 你输入的是:"+i;
  15. System.out.println(function.apply("hello world"));
  16. }
  17. private static void testBiFunction() {
  18. BiFunction<Integer, String, String> function = (i, s) -> String.format("testBiFunction: %s 今年 %d", s, i);
  19. System.out.println(function.apply(30, "xiaohui"));
  20. }
  21. private static void testSuppier() {
  22. Supplier<String> supplier = () -> new String("testSuppier: xiaohui");
  23. }
  24. private static void testConsumer() {
  25. Consumer<String> consumer = (s) -> System.out.println("testConsumer: "+s);
  26. consumer.accept("xiaohui");
  27. }
  28. private static void testBiConsumer() {
  29. BiConsumer<Integer, String> consumer = (i, s) -> System.out.println(String.format("testBiConsumer: %s %d", s,
  30. i));
  31. consumer.accept(100, "xiaohui");
  32. }
  33. private static void testPredicate() {
  34. Predicate<String> predicate = s -> s.isBlank();
  35. System.out.println("testPredicate: "+ predicate.test("xiaohui"));
  36. }
  37. private static void testBiPredicate() {
  38. BiPredicate<Integer, String> predicate = (i, s) -> i > 0 && s.isBlank();
  39. System.out.println(String.format("testBiPredicate: %s, %d", "xiaohui", 100));
  40. }
  41. private static void testBinaryOperator() {
  42. // public interface BinaryOperator<T> extends BiFunction<T,T,T>
  43. BinaryOperator<String> operator = (s1,s2) -> s1 + s2;
  44. System.out.println("testBinaryOperator: " + operator.apply("hello", "world"));
  45. }
  46. private static void testStream() {
  47. Arrays.stream(new String[]{"hello", "hi", "嘿嘿"}).forEach(System.out::println);
  48. Arrays.asList("哈哈", "呵呵").stream().forEach(System.out::println);
  49. Stream.iterate(1, i->i+1).limit(10).forEach(System.out::print);
  50. Stream.of("xiaohui".split("")).forEach(System.out::println);
  51. Stream.generate(()->"xiaohui").limit(9).forEach(System.out::println);
  52. }
  53. public static void main(String[] args) {
  54. testBiConsumer();
  55. testBiFunction();
  56. testBinaryOperator();
  57. testBiPredicate();
  58. testConsumer();
  59. testFunction();
  60. testPredicate();
  61. testSuppier();
  62. ////////////////////////////
  63. testStream();
  64. }
  65. }

反应式编程

project reactor
flux、mono
流式编程、反应式编程 - 图2
流的建立、操作、结束
mono、flux
流式编程、反应式编程 - 图3

基于流式的restful web请求(基于注解,基于routerFunction)

  1. package com.demo;
  2. import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
  3. import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
  4. import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
  5. import static org.springframework.web.reactive.function.server.RouterFunctions.route;
  6. import org.springframework.boot.SpringApplication;
  7. import org.springframework.boot.autoconfigure.SpringBootApplication;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.http.MediaType;
  10. import org.springframework.stereotype.Component;
  11. import org.springframework.web.bind.annotation.GetMapping;
  12. import org.springframework.web.bind.annotation.RequestMapping;
  13. import org.springframework.web.bind.annotation.RestController;
  14. import org.springframework.web.reactive.function.server.RouterFunction;
  15. import org.springframework.web.reactive.function.server.ServerRequest;
  16. import org.springframework.web.reactive.function.server.ServerResponse;
  17. import reactor.core.publisher.Mono;
  18. @SpringBootApplication
  19. public class TestWebFlux {
  20. public static void main(String[] args) {
  21. SpringApplication.run(TestWebFlux.class, args);
  22. }
  23. @Bean
  24. public RouterFunction<ServerResponse> monoRouterFunction(UserHandler userHandler) {
  25. return route(GET("/{user}").and(accept(MediaType.APPLICATION_JSON)), userHandler::getUser)
  26. .andRoute(GET("/{user}/customers").and(accept(MediaType.APPLICATION_JSON)),
  27. userHandler::getUserCustomers)
  28. .andRoute(DELETE("/{user}").and(accept(MediaType.APPLICATION_JSON)), userHandler::deleteUser);
  29. }
  30. @Bean
  31. public RouterFunction<ServerResponse> functionResponse() {
  32. return route().GET("/function/greeting", request -> ServerResponse.ok().bodyValue("function greeting")).build();
  33. }
  34. }
  35. @RestController
  36. @RequestMapping("/annotated")
  37. class TestAnnotated {
  38. @GetMapping("/greeting")
  39. public Mono<String> greeting() {
  40. return Mono.just("hello greeting by annotated.");
  41. }
  42. }
  43. @Component
  44. class UserHandler {
  45. public Mono<ServerResponse> getUser(ServerRequest request) {
  46. // ...
  47. // Flux.just(1).publishOn(Schedulers.immediate())
  48. return ServerResponse.ok().bodyValue("getUser");
  49. }
  50. public Mono<ServerResponse> getUserCustomers(ServerRequest request) {
  51. // ...
  52. return ServerResponse.ok().bodyValue("getUserCustomers");
  53. }
  54. public Mono<ServerResponse> deleteUser(ServerRequest request) {
  55. // ...
  56. return ServerResponse.ok().bodyValue("deleteUser");
  57. }
  58. }
  59. /*
  60. <dependency>
  61. <groupId>org.springframework.boot</groupId>
  62. <artifactId>spring-boot-starter-webflux</artifactId>
  63. </dependency>
  64. */
  65. /*
  66. 2021-04-15 17:31:03.250 INFO 11924 --- [ main] com.demo.TestWebFlux : Starting TestWebFlux using Java 15.0.2 on 220200700182 with PID 11924 (D:\work\task\my-activiti-demo\scala-second\target\classes started by DELL in D:\work\task\my-activiti-demo\scala-second)
  67. 2021-04-15 17:31:03.253 INFO 11924 --- [ main] com.demo.TestWebFlux : No active profile set, falling back to default profiles: default
  68. 2021-04-15 17:31:04.200 INFO 11924 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080
  69. 2021-04-15 17:31:04.219 INFO 11924 --- [ main] com.demo.TestWebFlux : Started TestWebFlux in 1.206 seconds (JVM running for 1.684)
  70. */