jdk9 flow api(reactive stream)

publisher、subscription、processor
它与函数式编程是两回事,各是各的概念。反应式编程是基于发布订阅,存在回压(背压),来感知客户端、服务端的压力(订阅者与发布者的互动)。
flow 是jdk9 对 reactive stream 的官方实现,spring5中webflux是另一种实现。
jdk9 中flow api:
image.png

demo,测试发布订阅

  1. package com.demo;
  2. import java.util.concurrent.Flow.Subscriber;
  3. import java.util.concurrent.Flow.Subscription;
  4. import java.util.concurrent.SubmissionPublisher;
  5. import java.util.concurrent.TimeUnit;
  6. /**
  7. * 测试jdk9 flow (reactive
  8. */
  9. public class TestJdkFlow {
  10. public static void main(String[] args) throws InterruptedException {
  11. // 1. 定义发布者
  12. // 直接使用jdk自带的 submissionPublisher,它实现了 publisher 接口
  13. SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
  14. // 2. 定义发布者
  15. Subscriber<String> subscriber = new Subscriber<>() {
  16. private Subscription subscription;
  17. @Override
  18. public void onSubscribe(Subscription subscription) {
  19. System.out.println(">>>>>>>>>>> onSubscribe");
  20. // 保存订阅关系,需要使用它来给发布者响应
  21. this.subscription = subscription;
  22. // 请求一个数据
  23. subscription.request(1);
  24. }
  25. @Override
  26. public void onNext(String item) {
  27. // 接收到一个数据,处理
  28. System.out.println(">>>>>>>>>>> onNext:" + item);
  29. // 处理完事,告诉发布者我还要请求数据
  30. subscription.request(1);
  31. // 如果已经拿到需要的数据,可以调用cancel告诉publisher发布者,自己不再需要数据了
  32. // subscription.cancel();
  33. }
  34. @Override
  35. public void onError(Throwable throwable) {
  36. // 出现异常,处理
  37. System.out.println(">>>>>>>>>>> onError");
  38. // 可以告诉发布者,自己不需要接收收据了
  39. subscription.cancel();
  40. }
  41. @Override
  42. public void onComplete() {
  43. // 全部数据处理完成了(发布者关闭了)
  44. System.out.println(">>>>>>>>>>> onComplete");
  45. }
  46. };
  47. // 3. 发布者与订阅者建立订阅关系
  48. publisher.subscribe(subscriber);
  49. // 4. 生产数据,并发布数据
  50. // 模拟数据
  51. String str = "小辉又犯傻了。。。。";
  52. publisher.submit(str);
  53. publisher.submit(str+"...");
  54. // 5. 结束后,关闭发布者
  55. publisher.close();
  56. // Thread.currentThread().join(100000);
  57. // 线程保留,等待执行完成
  58. TimeUnit.SECONDS.sleep(10);
  59. }
  60. }

