https://cloud.tencent.com/developer/article/1073888 Java 平台反应式编程(Reactive Programming)入门
流式编程
个人理解,流式编程就是 lambda 函数式编程的应用。函数式编程的核心接口有 Supplier、Consumer、Function、Predicate,以及由它们派生出的 BiFunction、UnaryOperator 等。
Stream 流:流的创建、中间操作、终止操作。
函数式接口 @FunctionalInterface
package com.demo;
import java.util.Arrays;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;
/**
 * Runnable tour of the core {@code java.util.function} interfaces and of several
 * ways to create a {@link Stream}.
 *
 * <p>Each {@code testXxx} method builds one functional interface from a lambda,
 * invokes it once, and prints the outcome to standard output.
 */
public class TestStreamCode {

    /** {@link Function}: one argument in, one result out. */
    private static void testFunction() {
        Function<String, String> function = i -> "testFunction: 你输入的是:" + i;
        System.out.println(function.apply("hello world"));
    }

    /** {@link BiFunction}: two arguments in, one result out. */
    private static void testBiFunction() {
        BiFunction<Integer, String, String> function =
                (i, s) -> String.format("testBiFunction: %s 今年 %d", s, i);
        System.out.println(function.apply(30, "xiaohui"));
    }

    /** {@link Supplier}: no argument, produces a value. */
    private static void testSuppier() {
        Supplier<String> supplier = () -> "testSuppier: xiaohui";
        // The supplier must actually be invoked, otherwise this demo prints nothing.
        System.out.println(supplier.get());
    }

    /** {@link Consumer}: one argument in, no result. */
    private static void testConsumer() {
        Consumer<String> consumer = s -> System.out.println("testConsumer: " + s);
        consumer.accept("xiaohui");
    }

    /** {@link BiConsumer}: two arguments in, no result. */
    private static void testBiConsumer() {
        BiConsumer<Integer, String> consumer =
                (i, s) -> System.out.println(String.format("testBiConsumer: %s %d", s, i));
        consumer.accept(100, "xiaohui");
    }

    /** {@link Predicate}: one argument in, boolean out. */
    private static void testPredicate() {
        Predicate<String> predicate = String::isBlank;
        System.out.println("testPredicate: " + predicate.test("xiaohui"));
    }

    /** {@link BiPredicate}: two arguments in, boolean out. */
    private static void testBiPredicate() {
        BiPredicate<Integer, String> predicate = (i, s) -> i > 0 && s.isBlank();
        // Print the predicate's verdict alongside its inputs instead of the inputs alone.
        System.out.println(String.format(
                "testBiPredicate: %s, %d -> %b", "xiaohui", 100, predicate.test(100, "xiaohui")));
    }

    /** {@link BinaryOperator}: a {@code BiFunction<T, T, T>} whose operands and result share a type. */
    private static void testBinaryOperator() {
        BinaryOperator<String> operator = (s1, s2) -> s1 + s2;
        System.out.println("testBinaryOperator: " + operator.apply("hello", "world"));
    }

    /** Creates streams from an array, a list, an iteration, varargs and a generator. */
    private static void testStream() {
        Arrays.stream(new String[] {"hello", "hi", "嘿嘿"}).forEach(System.out::println);
        Arrays.asList("哈哈", "呵呵").stream().forEach(System.out::println);

        // iterate() is infinite, so limit() is what bounds it.
        Stream.iterate(1, i -> i + 1).limit(10).forEach(System.out::print);
        // print() emits no line break; end the line so the next stream starts cleanly.
        System.out.println();

        Stream.of("xiaohui".split("")).forEach(System.out::println);
        Stream.generate(() -> "xiaohui").limit(9).forEach(System.out::println);
    }

