1. package jdk9;
    2. import java.util.concurrent.Flow.Subscriber;
    3. import java.util.concurrent.Flow.Subscription;
    4. import java.util.concurrent.SubmissionPublisher;
    5. import java.util.concurrent.TimeUnit;
    6. public class FlowDemo {
    7. public static void main(String[] args) throws Exception {
    8. // 1. 定义发布者, 发布的数据类型是 Integer
    9. // 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口
    10. SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
    11. // 2. 定义订阅者
    12. Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    13. private Subscription subscription;
    14. @Override
    15. public void onSubscribe(Subscription subscription) {
    16. // 保存订阅关系, 需要用它来给发布者响应
    17. this.subscription = subscription;
    18. // 请求一个数据
    19. this.subscription.request(1);
    20. }
    21. @Override
    22. public void onNext(Integer item) {
    23. // 接受到一个数据, 处理
    24. System.out.println("接受到数据: " + item);
    25. try {
    26. TimeUnit.SECONDS.sleep(3);
    27. } catch (InterruptedException e) {
    28. e.printStackTrace();
    29. }
    30. // 处理完调用request再请求一个数据
    31. this.subscription.request(1);
    32. // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
    33. // this.subscription.cancel();
    34. }
    35. @Override
    36. public void onError(Throwable throwable) {
    37. // 出现了异常(例如处理数据的时候产生了异常)
    38. throwable.printStackTrace();
    39. // 我们可以告诉发布者, 后面不接受数据了
    40. this.subscription.cancel();
    41. }
    42. @Override
    43. public void onComplete() {
    44. // 全部数据处理完了(发布者关闭了)
    45. System.out.println("处理完了!");
    46. }
    47. };
    48. // 3. 发布者和订阅者 建立订阅关系
    49. publiser.subscribe(subscriber);
    50. // 4. 生产数据, 并发布
    51. // 这里忽略数据生产过程
    52. for (int i = 0; i < 1000; i++) {
    53. System.out.println("生成数据:" + i);
    54. // submit是个block方法
    55. publiser.submit(i);
    56. }
    57. // 5. 结束后 关闭发布者
    58. // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
    59. publiser.close();
    60. // 主线程延迟停止, 否则数据没有消费就退出
    61. Thread.currentThread().join(1000);
    62. // debug的时候, 下面这行需要有断点
    63. // 否则主线程结束无法debug
    64. System.out.println();
    65. }
    66. }