layout: post title: Reactive-Streams API(二):SingleSubscription,SingleDelayedSubscription 和 RangeSubscription tags:

  1. - Reactive Stream

原文 The Reactive-Streams API (part 2){:target=”_blank”}

介绍

在本文中,我将把我们以前的 SingleProducerSingleDelayedProducer 移植到基于 reactive-streams 的 Subscription

首先,很多人可能认为这个转换过程很麻烦,但幸运的是,如果我们已经想清楚了在 rx.Producer 中如何实现 request(),那我们基本上就已经完成了 75% 了。剩下的 25% 来自于我们要把 rx.Subscriber.isUnsubscribed() 中的逻辑转移到 request() 中,因为 org.reactivestreams.Subscriber 中没有 isUnsubscribed()(其他资源管理类的接口都没有这个方法了)。

SingleSubscription

由于 SingleSubscription 本身并不复杂,这里我就直接一步到位:

  1. import org.reactivestreams.*;
  2. public final class SingleSubscription<T>
  3. extends AtomicBoolean implements Subscription {
  4. private static final long serialVersionUID = 1L;
  5. final T value; // (1)
  6. final Subscriber<? super T> child;
  7. volatile boolean cancelled; // (2)
  8. public SingleSubscription(T value,
  9. Subscriber<? super T> child) { // (3)
  10. this.value = Objects.requireNonNull(value);
  11. this.child = Objects.requireNonNull(child);
  12. }
  13. @Override
  14. public void request(long n) {
  15. if (n <= 0) {
  16. throw new IllegalArgumentException(
  17. "n > 0 required"); // (4)
  18. }
  19. if (compareAndSet(false, true)) {
  20. if (!cancelled) { // (5)
  21. child.onNext(value);
  22. if (!cancelled) {
  23. child.onComplete();
  24. }
  25. }
  26. }
  27. }
  28. @Override
  29. public void cancel() {
  30. cancelled = true; // (6)
  31. }
  32. }

就是这样!这里我向大家展示了把 Producer 的实现迁移到 reactive-streams Subscription 并不需要太多的工作。但这里还是有几点值得一提:

  1. 我们需要在成员变量中保存将要发出的值,以及下游 subscriber。
  2. 然而由于 RS 中没有了 isUnsubscribed(),而且取消订阅也变成了 cancel(),所以我们需要在一个 volatile 变量中记录是否已经取消订阅。如果你还记得的话,我说过我们无法预知 request()cancel() 的调用情况,所以我们需要保证它们的线程安全性。
  3. 由于 RS 不允许 null,我们在构造函数中就检查错误。
  4. 我的“Let them throw!”哲学告诉我们非正请求数量是编程错误,我们应该抛出 IllegalArgumentException
  5. 由于没有 child.isUnsubscribed() 函数了,我们只能检查我们的 cancelled 变量。
  6. 为了保证取消的幂等性,我们只是安全的更改 cancelled 变量。

SingleDelayedSubscription

SingleSubscription 都这么简单了,SingleDelayedSubscription 又能复杂到哪里去呢?

  1. public final class SingleDelayedSubscription<T>
  2. extends AtomicInteger implements Subscription {
  3. private static final long serialVersionUID = -1L;
  4. T value;
  5. final Subscriber<? super T> child;
  6. static final int CANCELLED = -1; // (1)
  7. static final int NO_VALUE_NO_REQUEST = 0;
  8. static final int NO_VALUE_HAS_REQUEST = 1;
  9. static final int HAS_VALUE_NO_REQUEST = 2;
  10. static final int HAS_VALUE_HAS_REQUEST = 3;
  11. public SingleDelayedSubscription(Subscriber<? super T> child) {
  12. this.child = Objects.requireNonNull(child);
  13. }
  14. @Override
  15. public void request(long n) {
  16. if (n <= 0) {
  17. throw new IllegalArgumentException("n > 0 required");
  18. }
  19. for (;;) {
  20. int s = get();
  21. if (s == NO_VALUE_HAS_REQUEST
  22. || s == HAS_VALUE_HAS_REQUEST
  23. || s == CANCELLED) { // (2)
  24. return;
  25. } else if (s == NO_VALUE_NO_REQUEST) {
  26. if (!compareAndSet(s, NO_VALUE_HAS_REQUEST)) {
  27. continue;
  28. }
  29. } else if (s == HAS_VALUE_NO_REQUEST) {
  30. if (compareAndSet(s, HAS_VALUE_HAS_REQUEST)) {
  31. T v = value;
  32. value = null;
  33. child.onNext(v);
  34. if (get() != CANCELLED) { // (3)
  35. child.onComplete();
  36. }
  37. }
  38. }
  39. return;
  40. }
  41. }
  42. public void setValue(T value) {
  43. Objects.requireNonNull(value);
  44. for (;;) {
  45. int s = get();
  46. if (s == HAS_VALUE_NO_REQUEST
  47. || s == HAS_VALUE_HAS_REQUEST
  48. || s == CANCELLED) { // (4)
  49. return;
  50. } else if (s == NO_VALUE_NO_REQUEST) {
  51. this.value = value;
  52. if (!compareAndSet(s, HAS_VALUE_NO_REQUEST)) {
  53. continue;
  54. }
  55. } else if (s == NO_VALUE_HAS_REQUEST) {
  56. if (compareAndSet(s, HAS_VALUE_HAS_REQUEST)) {
  57. child.onNext(value);
  58. if (get() != CANCELLED) { // (5)
  59. child.onComplete();
  60. }
  61. }
  62. }
  63. return;
  64. }
  65. }
  66. @Override
  67. public void cancel() {
  68. int state = get();
  69. if (state != CANCELLED) { // (6)
  70. state = getAndSet(CANCELLED);
  71. if (state != CANCELLED) {
  72. value = null;
  73. }
  74. }
  75. }
  76. }

