jdk9 flow api(reactive stream)
publisher、subscription、processor
它与函数式编程是两回事,各是各的概念。反应式编程是基于发布订阅,存在回压(背压),来感知客户端、服务端的压力(订阅者与发布者的互动)。
flow 是jdk9 对 reactive stream 的官方实现,spring5中webflux是另一种实现。
jdk9 中flow api:
demo,测试发布订阅
package com.demo;import java.util.concurrent.Flow.Subscriber;import java.util.concurrent.Flow.Subscription;import java.util.concurrent.SubmissionPublisher;import java.util.concurrent.TimeUnit;/*** 测试jdk9 flow (reactive*/public class TestJdkFlow {public static void main(String[] args) throws InterruptedException {// 1. 定义发布者// 直接使用jdk自带的 submissionPublisher,它实现了 publisher 接口SubmissionPublisher<String> publisher = new SubmissionPublisher<>();// 2. 定义发布者Subscriber<String> subscriber = new Subscriber<>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {System.out.println(">>>>>>>>>>> onSubscribe");// 保存订阅关系,需要使用它来给发布者响应this.subscription = subscription;// 请求一个数据subscription.request(1);}@Overridepublic void onNext(String item) {// 接收到一个数据,处理System.out.println(">>>>>>>>>>> onNext:" + item);// 处理完事,告诉发布者我还要请求数据subscription.request(1);// 如果已经拿到需要的数据,可以调用cancel告诉publisher发布者,自己不再需要数据了// subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现异常,处理System.out.println(">>>>>>>>>>> onError");// 可以告诉发布者,自己不需要接收收据了subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完成了(发布者关闭了)System.out.println(">>>>>>>>>>> onComplete");}};// 3. 发布者与订阅者建立订阅关系publisher.subscribe(subscriber);// 4. 生产数据,并发布数据// 模拟数据String str = "小辉又犯傻了。。。。";publisher.submit(str);publisher.submit(str+"...");// 5. 结束后,关闭发布者publisher.close();// Thread.currentThread().join(100000);// 线程保留,等待执行完成TimeUnit.SECONDS.sleep(10);}}
demo,测试发布者、处理器、订阅者式的发布订阅
package com.demo;import java.util.Objects;import java.util.concurrent.Flow.Processor;import java.util.concurrent.Flow.Subscriber;import java.util.concurrent.Flow.Subscription;import java.util.concurrent.SubmissionPublisher;import java.util.concurrent.TimeUnit;/*** 使用处理器,测试反应式编程:* 相当于 发布者->数据处理器->订阅者*/public class TestJDKFlowProcessorDemo {public static void main(String[] args) throws InterruptedException {// 1. 发布者SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();// 2. 定义处理器,对数据进行过滤,并转换为 String 类型TestJDKFlowProcessor processor = new TestJDKFlowProcessor();// 3. 发布者和处理器建立订阅关系publisher.subscribe(processor);// 4. 定义最终的订阅者,消费String数据Subscriber<String> subscriber = new Subscriber<>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {System.out.println(">>>>>>>>>>>subscriber onSubscribe");// 保存订阅关系,需要使用它来给发布者响应this.subscription = subscription;// 请求一个数据subscription.request(1);}@Overridepublic void onNext(String item) {// 接收到一个数据,处理System.out.println(">>>>>>>>>>>subscriber onNext:" + item);// 处理完事,告诉发布者我还要请求数据subscription.request(1);// 如果已经拿到需要的数据,可以调用cancel告诉publisher发布者,自己不再需要数据了// subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现异常,处理System.out.println(">>>>>>>>>>>subscriber onError");// 可以告诉发布者,自己不需要接收收据了subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完成了(发布者关闭了)System.out.println(">>>>>>>>>>>subscriber onComplete");}};// 5. 处理器和最终订阅者建立订阅关系processor.subscribe(subscriber);// 6. 生产数据,并发布publisher.submit(30);publisher.submit(32);publisher.submit(35);publisher.submit(null); // java.lang.NullPointerException,不可发布空对象数据// 7. 关闭发布者publisher.close();TimeUnit.SECONDS.sleep(10);}}/*** Processor 需要继承SubmissionPublisher,并实现 Processor* 数据处理(相当于一个数据中转站)** 输入源数据 Integer,过滤掉小于0的,然后转换为String发布出去*/class TestJDKFlowProcessor extends SubmissionPublisher<String> implements Processor<Integer, String> {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {System.out.println(">>>>>>>>>>>TestJDKFlowProcessor onSubscribe");// 保存订阅关系,需要使用它来给发布者响应this.subscription = subscription;// 向发布者请求一个资源subscription.request(1);}@Overridepublic void onNext(Integer item) {System.out.println(">>>>>>>>>>>>>>>>>>TestJDKFlowProcessor onNext");// 收到一个数据System.out.println(">>>>>>>> 我收到了:" + item);// 处理数据if (Objects.isNull(item)) {this.submit("你都不告诉我,你的年纪,是年纪一大把了吗");} else if (item < 18) {this.submit("小女子芳年二八,待字闺中人未识哦");} else {this.submit("英雄宝刀未老,老娘风韵犹存;来来来,喝完这杯,还有三杯");}// 还有数据要拿取,就再请求subscription.request(1);// 没有数据了,就可以不请求了// subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出异常了System.out.println(">>>>>>>>>>>TestJDKFlowProcessor onError");}@Overridepublic void onComplete() {// 发布者关闭了System.out.println(">>>>>>>>>>>TestJDKFlowProcessor onComplete");// 关闭发布者this.close();}}
处理器,实现了接口 Publisher 和 Subscriber,所以既可以发布,又可以订阅。代码如下:
小结
发布者的 submit 方法是一个阻塞方法,当缓存满了,就不会产生新的数据,当订阅者消费了再产生。这样就实现了保证一定水平运行。
