layout: post title: Subjects(三,完结):支持 backpressure 的 PublishSubject tags:

  1. - Subject

原文 Subjects (part 3 - final){:target=”_blank”}

介绍

本文作为 Subject 系列最后一篇文章,将会实现一个 PublishSubject 的变体。为了让这个过程更有趣,这个 PublishSubject 会遵循 backpressure,当订阅者请求得没那么快时,不会将它“淹没”。

PublishSubject

PublishSubject 的主要类结构和上一篇文章中的 UnicastSubject 非常相似,所以我就直接跳过,重点关注在 State 类上。

State 的变化就会非常大了,因为 PublishSubject 允许同时存在多个 Subscriber,每个 Subscriber 都会有自己的 unsubscribedrequestedwip 状态。所以 State 不能直接实现 ProducerSubscription 了。我们会用另一个类实现它们:SubscriberState,每个 Subscriber 都会有一个 SubscriberState。

在讲 SubscriberState 的细节之前,还需要提一下 backpressure 处理策略。我们要支持用户指定 backpressure 策略,这样他们就不用再手动使用 onBackpressureXXX 了。为此,我们定义了 3 个 enum 值:

  1. enum BackpressureStrategy {
  2. DROP,
  3. BUFFER,
  4. ERROR
  5. }
  6. public static <T> PublishSubject<T> createWith(
  7. BackpressureStrategy strategy) {
  8. State<T> state = new State<>(strategy);
  9. return new PublishSubject<>(state);
  10. }

它们的名字就说明了功能:丢弃过多的数据,缓冲过多的数据,或者向 Subscriber 发出错误事件。

现在让我们看看 State 类的结构:

  1. static final class State<T>
  2. implements OnSubscribe<T>, Observer<T> {
  3. final BackpressureStrategy strategy;
  4. @SuppressWarnings("unchecked")
  5. volatile SubscriberState<T>[] subscribers = EMPTY;
  6. @SuppressWarnings("rawtypes")
  7. static final SubscriberState[] EMPTY = new SubscriberState[0];
  8. @SuppressWarnings("rawtypes")
  9. static final SubscriberState[] TERMINATED =
  10. new SubscriberState[0];
  11. volatile boolean done;
  12. Throwable error;
  13. public State(BackpressureStrategy strategy) {
  14. this.strategy = strategy;
  15. }
  16. boolean add(SubscriberState<T> subscriber) {
  17. // TODO Auto-generated method stub
  18. }
  19. void remove(SubscriberState<T> subscriber) {
  20. // TODO Auto-generated method stub
  21. }
  22. Subscriber<T>[] terminate() {
  23. // TODO Auto-generated method stub
  24. }
  25. @Override
  26. public void call(Subscriber<? super T> t) {
  27. // TODO Auto-generated method stub
  28. }
  29. @Override
  30. public void onNext(T t) {
  31. // TODO Auto-generated method stub
  32. }
  33. @Override
  34. public void onError(Throwable e) {
  35. // TODO Auto-generated method stub
  36. }
  37. @Override
  38. public void onCompleted() {
  39. // TODO Auto-generated method stub
  40. }
  41. }

现在还没有特殊的地方。我们用一个 volatile SubscriberState 数组保存所有订阅者的状态,addremoveterminate 方法进行操作。我们利用 EMPTY 常量,避免每次所有的 Subscriber 都取消之后都要分配一个新的空数组。这种方式看过此前 Subscription 容器相关文章{:target=”_blank”}的朋友应该会很熟悉。现在让我们看看 add() 的实现:

  1. boolean add(SubscriberState<T> subscriber) {
  2. synchronized (this) {
  3. SubscriberState<T>[] a = subscribers;
  4. if (a == TERMINATED) {
  5. return false;
  6. }
  7. int n = a.length;
  8. @SuppressWarnings("unchecked")
  9. SubscriberState<T>[] b = new SubscriberState[n + 1];
  10. System.arraycopy(a, 0, b, 0, n);
  11. b[n] = subscriber;
  12. subscribers = b;
  13. return true;
  14. }
  15. }