看起来和原来的状态机非常类似,但是多了一个 CANCELLED 状态,我们无需在 onNext 之前检查状态不为 CANCELLED,因为 CAS 操作隐含了这一条件,但我们应该在 onComplete() 之前进行检查。

为什么我们不使用一个 volatile boolean 来记录是否取消呢?其实完全可以。这种选择仅仅是出于个人偏好:增加一个成员变量,或者是扩展一个新状态。我主要是想要展示一下后者怎么实现。

RangeSubscription

我并不打算在这里把以前所有的 Producer 都改写为 Subscription,但我这里还想展示一个包括取消状态的状态机例子:

  1. public final class RangeSubscription
  2. extends AtomicLong implements Subscription {
  3. private static final long serialVersionUID = 1L;
  4. final Subscriber<? super Integer> child;
  5. int index;
  6. final int max;
  7. static final long CANCELLED = Long.MIN_VALUE; // (1)
  8. public RangeSubscription(
  9. Subscriber<? super Integer> child,
  10. int start, int count) {
  11. this.child = Objects.requireNonNull(child);
  12. this.index = start;
  13. this.max = start + count;
  14. }
  15. @Override
  16. public void request(long n) {
  17. if (n <= 0) {
  18. throw new IllegalArgumentException(
  19. "n > required");
  20. }
  21. long r;
  22. for (;;) {
  23. r = get();
  24. if (r == CANCELLED) { // (2)
  25. return;
  26. }
  27. long u = r + n;
  28. if (u < 0) {
  29. u = Long.MAX_VALUE;
  30. }
  31. if (compareAndSet(r, u)) {
  32. break;
  33. }
  34. }
  35. if (r != 0L) { // (p1)
  36. return;
  37. }
  38. for (;;) {
  39. r = get();
  40. if (r == CANCELLED) { // (3)
  41. return;
  42. }
  43. int i = index;
  44. int m = max;
  45. long e = 0;
  46. while (r > 0L && i < m) { // (p2)
  47. child.onNext(i);
  48. if (get() == CANCELLED) { // (4)
  49. return;
  50. }
  51. i++;
  52. if (i == m) {
  53. child.onComplete();
  54. return;
  55. }
  56. r--;
  57. e++;
  58. }
  59. index = i;
  60. if (e != 0) {
  61. for (;;) {
  62. r = get();
  63. if (r == CANCELLED) { // (5)
  64. return;
  65. }
  66. long u = r - e;
  67. if (u < 0) {
  68. throw new IllegalStateException(
  69. "more produced than requested!");
  70. }
  71. if (compareAndSet(r, u)) {
  72. break;
  73. }
  74. }
  75. }
  76. if (r <= 0L) { // (p3)
  77. break;
  78. }
  79. }
  80. }
  81. @Override
  82. public void cancel() {
  83. if (get() != CANCELLED) { // (6)
  84. getAndSet(CANCELLED);
  85. }
  86. }
  87. }

为了简洁起见,我省略了快速路径的逻辑。剩下的部分和原来的 RangeProducer 类似,但是取消状态被合并进了计数状态中,我们几乎需要在所有的地方(1~5)重新读出计数并和 CANCELLED 对比。注意发射计数再也不能用 getAndAdd() 了,直接递增可能会覆盖掉 CANCELLED 状态。最后在取消时使用 getAndSet() 可以保证幂等性。

译者注:这一段代码还是很复杂的,即便我之前翻译过 RangeProducer 的实现,看起来依然需要一些思考,所以这里进行一些分析(对应于上面的 p 标号):

  1. 当我们成功更新请求计数之后,只有从 0 开始增加请求计数的线程可以进入后面的发射循环。发射过程会递减请求计数,当请求处理完毕之后(请求计数重新变为 0),下次的请求调用才有可能进入发射循环。
  2. 这里发射循环有两个条件,一是有未处理的请求(r > 0),而是发射数据没有超出范围(i < m),这两者很可能是不同的。此外,如果发射数据已经超出了范围,而且请求计数也递减为 0 了,那后续的请求仍然能通过(p1)的检查,但不会进入发射循环,因为通不过 i < m 的检查。
  3. 这里是请求处理完毕,但发射数据没有超出范围的退出路径。

总结

在本文中,我展示了两种把 rx.Producer 改写为 RS Subscription 的方式(boolean 成员或者新状态),它们都能保证取消行为的正确性。对它们进行取舍需要进行权衡:boolean 成员会增加对象大小,新状态会增加算法复杂性。

下一篇文章中,我将介绍如何处理 RS 中缺失的另一个 rx.Subscriber 特性:用 add(rx.Subscriber) 把资源和下游 subscriber 结合起来。