1. package jdk9;
    2. import java.util.concurrent.Flow.Processor;
    3. import java.util.concurrent.Flow.Subscriber;
    4. import java.util.concurrent.Flow.Subscription;
    5. import java.util.concurrent.SubmissionPublisher;
    6. /**
    7. * 带 process 的 flow demo
    8. */
    9. /**
    10. * Processor, 需要继承SubmissionPublisher并实现Processor接口
    11. *
    12. * 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
    13. */
    14. class MyProcessor extends SubmissionPublisher<String>
    15. implements Processor<Integer, String> {
    16. private Subscription subscription;
    17. @Override
    18. public void onSubscribe(Subscription subscription) {
    19. // 保存订阅关系, 需要用它来给发布者响应
    20. this.subscription = subscription;
    21. // 请求一个数据
    22. this.subscription.request(1);
    23. }
    24. @Override
    25. public void onNext(Integer item) {
    26. // 接受到一个数据, 处理
    27. System.out.println("处理器接受到数据: " + item);
    28. // 过滤掉小于0的, 然后发布出去
    29. if (item > 0) {
    30. this.submit("转换后的数据:" + item);
    31. }
    32. // 处理完调用request再请求一个数据
    33. this.subscription.request(1);
    34. // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
    35. // this.subscription.cancel();
    36. }
    37. @Override
    38. public void onError(Throwable throwable) {
    39. // 出现了异常(例如处理数据的时候产生了异常)
    40. throwable.printStackTrace();
    41. // 我们可以告诉发布者, 后面不接受数据了
    42. this.subscription.cancel();
    43. }
    44. @Override
    45. public void onComplete() {
    46. // 全部数据处理完了(发布者关闭了)
    47. System.out.println("处理器处理完了!");
    48. // 关闭发布者
    49. this.close();
    50. }
    51. }
    52. public class FlowDemo2 {
    53. public static void main(String[] args) throws Exception {
    54. // 1. 定义发布者, 发布的数据类型是 Integer
    55. // 直接使用jdk自带的SubmissionPublisher
    56. SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
    57. // 2. 定义处理器, 对数据进行过滤, 并转换为String类型
    58. MyProcessor processor = new MyProcessor();
    59. // 3. 发布者 和 处理器 建立订阅关系
    60. publiser.subscribe(processor);
    61. // 4. 定义最终订阅者, 消费 String 类型数据
    62. Subscriber<String> subscriber = new Subscriber<String>() {
    63. private Subscription subscription;
    64. @Override
    65. public void onSubscribe(Subscription subscription) {
    66. // 保存订阅关系, 需要用它来给发布者响应
    67. this.subscription = subscription;
    68. // 请求一个数据
    69. this.subscription.request(1);
    70. }
    71. @Override
    72. public void onNext(String item) {
    73. // 接受到一个数据, 处理
    74. System.out.println("接受到数据: " + item);
    75. // 处理完调用request再请求一个数据
    76. this.subscription.request(1);
    77. // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
    78. // this.subscription.cancel();
    79. }
    80. @Override
    81. public void onError(Throwable throwable) {
    82. // 出现了异常(例如处理数据的时候产生了异常)
    83. throwable.printStackTrace();
    84. // 我们可以告诉发布者, 后面不接受数据了
    85. this.subscription.cancel();
    86. }
    87. @Override
    88. public void onComplete() {
    89. // 全部数据处理完了(发布者关闭了)
    90. System.out.println("处理完了!");
    91. }
    92. };
    93. // 5. 处理器 和 最终订阅者 建立订阅关系
    94. processor.subscribe(subscriber);
    95. // 6. 生产数据, 并发布
    96. // 这里忽略数据生产过程
    97. publiser.submit(-111);
    98. publiser.submit(111);
    99. // 7. 结束后 关闭发布者
    100. // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
    101. publiser.close();
    102. // 主线程延迟停止, 否则数据没有消费就退出
    103. Thread.currentThread().join(1000);
    104. }
    105. }