为了让我展现的实现方式多样化,这里我用了 synchronized 来进行同步,并没有使用 CAS 循环。上面的代码就是一个 copy-on-write 操作。这一实现方式的优点就是对数组进行遍历的时候会更快,而且基于一个经验事实,大部分 Subject 都不会同时有多个 Subscriber。但是如果我们真的遇见了会有大量 Subscriber 的场景,我们可以在同步代码块内使用基于 List 或者 Set 的容器。这里也有一个缺点,就是我们需要线程安全地对集合进行遍历,而这里唯一的线程安全方式就是进行一次深拷贝。

让我们接着看 remove() 的实现:

  1. @SuppressWarnings("unchecked")
  2. void remove(SubscriberState<T> subscriber) {
  3. synchronized (this) {
  4. SubscriberState<T>[] a = subscribers;
  5. if (a == TERMINATED || a == EMPTY) {
  6. return;
  7. }
  8. int n = a.length;
  9. int j = -1;
  10. for (int i = 0; i < n; i++) {
  11. if (a[i] == subscriber) {
  12. j = i;
  13. break;
  14. }
  15. }
  16. if (j < 0) {
  17. return;
  18. }
  19. SubscriberState<T>[] b;
  20. if (n == 1) {
  21. b = EMPTY;
  22. } else {
  23. b = new SubscriberState[n - 1];
  24. System.arraycopy(a, 0, b, 0, j);
  25. System.arraycopy(a, j + 1, b, j, n - j - 1);
  26. }
  27. subscribers = b;
  28. }
  29. }

同样也是 copy-on-write 的实现方式,也利用了 EMPTY 常量。

接下来我们看看 terminate()

  1. @SuppressWarnings("unchecked")
  2. SubscriberState<T>[] terminate() {
  3. synchronized (this) {
  4. SubscriberState<T>[] a = subscribers;
  5. if (a != TERMINATED) {
  6. subscribers = TERMINATED;
  7. }
  8. return a;
  9. }
  10. }

这里我们检查当前是否处于终结状态,如果不是,就把 subscribers 置为 TERMINATED,并且返回之前的值。

现在我们就可以实现 call() 了:

  1. @Override
  2. public void call(Subscriber<? super T> child) {
  3. SubscriberState<T> innerState =
  4. new SubscriberState<>(child, this); // (1)
  5. child.add(innerState); // (2)
  6. child.setProducer(innerState);
  7. if (add(innerState)) { // (3)
  8. if (strategy == BackpressureStrategy.BUFFER) { // (4)
  9. innerState.drain();
  10. } else if (innerState.unsubscribed) { // (5)
  11. remove(innerState);
  12. }
  13. } else {
  14. Throwable e = error; // (6)
  15. if (e != null) {
  16. child.onError(e);
  17. } else {
  18. child.onCompleted();
  19. }
  20. }
  21. }
  1. 我们创建一个 SubscriberState 把 child 包装起来,这样对每个 Subscriber 的事件分发处理就是独立的。
  2. 我们把 SubscriberState 加入到 child 中,用于取消订阅和请求处理。
  3. 我们把 innerState 加入到 subscribers 数组中,当然这一步可能失败,这就说明 Subject 自身已经被并发的终结了。
  4. 如果我们当前的 backpressure 策略是 BUFFER,那我们就要启动漏循环了。
  5. 即便 add() 成功,child 也有可能被并发的取消订阅了,这时我们就需要尝试把它移除掉。
  6. 如果(3)处的 add() 失败,就说明此时 Subject 已经终结了,那我们就需要向 child 发送终结事件(onError/onCompleted)。

实现 onXXX 就比较简单了,都是同样的套路:

  1. @Override
  2. public void onNext(T t) {
  3. if (done) {
  4. return;
  5. }
  6. for (SubscriberState<T> innerState : subscribers) {
  7. innerState.onNext(t);
  8. }
  9. }
  10. @Override
  11. public void onError(Throwable e) {
  12. if (done) {
  13. return;
  14. }
  15. error = e;
  16. done = true;
  17. for (SubscriberState<T> innerState : terminate()) {
  18. innerState.onError(e);
  19. }
  20. }
  21. @Override
  22. public void onCompleted() {
  23. if (done) {
  24. return;
  25. }
  26. done = true;
  27. for (SubscriberState<T> innerState : terminate()) {
  28. innerState.onCompleted();
  29. }
  30. }