demo,测试发布者、处理器、订阅者式的发布订阅

  1. package com.demo;
  2. import java.util.Objects;
  3. import java.util.concurrent.Flow.Processor;
  4. import java.util.concurrent.Flow.Subscriber;
  5. import java.util.concurrent.Flow.Subscription;
  6. import java.util.concurrent.SubmissionPublisher;
  7. import java.util.concurrent.TimeUnit;
  8. /**
  9. * 使用处理器,测试反应式编程:
  10. * 相当于 发布者->数据处理器->订阅者
  11. */
  12. public class TestJDKFlowProcessorDemo {
  13. public static void main(String[] args) throws InterruptedException {
  14. // 1. 发布者
  15. SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
  16. // 2. 定义处理器,对数据进行过滤,并转换为 String 类型
  17. TestJDKFlowProcessor processor = new TestJDKFlowProcessor();
  18. // 3. 发布者和处理器建立订阅关系
  19. publisher.subscribe(processor);
  20. // 4. 定义最终的订阅者,消费String数据
  21. Subscriber<String> subscriber = new Subscriber<>() {
  22. private Subscription subscription;
  23. @Override
  24. public void onSubscribe(Subscription subscription) {
  25. System.out.println(">>>>>>>>>>>subscriber onSubscribe");
  26. // 保存订阅关系,需要使用它来给发布者响应
  27. this.subscription = subscription;
  28. // 请求一个数据
  29. subscription.request(1);
  30. }
  31. @Override
  32. public void onNext(String item) {
  33. // 接收到一个数据,处理
  34. System.out.println(">>>>>>>>>>>subscriber onNext:" + item);
  35. // 处理完事,告诉发布者我还要请求数据
  36. subscription.request(1);
  37. // 如果已经拿到需要的数据,可以调用cancel告诉publisher发布者,自己不再需要数据了
  38. // subscription.cancel();
  39. }
  40. @Override
  41. public void onError(Throwable throwable) {
  42. // 出现异常,处理
  43. System.out.println(">>>>>>>>>>>subscriber onError");
  44. // 可以告诉发布者,自己不需要接收收据了
  45. subscription.cancel();
  46. }
  47. @Override
  48. public void onComplete() {
  49. // 全部数据处理完成了(发布者关闭了)
  50. System.out.println(">>>>>>>>>>>subscriber onComplete");
  51. }
  52. };
  53. // 5. 处理器和最终订阅者建立订阅关系
  54. processor.subscribe(subscriber);
  55. // 6. 生产数据,并发布
  56. publisher.submit(30);
  57. publisher.submit(32);
  58. publisher.submit(35);
  59. publisher.submit(null); // java.lang.NullPointerException,不可发布空对象数据
  60. // 7. 关闭发布者
  61. publisher.close();
  62. TimeUnit.SECONDS.sleep(10);
  63. }
  64. }
  65. /**
  66. * Processor 需要继承SubmissionPublisher,并实现 Processor
  67. * 数据处理(相当于一个数据中转站)
  68. *
  69. * 输入源数据 Integer,过滤掉小于0的,然后转换为String发布出去
  70. */
  71. class TestJDKFlowProcessor extends SubmissionPublisher<String> implements Processor<Integer, String> {
  72. private Subscription subscription;
  73. @Override
  74. public void onSubscribe(Subscription subscription) {
  75. System.out.println(">>>>>>>>>>>TestJDKFlowProcessor onSubscribe");
  76. // 保存订阅关系,需要使用它来给发布者响应
  77. this.subscription = subscription;
  78. // 向发布者请求一个资源
  79. subscription.request(1);
  80. }
  81. @Override
  82. public void onNext(Integer item) {
  83. System.out.println(">>>>>>>>>>>>>>>>>>TestJDKFlowProcessor onNext");
  84. // 收到一个数据
  85. System.out.println(">>>>>>>> 我收到了:" + item);
  86. // 处理数据
  87. if (Objects.isNull(item)) {
  88. this.submit("你都不告诉我,你的年纪,是年纪一大把了吗");
  89. } else if (item < 18) {
  90. this.submit("小女子芳年二八,待字闺中人未识哦");
  91. } else {
  92. this.submit("英雄宝刀未老,老娘风韵犹存;来来来,喝完这杯,还有三杯");
  93. }
  94. // 还有数据要拿取,就再请求
  95. subscription.request(1);
  96. // 没有数据了,就可以不请求了
  97. // subscription.cancel();
  98. }
  99. @Override
  100. public void onError(Throwable throwable) {
  101. // 出异常了
  102. System.out.println(">>>>>>>>>>>TestJDKFlowProcessor onError");
  103. }
  104. @Override
  105. public void onComplete() {
  106. // 发布者关闭了
  107. System.out.println(">>>>>>>>>>>TestJDKFlowProcessor onComplete");
  108. // 关闭发布者
  109. this.close();
  110. }
  111. }

处理器,实现了接口 Publisher 和 Subscriber,所以既可以发布,又可以订阅。代码如下:
image.png

小结

发布者的 submit 方法是一个阻塞方法,当缓存满了,就不会产生新的数据,当订阅者消费了再产生。这样就实现了保证一定水平运行。