layout: post title: Operator 并发原语: producers(五),ProducerArbiter tags:

  1. - Operator
  2. - Producer

原文 Operator concurrency primitives: producers (part 5){:target=”_blank”}

介绍

在编写操作符的时候,同时处理多个数据源以及 backpressure 并非易事。即便简单如在第一个 Observable 发射结束之后继续发射第二个 Observable 的数据这一功能,要正确处理好请求处理和传播,也是存在很大挑战的。

通常在 RxJava 中,尤其是 reactive-streams{:target=”_blank”},我们必须且只能为 Subscriber 设置一次 Producer。不要以为 Subscriber.setProducer() 具备线程安全性,它就能被随时改变,因为 Subscriber 不负责(我认为也不应该负责)处理请求,它只负责简单地记录请求数目,并在 Producer 被设置时把请求计数转发给 producer。如果你请求了很多数据,并且中途替换了 producer,那新的 producer 将会收到在它被设置时 subscriber 的成员 requested 记录的总请求量,而不是老的 producer 仍未处理的请求量。

在本文中,我将介绍一种能够帮助我们应对这一挑战的 producer。

producer-arbiter

假设我们需要编写这样一个操作符(注意我们并不是要 concatWith()):给定两个 Observable,它将观察第一个 Observable,当第一个 Observable 结束时它将开始观察第二个 Observable,当第二个 Observable 结束时,它才会结束。

  1. public final class ThenObserve<T> implements Operator<T, T> {
  2. final Observable<? extends T> other;
  3. public ThenObserve(Observable<? extends T> other) {
  4. this.other = other;
  5. }
  6. @Override
  7. public Subscriber<? super T> call(Subscriber<? super T> child) {
  8. Subscriber<T> parent = new Subscriber<T>(child, false) {
  9. @Override
  10. public void onNext(T t) {
  11. child.onNext(t);
  12. }
  13. @Override
  14. public void onError(Throwable e) {
  15. child.onError(e);
  16. }
  17. @Override
  18. public void onCompleted() {
  19. other.unsafeSubscribe(child);
  20. }
  21. };
  22. child.add(parent);
  23. return parent;
  24. }
  25. }
  26. Observable<Integer> source = Observable
  27. .range(1, 10)
  28. .lift(new ThenObserve<>(Observable.range(11, 90)));
  29. source.subscribe(System.out::println);
  30. System.out.println("---");
  31. TestSubscriber<Integer> ts = new TestSubscriber<>();
  32. ts.requestMore(20);
  33. source.subscribe(ts);
  34. ts.getOnNextEvents().forEach(System.out::println);

如果你运行上面的代码,第一次订阅 source 将会打印正确的内容:从 1 到 100。但第二次订阅,尽管我们只请求了 20 个,但它却打印了从 1 到 30!显然,第二个 range observable 并不是只发出剩下的 10 个数据,而是发出了 20 个数据。

因此我们需要记录下游请求的数据量,以及上游发出的数据量,一旦数据源发生变化,我们要让新数据源只发出剩下的数据量。为了实现这一功能,我们需要一个能够线程安全地管理请求和数据源切换的 producer。

让我们称之为 ProducerArbiter,其基本结构如下:

  1. public final class ProducerArbiter
  2. implements Producer {
  3. long requested; // (1)
  4. Producer currentProducer;
  5. boolean emitting; // (2)
  6. long missedRequested;
  7. long missedProduced;
  8. Producer missedProducer;
  9. static final Producer NULL_PRODUCER = n -> { }; // (3)
  10. @Override
  11. public void request(long n) {
  12. // to implement
  13. }
  14. public void produced(long n) {
  15. // to implement
  16. }
  17. public void set(Producer newProducer) {
  18. // to implement
  19. }
  20. public void emitLoop() {
  21. // to implement
  22. }
  23. }