只是简单地遍历当前的所有 Subscriber,向每个转发当前接收到的事件。

到目前为止,我们都只是把事件转发到另一个类(SubscriberState)中,现在是时候实现 SubscriberState,把事件发送给 child 了:

  1. static final class SubscriberState<T>
  2. implements Producer, Subscription, Observer<T> {
  3. final Subscriber<? super T> child; // (1)
  4. final State<T> state; // (2)
  5. final BackpressureStrategy strategy; // (3)
  6. final AtomicLong requested = new AtomicLong(); // (4)
  7. final AtomicInteger wip = new AtomicInteger(); // (5)
  8. volatile boolean unsubscribed; // (6)
  9. volatile boolean done;
  10. Throwable error;
  11. final Queue<T> queue; // (7)
  12. public SubscriberState(
  13. Subscriber<? super T> child, State<T> state) {
  14. this.child = child;
  15. this.state = state;
  16. this.strategy = state.strategy;
  17. Queue<T> q = null;
  18. if (strategy == BackpressureStrategy.BUFFER) { // (8)
  19. q = new SpscLinkedAtomicQueue<>();
  20. }
  21. this.queue = q;
  22. }
  23. @Override
  24. public void onNext(T t) {
  25. // TODO Auto-generated method stub
  26. }
  27. @Override
  28. public void onError(Throwable e) {
  29. // TODO Auto-generated method stub
  30. }
  31. @Override
  32. public void onCompleted() {
  33. // TODO Auto-generated method stub
  34. }
  35. @Override
  36. public void request(long n) {
  37. // TODO Auto-generated method stub
  38. }
  39. @Override
  40. public boolean isUnsubscribed() {
  41. return unsubscribed;
  42. }
  43. @Override
  44. public void unsubscribe() {
  45. // TODO Auto-generated method stub
  46. }
  47. void drain() {
  48. // TODO Auto-generated method stub
  49. }
  50. }
  1. 我们保持实际 Subscriber 的引用。
  2. 当 child 取消订阅时,我们需要从 State 中移除 SubscriberState 自己。
  3. 我们保存一个局部的 BackpressureStrategy,以避免每次都需要读外部类的成员。
  4. 记录 child 的请求量。
  5. 实现漏循环时需要一个 wip 变量。
  6. 我们需要记录 child 是否已经调用了 unsubscribe()
  7. 如果 backpressure 策略是 BUFFER,那我们就需要临时保存过多的数据。
  8. 最后,只有 backpressure 策略是 BUFFER 时,我们才会有一个队列实例。

接下来让我们一个一个实现上面的方法:

  1. @Override
  2. public void onNext(T t) {
  3. if (unsubscribed) {
  4. return;
  5. }
  6. switch (strategy) {
  7. case BUFFER:
  8. queue.offer(t); // (1)
  9. drain();
  10. break;
  11. case DROP: {
  12. long r = requested.get(); // (2)
  13. if (r != 0L) {
  14. child.onNext(t);
  15. if (r != Long.MAX_VALUE) {
  16. requested.decrementAndGet();
  17. }
  18. }
  19. break;
  20. }
  21. case ERROR: {
  22. long r = requested.get(); // (3)
  23. if (r != 0L) {
  24. child.onNext(t);
  25. if (r != Long.MAX_VALUE) {
  26. requested.decrementAndGet();
  27. }
  28. } else {
  29. unsubscribe();
  30. child.onError(
  31. new MissingBackpressureException());
  32. }
  33. break;
  34. }
  35. default:
  36. }
  37. }