    /**
     * Runs every functional-interface demo, then the stream demo.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        testBiConsumer();
        testBiFunction();
        testBinaryOperator();
        testBiPredicate();
        testConsumer();
        testFunction();
        testPredicate();
        testSuppier();

        testStream();
    }
}
反应式编程
Project Reactor
Flux、Mono
流的建立、操作、结束
Mono、Flux
基于反应式流的 RESTful Web 请求(基于注解、基于 RouterFunction)
package com.demo;
import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
/**
 * Spring Boot entry point that also declares the functional
 * ({@code RouterFunction}-based) routes of this WebFlux demo.
 */
@SpringBootApplication
public class TestWebFlux {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments forwarded to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        SpringApplication.run(TestWebFlux.class, args);
    }

    /**
     * Wires the three user routes to {@link UserHandler}. Every route combines its
     * method/path predicate with an {@code accept(APPLICATION_JSON)} predicate.
     *
     * @param userHandler handler whose methods serve the routes
     * @return the composed router function
     */
    @Bean
    public RouterFunction<ServerResponse> monoRouterFunction(UserHandler userHandler) {
        var fetchUser = GET("/{user}").and(accept(MediaType.APPLICATION_JSON));
        var fetchCustomers = GET("/{user}/customers").and(accept(MediaType.APPLICATION_JSON));
        var removeUser = DELETE("/{user}").and(accept(MediaType.APPLICATION_JSON));

        return route(fetchUser, userHandler::getUser)
                .andRoute(fetchCustomers, userHandler::getUserCustomers)
                .andRoute(removeUser, userHandler::deleteUser);
    }

    /**
     * Declares a single GET route, built with the router builder, that answers
     * with a fixed text body.
     *
     * @return the router function for {@code /function/greeting}
     */
    @Bean
    public RouterFunction<ServerResponse> functionResponse() {
        return route()
                .GET("/function/greeting", req -> ServerResponse.ok().bodyValue("function greeting"))
                .build();
    }
}
/** Annotation-based counterpart of the functional routes, mapped under {@code /annotated}. */
@RestController
@RequestMapping("/annotated")
class TestAnnotated {

    /** Fixed body returned by {@link #greeting()}. */
    private static final String GREETING = "hello greeting by annotated.";

    /**
     * Handles {@code GET /annotated/greeting}.
     *
     * @return a {@code Mono} emitting the fixed greeting text
     */
    @GetMapping("/greeting")
    public Mono<String> greeting() {
        return Mono.just(GREETING);
    }
}
/**
 * Handler functions behind the functional user routes. Each one is a stub that
 * ignores the request and answers 200 OK with its own name as the body.
 */
@Component
class UserHandler {

    /**
     * Stub for fetching a user.
     *
     * @param request the incoming request (currently unused)
     * @return an OK response whose body is {@code "getUser"}
     */
    public Mono<ServerResponse> getUser(ServerRequest request) {
        return okWith("getUser");
    }

    /**
     * Stub for fetching a user's customers.
     *
     * @param request the incoming request (currently unused)
     * @return an OK response whose body is {@code "getUserCustomers"}
     */
    public Mono<ServerResponse> getUserCustomers(ServerRequest request) {
        return okWith("getUserCustomers");
    }

    /**
     * Stub for deleting a user.
     *
     * @param request the incoming request (currently unused)
     * @return an OK response whose body is {@code "deleteUser"}
     */
    public Mono<ServerResponse> deleteUser(ServerRequest request) {
        return okWith("deleteUser");
    }

    /** Builds a 200 OK response carrying the given text as its body. */
    private static Mono<ServerResponse> okWith(String body) {
        return ServerResponse.ok().bodyValue(body);
    }
}
/*
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
*/
/*
2021-04-15 17:31:03.250 INFO 11924 --- [ main] com.demo.TestWebFlux : Starting TestWebFlux using Java 15.0.2 on 220200700182 with PID 11924 (D:\work\task\my-activiti-demo\scala-second\target\classes started by DELL in D:\work\task\my-activiti-demo\scala-second)
2021-04-15 17:31:03.253 INFO 11924 --- [ main] com.demo.TestWebFlux : No active profile set, falling back to default profiles: default
2021-04-15 17:31:04.200 INFO 11924 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080
2021-04-15 17:31:04.219 INFO 11924 --- [ main] com.demo.TestWebFlux : Started TestWebFlux in 1.206 seconds (JVM running for 1.684)
*/