ProducerArbiter 看起来并无特殊之处,目前看来,它有以下几个特性:

  1. 我们记录到目前为止总的请求数量,也记录当前的 producer。当前的 producer 可能为 null,这种情况下我们依然累计总请求数量,并在 producer 被设置时,一起发送给 producer。
  2. 我们将使用 发射者循环(emitter-loop){:target=”blank”}方式来实现串行访问,因为我们需要支持并发调用 request(),以及并发切换 producer。注意,这并不是保证从上游到下游的 onNext 事件串行发生,并发切换 producer 可能会导致来自多个上游 observable 的 onNext 事件同时发生。我将在本系列后续的文章中解决这一问题,不过幸运的是,在本文的 ThenObserve 中,这个问题并没有影响。_
  3. 有时我们需要清除对当前 producer 的引用,但我们用 missedProducernull 来代表当前没有设置 producer 的尝试。

有了这个基本结构之后,让我们逐个实现上面的函数:

  1. // ... same as before
  2. @Override
  3. public void request(long n) {
  4. if (n < 0) { // (1)
  5. throw new IllegalArgumentException();
  6. }
  7. if (n == 0) {
  8. return;
  9. }
  10. synchronized (this) {
  11. if (emitting) {
  12. missedRequested += n; // (2)
  13. return;
  14. }
  15. emitting = true;
  16. }
  17. boolean skipFinal = false;
  18. try {
  19. long r = requested;
  20. long u = r + n;
  21. if (u < 0) {
  22. u = Long.MAX_VALUE;
  23. }
  24. requested = u; // (3)
  25. Producer p = currentProducer;
  26. if (p != null) {
  27. p.request(n); // (4)
  28. }
  29. emitLoop(); // (5)
  30. skipFinal = true;
  31. } finally {
  32. if (!skipFinal) { // (6)
  33. synchronized (this) {
  34. emitting = false;
  35. }
  36. }
  37. }
  38. }

别怕,这个函数不是本文中最长的函数:

  1. 我们进行普通的请求数量检查。
  2. 我们把错过的请求(还未来得及处理的请求)单独记录在 missedRequested 中(而非记录在 missedProduced),因为如果把它们记录在一起,我们就无法区分被允许的请求“溢出”(当成是请求 Long.MAX_VALUE)和过度生产了。
  3. 我们也和其他 producer 一样更新 requested,并将其限制在 Long.MAX_VALUE 中。
  4. 如果当前已经设置了 producer,我们就请求 n 个(注意并不是 requested 个!),因为请求是在每个 producer 上进行累计的。
  5. request 可能被并发调用,所以我们进入发射者循环的循环阶段中。
  6. 如果 emitLoop() 正常返回,我们跳过 finally 语句块,否则我们需要清除 emitting 标记,使得后续的调用可以继续利用 ProducerArbiter(可能是不同的 producer,可以参见 Observable.retry())。

接下来是 produced 函数:

  1. // ... continued
  2. public void produced(long n) {
  3. if (n <= 0) { // (1)
  4. throw new IllegalArgumentException();
  5. }
  6. synchronized (this) {
  7. if (emitting) {
  8. missedProduced += n; // (2)
  9. return;
  10. }
  11. emitting = true;
  12. }
  13. boolean skipFinal = false;
  14. try {
  15. long r = requested;
  16. long u = r - n;
  17. if (u < 0) {
  18. throw new IllegalStateException(); // (3)
  19. }
  20. requested = u;
  21. emitLoop(); // (4)
  22. skipFinal = true;
  23. } finally {
  24. if (!skipFinal) {
  25. synchronized (this) {
  26. emitting = false;
  27. }
  28. }
  29. }
  30. }