这个方法看起来有点复杂,但其实仅仅只是因为它处理了所有的策略,事实上还是很简单的:

  1. 当处于 BUFFER 模式时,我们把数据加入到队列中,然后进入漏循环。
  2. 在 DROP 模式时,我们检查请求量,如果有请求,就发送数据,如果不是无尽模式,就递减请求量,如果没有请求量,那就直接丢弃数据。
  3. 在 ERROR 模式下,有请求的处理和 DROP 相同,没有请求时,我们就取消订阅,然后向 child 发出 MissingBackpressureException

接下来是 onError()onCompleted(),很直观:

  1. @Override
  2. public void onError(Throwable e) {
  3. if (unsubscribed) {
  4. return;
  5. }
  6. if (strategy == BackpressureStrategy.BUFFER) {
  7. error = e;
  8. done = true;
  9. drain();
  10. } else {
  11. child.onError(e);
  12. }
  13. }
  14. @Override
  15. public void onCompleted() {
  16. if (unsubscribed) {
  17. return;
  18. }
  19. if (strategy == BackpressureStrategy.BUFFER) {
  20. done = true;
  21. drain();
  22. } else {
  23. child.onCompleted();
  24. }
  25. }

剩下的三个方法,request()isUnsubscribed()unsubscribed() 看起来也应该很熟悉了:

  1. @Override
  2. public void request(long n) {
  3. if (n < 0) {
  4. throw new IllegalArgumentException();
  5. }
  6. if (n > 0) {
  7. BackpressureUtils.getAndAddRequest(requested, n);
  8. if (strategy == BackpressureStrategy.BUFFER) {
  9. drain();
  10. }
  11. }
  12. }
  13. @Override
  14. public boolean isUnsubscribed() {
  15. return unsubscribed;
  16. }
  17. @Override
  18. public void unsubscribe() {
  19. if (!unsubscribed) {
  20. unsubscribed = true;
  21. state.remove(this);
  22. if (strategy == BackpressureStrategy.BUFFER) {
  23. if (wip.getAndIncrement() == 0) {
  24. queue.clear();
  25. }
  26. }
  27. }
  28. }

取消订阅时,只有处于 BUFFER 模式时才需要进入漏循环,以及清空队列。

最后但也同样重要的,就是 drain() 了:

  1. void drain() {
  2. if (wip.getAndIncrement() != 0) {
  3. return;
  4. }
  5. int missed = 1;
  6. Queue<> q = queue;
  7. Subscriber child = this.child;
  8. for (;;) {
  9. if (checkTerminated(done, q.isEmpty(), child)) {
  10. return;
  11. }
  12. long r = requested.get();
  13. boolean unbounded = r == Long.MAX_VALUE;
  14. long e = 0L;
  15. while (r != 0) {
  16. boolean d = done;
  17. T v = q.poll();
  18. boolean empty = v == null;
  19. if (checkTerminated(d, empty, child)) {
  20. return;
  21. }
  22. if (empty) {
  23. break;
  24. }
  25. child.onNext(v);
  26. r--;
  27. e--;
  28. }
  29. if (e != 0) {
  30. if (!unbounded) {
  31. requested.addAndGet(e);
  32. }
  33. }
  34. missed = wip.addAndGet(-missed);
  35. if (missed == 0) {
  36. return;
  37. }
  38. }
  39. }

漏循环也和上文一模一样,毋庸赘言。

最后,checkTerminated() 还需要负责清理资源,让我们看看它的实现:

  1. boolean checkTerminated(boolean done,
  2. boolean empty,
  3. Subscriber<? super T> child) {
  4. if (unsubscribed) {
  5. queue.clear(); // (1)
  6. state.remove(this);
  7. return true;
  8. }
  9. if (done && empty) {
  10. unsubscribed = true; // (2)
  11. Throwable e = error;
  12. if (e != null) {
  13. child.onError(e);
  14. } else {
  15. child.onCompleted();
  16. }
  17. return true;
  18. }
  19. return false;
  20. }

如果检测到当前已被取消订阅,那我们就清空队列,并把 SubscriberState 从 State.subscribers 数组中移除(1)。但是到达终结或者空的状态时,我们不需要移除自己(2),因为此时 Subject 已经处于终结状态,State 也已经不包含任何 Subscriber 了。

