jdk9 flow api(reactive stream)
publisher、subscription、processor
它与函数式编程是两回事,各是各的概念。反应式编程是基于发布订阅,存在回压(背压),来感知客户端、服务端的压力(订阅者与发布者的互动)。
flow 是jdk9 对 reactive stream 的官方实现,spring5中webflux是另一种实现。
jdk9 中flow api:
demo,测试发布订阅
package com.demo;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
/**
* 测试jdk9 flow (reactive
*/
public class TestJdkFlow {
public static void main(String[] args) throws InterruptedException {
// 1. 定义发布者
// 直接使用jdk自带的 submissionPublisher,它实现了 publisher 接口
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
// 2. 定义发布者
Subscriber<String> subscriber = new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println(">>>>>>>>>>> onSubscribe");
// 保存订阅关系,需要使用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
subscription.request(1);
}
@Override
public void onNext(String item) {
// 接收到一个数据,处理
System.out.println(">>>>>>>>>>> onNext:" + item);
// 处理完事,告诉发布者我还要请求数据
subscription.request(1);
// 如果已经拿到需要的数据,可以调用cancel告诉publisher发布者,自己不再需要数据了
// subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现异常,处理
System.out.println(">>>>>>>>>>> onError");
// 可以告诉发布者,自己不需要接收收据了
subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完成了(发布者关闭了)
System.out.println(">>>>>>>>>>> onComplete");
}
};
// 3. 发布者与订阅者建立订阅关系
publisher.subscribe(subscriber);
// 4. 生产数据,并发布数据
// 模拟数据
String str = "小辉又犯傻了。。。。";
publisher.submit(str);
publisher.submit(str+"...");
// 5. 结束后,关闭发布者
publisher.close();
// Thread.currentThread().join(100000);
// 线程保留,等待执行完成
TimeUnit.SECONDS.sleep(10);
}
}
demo,测试发布者、处理器、订阅者式的发布订阅
package com.demo;
import java.util.Objects;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
/**
* 使用处理器,测试反应式编程:
* 相当于 发布者->数据处理器->订阅者
*/
public class TestJDKFlowProcessorDemo {
public static void main(String[] args) throws InterruptedException {
// 1. 发布者
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
// 2. 定义处理器,对数据进行过滤,并转换为 String 类型
TestJDKFlowProcessor processor = new TestJDKFlowProcessor();
// 3. 发布者和处理器建立订阅关系
publisher.subscribe(processor);
// 4. 定义最终的订阅者,消费String数据
Subscriber<String> subscriber = new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println(">>>>>>>>>>>subscriber onSubscribe");
// 保存订阅关系,需要使用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
subscription.request(1);
}
@Override
public void onNext(String item) {
// 接收到一个数据,处理
System.out.println(">>>>>>>>>>>subscriber onNext:" + item);
// 处理完事,告诉发布者我还要请求数据
subscription.request(1);
// 如果已经拿到需要的数据,可以调用cancel告诉publisher发布者,自己不再需要数据了
// subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现异常,处理
System.out.println(">>>>>>>>>>>subscriber onError");
// 可以告诉发布者,自己不需要接收收据了
subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完成了(发布者关闭了)
System.out.println(">>>>>>>>>>>subscriber onComplete");
}
};
// 5. 处理器和最终订阅者建立订阅关系
processor.subscribe(subscriber);
// 6. 生产数据,并发布
publisher.submit(30);
publisher.submit(32);
publisher.submit(35);
publisher.submit(null); // java.lang.NullPointerException,不可发布空对象数据
// 7. 关闭发布者
publisher.close();
TimeUnit.SECONDS.sleep(10);
}
}
/**
* Processor 需要继承SubmissionPublisher,并实现 Processor
* 数据处理(相当于一个数据中转站)
*
* 输入源数据 Integer,过滤掉小于0的,然后转换为String发布出去
*/
class TestJDKFlowProcessor extends SubmissionPublisher<String> implements Processor<Integer, String> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println(">>>>>>>>>>>TestJDKFlowProcessor onSubscribe");
// 保存订阅关系,需要使用它来给发布者响应
this.subscription = subscription;
// 向发布者请求一个资源
subscription.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println(">>>>>>>>>>>>>>>>>>TestJDKFlowProcessor onNext");
// 收到一个数据
System.out.println(">>>>>>>> 我收到了:" + item);
// 处理数据
if (Objects.isNull(item)) {
this.submit("你都不告诉我,你的年纪,是年纪一大把了吗");
} else if (item < 18) {
this.submit("小女子芳年二八,待字闺中人未识哦");
} else {
this.submit("英雄宝刀未老,老娘风韵犹存;来来来,喝完这杯,还有三杯");
}
// 还有数据要拿取,就再请求
subscription.request(1);
// 没有数据了,就可以不请求了
// subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出异常了
System.out.println(">>>>>>>>>>>TestJDKFlowProcessor onError");
}
@Override
public void onComplete() {
// 发布者关闭了
System.out.println(">>>>>>>>>>>TestJDKFlowProcessor onComplete");
// 关闭发布者
this.close();
}
}
处理器,实现了接口 Publisher 和 Subscriber,所以既可以发布,又可以订阅。代码如下:
小结
发布者的 submit 方法是一个阻塞方法,当缓存满了,就不会产生新的数据,当订阅者消费了再产生。这样就实现了保证一定水平运行。