结构和 request 比较类似:

  1. 如果已生产的数量非正,则说明出了 bug。
  2. 我们单独记录错过的已生产数量(请看 request() 中的解释)。
  3. 我们从 requested 中减去 n,并检查下溢。下溢意味着 bug 或者上游没有支持 backpressure。
  4. 我们尝试处理所有错过的事件。
  1. // ... continued
  2. public void set(Producer newProducer) {
  3. synchronized (this) {
  4. if (emitting) {
  5. missedProducer = newProducer == null ?
  6. NULL_PRODUCER : newProducer; // (1)
  7. return;
  8. }
  9. emitting = true;
  10. }
  11. boolean skipFinal = false;
  12. try {
  13. currentProducer = newProducer;
  14. if (newProducer != null) {
  15. newProducer.request(requested); // (2)
  16. }
  17. emitLoop(); // (3)
  18. skipFinal = true;
  19. } finally {
  20. if (!skipFinal) {
  21. synchronized (this) {
  22. emitting = false;
  23. }
  24. }
  25. }
  26. }

代码结构和行为也是很相似的:

  1. 当前的 producer 要么被更新,要么被清除。我们直接覆盖了之前的错过的 producer,因为反正它们也没有机会可以发出数据。
  2. 如果我们获得了新的 producer,我们直接请求 requested 个数据。这保证了新的 producer 将生产之前的 producer 未生产完毕的数量。注意,在 onNext 事件的同时切换 producer 可能造成多余请求:新的 producer 会收到 n 的请求,但老的 producer 可能会在停止之前发出一个数据,这样就多了一个数据,而且有可能导致 MissingBackpressureException。正如前文所述,在本文的情况下,这一问题不会发生,后面我将单独成文解决这一问题。
  3. 剩下的代码就和其他部分一样了:进入循环,处理错过的事件,并处理好可能的异常。

最后就是 emitLoop() 函数了:

  1. // ... continued
  2. public void emitLoop() {
  3. for (;;) {
  4. long localRequested;
  5. long localProduced;
  6. Producer localProducer;
  7. synchronized (this) {
  8. localRequested = missedRequested;
  9. localProduced = missedProduced;
  10. localProducer = missedProducer;
  11. if (localRequested == 0L
  12. && localProduced == 0L
  13. && localProducer == null) { // (1)
  14. emitting = false;
  15. return;
  16. }
  17. missedRequested = 0L;
  18. missedProduced = 0L;
  19. missedProducer = null; // (2)
  20. }
  21. long r = requested;
  22. if (r != Long.MAX_VALUE) { // (3)
  23. long u = r + localRequested;
  24. if (u < 0 || u == Long.MAX_VALUE) { // (4)
  25. r = Long.MAX_VALUE;
  26. requested = r;
  27. } else {
  28. long v = u - localProduced; // (5)
  29. if (v < 0) {
  30. throw new IllegalStateException();
  31. }
  32. r = v;
  33. requested = v;
  34. }
  35. }
  36. if (localProducer != null) { // (6)
  37. if (localProducer == NULL_PRODUCER) {
  38. currentProducer = null;
  39. } else {
  40. currentProducer = localProducer;
  41. localProducer.request(r); // (7)
  42. }
  43. } else {
  44. Producer p = currentProducer;
  45. if (p != null && localRequested != 0L) {
  46. p.request(localRequested); // (8)
  47. }
  48. }
  49. }
  50. }

emitLoop() 中我们需要处理多种状态和可能:

  1. 退出循环的条件是没有任何待处理的请求、已生产数量更新以及 producer 切换。
  2. 如果我们还需要继续循环,我们首先清除各个标记成员。
  3. 如果当前请求的数量是 Long.MAX_VALUE,那我们处理请求时就可以简化了,因为下游处于无限请求模式。
  4. 即便当前请求的数量不是无限,加上待处理的请求之后也可能是无限,这种情况下也可以简化处理。
  5. 如果仍不是无限请求模式,那我们就需要减去已生产的数量,更新计数的局部变量以及成员变量。
  6. 如果需要切换 producer,我们就更新 currentProducer(或者清除)。
  7. 如果新的 producer 没有被清除,我们就更新 currentProducer,并一次性请求截至当前所需要请求的数量。
  8. 如果不切换 producer,那我们只需要请求新增的待处理请求(如果 producer 不为 null 的话)。