关于 BehaviorSubject 的一点啰嗦

BehaviorSubject 的行为介于 PublishSubjectReplaySubject 之间,它在转发后续的事件之前,会重放此前最后一个 onNext() 事件。有人可能认为这可以通过容量为 1 的 ReplaySubject 实现,但实际上它们的终结状态行为不一样。容量为 1 的 ReplaySubject 会重放一个 onNext() 以及一个终结事件(onError/onCompleted),但 BehaviorSubject 不会重放 onNext(),只会发出终结事件。

从并发的角度来看,容量为 1 的 ReplaySubject 在处理并发 subscribe()onNext() 时会更加复杂。因为规范要求,只要订阅调用之后,我们就不能错过任何事件,我们只能保存最后一个事件,然后在发出其他新的事件之前,把它发出去。

在 RxJava 1.x 的 BehaviorSubject 实现中,使用的方式是每个 Subscriber 一个锁,并且两种情况进行了不同的处理:首个和后续的 onNext()。当订阅发生时,订阅线程会尝试进入“首个”模式,从 Subject 中读出最后一个数据,然后发送出去。如果这时有一个并发的 onNext() 调用,onNext() 暂时会被阻塞住。当“首个”模式结束之后,就会进入“后续”模式,此后 onNext() 事件就会被立即转发了。

  1. protected void emitNext(Object n, final NotificationLite<T> nl) {
  2. if (!fastPath) {
  3. synchronized (this) {
  4. first = false;
  5. if (emitting) {
  6. if (queue == null) {
  7. queue = new ArrayList<Object>();
  8. }
  9. queue.add(n);
  10. return;
  11. }
  12. }
  13. fastPath = true;
  14. }
  15. nl.accept(actual, n);
  16. }
  17. protected void emitFirst(Object n, final NotificationLite<T> nl) {
  18. synchronized (this) {
  19. if (!first || emitting) {
  20. return;
  21. }
  22. first = false;
  23. emitting = n != null;
  24. }
  25. if (n != null) {
  26. emitLoop(null, n, nl);
  27. }
  28. }

简单来说这是两个非对称的发射者循环:如果 emitNext() 赢得了竞争,那 emitFirst() 就不会运行了,那如果 subscribe()onNext() 同时发生,谁说这个 onNext() 不是这个 Subscriber 订阅之前的最后一个 onNext() 呢?

此外,这种方式仍存在一个微妙的 bug。emitFirst() 有可能会把同样的数据发射两次。

在极端情况下,onNext 设置了最后的数据,然后 emitFirst 读到了这个数据,然后 onNext 尝试执行 emitNext(),这时 emitNext 发现有线程正在发送,所以就把数据加入到了队列中。而最终,emitFirstemitLoop 中发现还有数据要发送,就会把数据取出来然后发送出去,这时数据就重复了。

解决办法比较复杂,大家可以在 RxJava 2.x 的 BehaviorSubject{:target=”_blank”} 中看到。简单来说就是我们要为每个数据加上一个版本标签,锁住 onNext() 一小段时间,在这期间丢弃掉老的数据。这种方式很明显有一个缺点,就是我们在执行过程中多了一个阻塞代码块,而理论上来说,任何并发的 subscribe 都有可能阻塞住发射者循环。当然我们也可以实现一个无锁化版本,但这就要求我们每次 onNext 时都要分配一个不可变的数据以及索引变量了。

总结

在本文中,我演示了如何实现支持三种 backpressure 策略的 PublishSubject。这是我关于 Subject 的最后一篇文章了。

如果你看了 RxJava 1.x 的源码,就会发现标准的 Subject 并没有按照这样的方式实现,但 RxJava 2.x 是这样做的。这并不是因为什么错误,而是 2.x 的实现方式是基于 1.x 的教训重新设计的。

在下一个系列中,我将利用我们这里讲到的 Subject 的内部细节,然后展示如何实现 ConnectableObservable