实现了完整的 ProducerArbiter 之后,我们就可以修改我们的 ThenObserve 使之能正常工作了:

  1. public final class ThenObserve<T> implements Operator<T, T> {
  2. final Observable<? extends T> other;
  3. public ThenObserve(Observable<? extends T> other) {
  4. this.other = other;
  5. }
  6. @Override
  7. public Subscriber<? super T> call(
  8. Subscriber<? super T> child) {
  9. ProducerArbiter pa = new ProducerArbiter(); // (1)
  10. Subscriber<T> parent = new Subscriber<T>() {
  11. @Override
  12. public void onNext(T t) {
  13. child.onNext(t);
  14. pa.produced(1); // (2)
  15. }
  16. @Override
  17. public void onError(Throwable e) {
  18. pa.set(null); // (3)
  19. child.onError(e);
  20. }
  21. @Override
  22. public void onCompleted() {
  23. pa.set(null); // (4)
  24. Subscriber<T> parent2 = create2(pa, child); // (5)
  25. child.add(parent2);
  26. other.unsafeSubscribe(parent2); // (6)
  27. }
  28. @Override
  29. public void setProducer(Producer producer) {
  30. pa.set(producer); // (7)
  31. }
  32. };
  33. child.add(parent);
  34. child.setProducer(pa);
  35. return parent;
  36. }
  37. Subscriber<T> create2(ProducerArbiter pa,
  38. Subscriber<? super T> child) { // (8)
  39. return new Subscriber<T>() {
  40. @Override
  41. public void onNext(T t) {
  42. child.onNext(t);
  43. pa.produced(1);
  44. }
  45. @Override
  46. public void onError(Throwable e) {
  47. pa.set(null);
  48. child.onError(e);
  49. }
  50. @Override
  51. public void onCompleted() {
  52. pa.set(null);
  53. child.onCompleted();
  54. }
  55. @Override
  56. public void setProducer(Producer producer) {
  57. pa.set(producer);
  58. }
  59. };
  60. }
  61. }

这次扩展看起来不太优雅,怎么回事?

  1. 我们创建了一个 ProducerArbiter 实例。
  2. onNext 中,我们通知 ProducerArbiter 已经生产了一个数据。
  3. onError 中,我们释放掉 producer,以避免内存泄漏。
  4. 同样,在上游 onCompleted 事件到来时,我们也清除掉上游的 producer。
  5. Subscriber 不应该通常也不能被复用,所以我们需要为第二个数据源创建一个新的 Subscriber。而且如果复用,将会导致无限的重订阅,最终导致 StackOverflowError
  6. 把新的 Subscriber 加入到 child 中,这样就能正确取消订阅了。我们订阅第二个 Observable,并且结束第一个 subscriber。
  7. 我们捕获上游可能的 producer,并将其用在 ProducerArbiter 中。
  8. 为了方便,我把创建第二个 Subscriber 的代码移到了一个辅助函数中。它的结构看起来和第一个 Subscriber 很类似,除了 onCompleted 稍有不同,我们结束 child,而不是再订阅其他的 observable。

总结

利用像 ProducerArbiter 这样的 producer,我们就可以动态切换 producer了,并且能保证请求数量的正确处理,因此从多个上游数据源的 producer 中获取正确的数据了。

然而,正如我在具体的解释中警告的那样,这个 producer-arbiter 的运行和 onXXX 事件是独立的,如果事件的发射没有保证串行发生,那就有可能出现不可预测的行为。但在本文的例子中,不会发生这样的问题,因为我们知道当我们在第一个 Subscriber.onCompleted() 中切换 producer 时,第一个 observable 不会再发出任何 onNextonError 事件了,所以我们可以安全切换。

然而,在绝大多数诸如 switchOnNext() 高级操作符中,都存在并发的请求分发,所以如果想要它们能正常工作,我们需要一个更加高级的 producer-arbiter:producer-observer-arbiter。我将在本系列的下一篇文章中进行介绍。