layout: post title: 全新 Completable API(二) tags:

  1. - Completable

原文 The new Completable API (part 2){:target=”_blank”}。

介绍

在本文中,我将展示如何实现 Completable 的操作符(包括数据源和转换操作符)。由于 Completable API 并不会发出数据,只有 onError 和 onCompleted 事件,所以它的操作符比 Observable 少得多。

Empty

我们首先实现 empty() 操作符,它会在被订阅时立即发出 onCompleted 事件。我们利用 CompletableOnSubscribe 来实现这一操作符:

  1. Completable empty = Completable.create(completableSubscriber -> {
  2. BooleanSubscription cancel = new BooleanSubscription(); // (1)
  3. completableSubscriber.onSubscribe(cancel); // (2)
  4. if (!cancel.isUnsubscribed()) {
  5. cancel.unsubscribe(); // (3)
  6. completableSubscriber.onCompleted(); // (4)
  7. }
  8. });

这里我们无需引入类型参数。Completable 的 API 设计遵循了 Reactive-Stream 规范,它通过调用 onSubscribe 传递一个 Subscription 给下游,使得下游可以取消上游。

我们需要先创建一个 Subscription 实例,这里我们用了一个 BooleanSubscription 用于检查下游是否已经取消订阅(1)。在发出终止事件之前,我们需要先调用 onSubscribe,并把 BooleanSubscription 对象传递给下游(2)。这一步是必须的,如果漏掉这一步,可能大多数操作符都会抛出 NullPointerException,或者是在转换为 Observable 时取消订阅将失效。

和 Reactive-Stream 规范中定义的一样,终止事件发生后,我们要把 Subscription 等同于已被取消订阅,所以我们在调用 onCompleted 之前自行取消订阅(3)。

上面的代码看起来对一个简单如 empty 这样的操作符未免有些复杂,我们可以稍作简化:

  1. Completable empty = Completable.create(completableSubscriber -> {
  2. completableSubscriber.onSubscribe(Subscriptions.unsubscribed());
  3. completableSubscriber.onCompleted();
  4. });

这里我们直接给 onSubscribe 传递了一个已经取消订阅的 Subscription,然后立即调用 onCompleted。这么做有以下几个原因,由于取消订阅本身就只是一个尽力而为的保证,那就意味着:a) 我们必须做好准备,以应对取消订阅之后到来的事件;b) onSubscribe 和 onCompleted 之间的时间窗口真的很小,所以对于绝大部分异步场景,isUnsubscribed 都会返回 false;c) Subscription 在 onCompleted 调用之前也应该被认为是已经取消了。

Empty delayed

假设我们需要在延迟一段时间之后发出 onCompleted 事件。为此我们需要一个 Scheduler.Worker 来进行延迟执行,但我们也得保证延迟执行的代码可以被取消。

  1. public static Completable emptyDelayed(
  2. long delay, TimeUnit unit, Scheduler scheduler) {
  3. return Completable.create(cs -> {
  4. Scheduler.Worker w = scheduler.createWorker();
  5. cs.onSubscribe(w);
  6. w.schedule(() -> {
  7. try {
  8. cs.onCompleted();
  9. } finally {
  10. w.unsubscribe();
  11. }
  12. }, delay, unit);
  13. });
  14. }

幸运的是,Scheduler.Worker 实现了 Subscription 接口,所以我们可以在调度延迟任务之前,直接把它传给 CompletableSubscriber。在调度的任务中,我们在调用 onCompleted 之后再取消订阅 worker,显然在调用 onCompleted 时 worker 并没有被取消订阅。首先即便 Reactive-Stream 规范规定,org.reactivestreams.Subscription 在终止事件到来时应该被认为已经取消订阅了,但现在我们之所以可以这样做,是因为我们无法检查 Subscription 是否已经被取消订阅。虽然在 RxJava 的实现中,我们可以检查是否已经取消订阅,但检查上游是否认为我们已经被取消订阅,并没有什么意义。其次之所以 onCompleted 和 unsubscribe 的顺序很重要,是因为如果先取消了 worker,我们在调用 onCompleted 时可能在下游被意外中断。

如果我们确实想要确保下游收到的 Subscription 已经被取消订阅,我们就得用 MultipleAssignmentSubscription 实现得没这么直观:

  1. Scheduler.Worker w = scheduler.createWorker();
  2. MultipleAssignmentSubscription mas =
  3. new MultipleAssignmentSubscription();
  4. mas.set(w);
  5. cs.onSubscribe(mas);
  6. w.schedule(() -> {
  7. mas.set(Subscriptions.unsubscribed());
  8. mas.unsubscribe();
  9. try {
  10. cs.onCompleted();
  11. } finally {
  12. w.unsubscribe();
  13. }
  14. }, delay, unit);

我们不直接把 worker 传给下游,而是用 MultipleAssignmentSubscription 包装一层,并在调用 onCompleted 之前把它的内容替换为一个已经被取消订阅的 subscription,然后取消订阅整个容器类。使用 MultipleAssignmentSubscription 容器,以及替换 MultipleAssignmentSubscription 的内容,都是为了防止过早取消订阅 worker。SerialSubscription 和 CompositeSubscription 都无法达成目标。

最后,调度 onCompleted 的执行时需要格外小心,尤其是使用 RxJava 2.x 的 scheduler 时。RxJava 2.x 的 scheduler 允许“直接调度”,也就是说我们无需创建 worker 并在使用完毕之后取消订阅 worker。如果我们只需要调度一次任务,那这种形式可以减小开销,当然这种形式不保证同一个 scheduler 上调度的任务执行的先后顺序(这一点在这里可以接收)。所以 emptyDelayed 也可以这样实现:

  1. cs.onSubscribe(
  2. scheduler.scheduleDirect(cs::onCompleted, delay, unit));

但是上面的代码存在竞争:有可能被调度的任务,以及任务中要发出的 onCompleted 事件,都会在 scheduleDirect 返回之前被执行(例如 delay 是 0),那这时 subscriber 在收到 onCompleted 之前就没有收到 onSubscribe 了。更坏的情况是 onError 和 onCompleted 可能同时发生,这违反了串行访问的原则。解决办法也是引入一个中间层:

  1. MultipleAssignmentDisposable mad =
  2. new MultipleAssignmentDisposable();
  3. cs.onSubscribe(mad);
  4. mad.set(
  5. scheduler.scheduleDirect(cs::onCompleted, delay, unit));

注:由于命名冲突,RxJava 2.x 的资源管理类叫 Disposable 而不是 Subscription,我会很快发布一个关于 2.x 的系列文章。

上面这个例子也预示了 Completable API 的一个特性:资源管理需要操作符自己负责,这里并没有像 rx.Subscriber 一样的 add(Subscription) 方法。这一点小小的不便需要我们在涉及到任务调度,或者涉及到多个资源管理时,要使用Subscription 容器类。这一设定的好处就是那些不需要资源管理的操作符,就不需要和 rx.Subscriber 一样相关的代码了,这样能节省内存分配,提高性能。

First completed

在我小学的时候,老师们经常发起一些小的挑战,同学们谁第一个完成就能得到一份小奖励。这和 amb() 很类似:第一个结束的数据源是赢家。当然,第一个发生错误的数据源也是赢家,在现实生活中失败显然不会让我们胜出。

那如何为 Completable 实现一个 amb 操作符?显然,不像 Observable.amb,我们不需要保存胜出 Completable 的信息(因为它已经终止了),所以我们不需要使用 trikery 的索引以及其他麻烦事,使用一个简单的 AtomicBoolean 就够了。

  1. public static Completable amb(Completable... students) {
  2. return Completable.create(principal -> {
  3. AtomicBoolean done = new AtomicBoolean(); // (1)
  4. CompositeSubscription all =
  5. new CompositeSubscription(); // (2)
  6. CompletableSubscriber teacher =
  7. new CompletableSubscriber() {
  8. @Override
  9. public void onSubscribe(Subscription s) {
  10. all.add(s); // (3)
  11. }
  12. @Override
  13. public void onCompleted() {
  14. if (done.compareAndSet(false, true)) { // (4)
  15. all.unsubscribe();
  16. principal.onCompleted();
  17. }
  18. }
  19. @Override
  20. public void onError(Throwable e) {
  21. if (done.compareAndSet(false, true)) { // (5)
  22. all.unsubscribe();
  23. principal.onError(e);
  24. }
  25. }
  26. };
  27. principal.onSubscribe(all); // (6)
  28. for (Completable student : students) {
  29. if (done.get() || all.isUnsubscribed()) { // (7)
  30. return;
  31. }
  32. student.subscribe(teacher); // (8)
  33. }
  34. });
  35. }

在前面小学挑战的例子中,只会有一个老师检查所有学生的进展。这是一个针对 Completable 很有趣的优化(Reactive-Stream 里面也有),而且技术上它是切实可行的:teacher 这个 CompletableSubscriber 是无状态的,它只负责转发 onXXX 事件,而不需要记住是谁发出的这个事件。这里我强调技术上,因为 Reactive-Stream 规范禁止用同一个 Subscriber 订阅到多个 Publisher 上,但这一点在我看来是太过严格了:库的编写者,他们很清楚地知道自己在做什么,应该允许他们这样做。毫无意外的是,我们只需要把 teacher 的创建移到(8)处即可,其他的代码都不需要变化;显然,这也说明了 Subscriber 是可以被复用的。尽管如此,让我们看看上面值得关注的几个地方:

  1. 我们创建了一个共享的 done 变量,它会在任何一个学生通知老师时(onCompleted 或者 onError)被设置为 true;
  2. 如果校长不喜欢这个挑战,他可以随时取消整个挑战;
  3. 老师需要在学生到来时进行注册;
  4. 并且确保第一个学生发出终止事件时,也通知校长(不幸的是,校长无法知道是哪个学生胜出)。此时其他的学生也就没必要继续了,所以我们取消整个挑战;
  5. 当然,如果发生了错误,处理方式也是一样的;
  6. 我们允许校长直接取消整个挑战;
  7. 我们把每挑战的材料交给每个学生(让老师订阅学生)。当然有可能我们还在订阅过程中挑战就已经结束了,那我们也就无需继续订阅了。

When all completed

继续我们上小学的例子,有时候学生的表现需要被集体评估,整个活动需要等到所有学生都完成评估后才能结束。如果发生了意外,我们可能需要终止整个活动,并且把受伤的学生一起送到救护车。

我希望这样的设定听起来很熟悉,如果不熟悉,它们其实分别对应着 merge 和 mergeDelayError 操作符,具体是哪个取决于学校的政策。

让我们先看看最简单的情况,我们知道学生的数量,而且任何错误都会终止整个活动:

  1. public static Completable merge(Completable... students) {
  2. return Completable.create(principal -> {
  3. AtomicInteger remaining =
  4. new AtomicInteger(students.length); // (1)
  5. CompositeSubscription all =
  6. new CompositeSubscription();
  7. CompletableSubscriber evaluator =
  8. new CompletableSubscriber() {
  9. @Override
  10. public void onSubscribe(Subscription s) {
  11. all.add(s);
  12. }
  13. @Override
  14. public void onCompleted() {
  15. if (remaining.decrementAndGet() == 0) { // (2)
  16. all.unsubscribe();
  17. principal.onCompleted();
  18. }
  19. }
  20. @Override
  21. public void onError(Throwable e) {
  22. if (remaining.getAndSet(0) > 0) { // (3)
  23. all.unsubscribe();
  24. principal.onError(e);
  25. }
  26. }
  27. };
  28. principal.onSubscribe(all);
  29. for (Completable student : students) {
  30. if (all.isUnsubscribed()
  31. || remaining.get() <= 0) { // (4)
  32. return;
  33. }
  34. student.subscribe(evaluator);
  35. }
  36. });
  37. }

上面的实现看起来和 amb() 比较类似,但也有以下几点不同:

  1. 我们需要原子地计数(递减)成功完成评估的学生数量;
  2. 一旦计数递减到 0,我们就通知校长评估完成;
  3. 如果有学生发出了错误,那我们就原子地把计数置为 0,如果此前计数不为 0,我们就取消掉其他所有学生,并向校长通报错误。注意错误的通报最多只会发生一次,因为如果有多个 onError 并发到来,只会有一个成功地把计数置为 0。任何后续的结束事件都会继续递减计数器;
  4. 如果发生了错误,或者活动被取消,我们就不再继续订阅了,以避免不必要的开销;

但如果我们不知道准确的学生数量,而且发生了错误之后,我们不希望结束整个活动,应该怎么做?这让我们的错误管理以及对学生的追踪变得复杂了一些:

  1. public static Completable mergeDelayError(
  2. Iterable<? extends Completable> students) {
  3. return Completable.create(principal -> {
  4. AtomicInteger wip = new AtomicInteger(1); // (1)
  5. CompositeSubscription all =
  6. new CompositeSubscription();
  7. Queue<Throwable> errors =
  8. new ConcurrentLinkedQueue<>(); // (2)
  9. CompletableSubscriber evaluator =
  10. new CompletableSubscriber() {
  11. @Override
  12. public void onSubscribe(Subscription s) {
  13. all.add(s);
  14. }
  15. @Override
  16. public void onCompleted() {
  17. if (wip.decrementAndGet() == 0) { // (3)
  18. if (errors.isEmpty()) {
  19. principal.onCompleted();
  20. } else {
  21. principal.onError(
  22. new CompositeException(errors));
  23. }
  24. }
  25. }
  26. @Override
  27. public void onError(Throwable e) {
  28. errors.offer(e); // (4)
  29. onCompleted();
  30. }
  31. };
  32. principal.onSubscribe(all);
  33. for (Completable student : students) {
  34. if (all.isUnsubscribed()) { // (5)
  35. return;
  36. }
  37. wip.getAndIncrement(); // (6)
  38. student.subscribe(evaluator);
  39. }
  40. evaluator.onCompleted(); // (7)
  41. });
  42. }

同样,结构看起来很类似,但不同的要求需要用不同的算法来实现:

  1. 首先我们的 wip 计数器初始值是 1。我们不知道会有多少个学生,但我们知道当 wip 变成 0 的时候,整个活动就结束了;
  2. 我们会把所有的异常都保存到一个并发的队列中;
  3. 终止条件在 evaluator 的 onCompleted 中检查:如果 wip 变成了 0,我们就检查异常队列是否为空,如果非空我们就把它们一起作为 CompositeException 发给校长;否则我们就告知校长活动成功结束;
  4. 由于异常并不会提前终止活动,所以发送异常的处理和正常结束一样,但在此之前我们需要把异常加入到队列中(注意,行为不当的 Completable 会导致出错,如果它发出了多个终止事件,就会导致提前终止);
  5. 在订阅的循环中,我们只能检查校长是否需要取消活动,wip 的值在这里无法使用,因为在循环过程中,它至少是 1(因为初始值是 1);
  6. 对每一个学生,我们先递增 wip,再订阅它;这保证了我们在循环过程中,wip 至少是 1;
  7. 最后我们手动调用一次 evaluator.onCompleted(),表示没有更多的 Completable 了,这使得 wip 可以被递减到 0 进而终止整个活动;

在 RxJava 1.x 的 Observable 体系中,很难出现代码如此紧凑,复用程度如此高的场景(甚至不可能出现)。

Transformative operators

我不认为能有多少 Completable 的“序列”是可以被变换的。大多数 Observable 的操作符都没有意义,所以在 Completable 的 API 中省略了。但还是有几个例子可以看看的。

首先我们可能想要忽略错误事件,由于这里不涉及到 onNext,所以错误发生时我们最好的选择就是自行调用 onCompleted:

  1. CompletableOperator onErrorComplete = cs -> {
  2. return new CompletableSubscriber() {
  3. @Override
  4. public void onSubscribe(Subscription s) {
  5. cs.onSubscribe(s);
  6. }
  7. @Override
  8. public void onCompleted() {
  9. cs.onCompleted();
  10. }
  11. @Override
  12. public void onError(Throwable e) {
  13. cs.onCompleted();
  14. }
  15. };
  16. };
  17. source.lift(onErrorComplete).subscribe();

当然,我们也可以在错误发生时订阅到另一个 Completable:

  1. public static Completable onErrorResumeNext(
  2. Completable first,
  3. Func1<Throwable, ? extends Completable> otherFactory) {
  4. return first.lift(cs -> new CompletableSubscriber() {
  5. final SerialSubscription serial =
  6. new SerialSubscription(); // (1)
  7. boolean once;
  8. @Override
  9. public void onSubscribe(Subscription s) {
  10. serial.set(s); // (2)
  11. }
  12. @Override
  13. public void onCompleted() {
  14. cs.onCompleted();
  15. }
  16. @Override
  17. public void onError(Throwable e) {
  18. if (!once) { // (3)
  19. once = true;
  20. otherFactory.call(e).subscribe(this); // (4)
  21. } else {
  22. cs.onError(e); // (5)
  23. }
  24. }
  25. });
  26. }

在上面的例子中,我们把 CompletableOperator lift 到传入的 Completable 上。在操作符主体中,我们返回一个具有以下行为的 CompletableSubscriber:

  1. 由于我们可能会切换上游,所以我们需要把第一个 Completable 传入的 Subscription 更换为其他 Completable 传入的 Subscription。这里我们也可以使用 MultipleAssignmentSubscription。这和 Observable 操作符中常见的 ProducerArbiter 机制差不多,尽管简单了不少。此外,我们会在后来的 Completable 中复用这个 serial 对象,当然,如果它也出错了,我们就不会再尝试了;
  2. 我们把 Subscription 设置到 SerialSubscription 中,把老的 Subscription 淘汰掉;
  3. 如果 first 发生了错误,我们需要确保订阅到新的 Completable 只会发生一次;
  4. 这里我们复用了 CompletableSubscriber 对象,节省了一次内存分配。(2)处的代码确保了取消链条被正确维护。这里为了简洁,我省略了工厂方法调用周围的 try-catch;如果发生了异常,我们可以把它和 onError 的异常合并为一个 CompositeException,并调用 cs.onError
  5. 如果第二个 Completable 也发生了异常,我们就把错误发往下游;

考虑一下,如果我们需要在 onCompleted 发生后执行类似的逻辑(例如 andThenendWithconcatWith),我们可以用类似的方式实现。只不过我们不是在 onError 中做事,而是在 onCompleted 中做事:

  1. // ...
  2. @Override
  3. public void onCompleted() {
  4. if (!once) { // (3)
  5. once = true;
  6. otherFactory.call(e).subscribe(this); // (4)
  7. } else {
  8. cs.onCompleted(); // (5)
  9. }
  10. }
  11. @Override
  12. public void onError(Throwable e) {
  13. cs.onError(e);
  14. }

最后,我们可能需要在第一个 Completable 超时后,切换到另一个上:

  1. public static Completable timeout(
  2. Completable first,
  3. long timeout, TimeUnit unit, Scheduler scheduler
  4. Completable other) {
  5. return first.lift(cs -> new CompletableSubscriber() {
  6. final CompositeSubscription csub =
  7. new CompositeSubscription(); // (1)
  8. final AtomicBoolean once = new AtomicBoolean(); // (2)
  9. @Override
  10. public void onSubscribe(Subscription s) {
  11. csub.add(s); // (3)
  12. Scheduler.Worker w = scheduler.createWorker();
  13. csub.add(w);
  14. cs.onSubscribe(csub);
  15. w.schedule(this::onTimeout, timeout, unit); // (4)
  16. }
  17. @Override
  18. public void onCompleted() {
  19. if (once.compareAndSet(false, true)) {
  20. csub.unsubscribe(); // (5)
  21. cs.onCompleted();
  22. }
  23. }
  24. @Override
  25. public void onError(Throwable e) {
  26. if (once.compareAndSet(false, true)) {
  27. csub.unsubscribe();
  28. cs.onError(e);
  29. }
  30. }
  31. void onTimeout() {
  32. if (once.compareAndSet(false, true)) { // (6)
  33. csub.clear();
  34. other.subscribe(new CompletableSubscriber() {
  35. @Override
  36. public void onSubscribe(Subscription s) {
  37. csub.add(s);
  38. }
  39. @Override
  40. public void onCompleted() {
  41. cs.onCompleted();
  42. }
  43. @Override
  44. public void onError(Throwable e) {
  45. cs.onError(e);
  46. }
  47. });
  48. }
  49. }
  50. });
  51. }

这个操作符就更复杂一些了:

  1. 我们需要追踪前后两个 Completable 的 Subscription,以及用于触发超时的 worker;
  2. 由于第一个 Completable 的终止事件和超时事件存在竞争,我们需要用一个原子变量来控制只有其中之一胜出(就像 amb() 那样);
  3. 当第一个上游的 Subscription 到来时,我们创建一个容器,并把 Subscription 加入到容器中,然后创建一个 worker,再把容器传递给下游;
  4. 最后,我们调度一次 onTimeout 的执行。这一设定让我们避免了 onTimeout 和 onSubscribe 之间的竞争,例如,如果 schedule 发生在 timeout() 被调用时,那超时就有可能在 Subscription 到来之前发生,那我们就需要做一些额外的事情以确保不出问题(这里不详细展开)。大多数时候,在 Reactive-Stream 兼容的 RxJava 2.x 中使用这种风格的 onSubscribe 实现,能够减少很多让我们头疼的事情;
  5. 如果第一个 Completable 的终止事件先来,我们就原子地、有条件地设置标记变量(这能阻止 onTimeout 执行其内部逻辑)。由于容器也管理着 worker,所以我们在取消了容器之后,就把终止事件发往下游;
  6. 如果 onTimeout 先发生,并且成功设置了标记变量,那我们就把容器清空掉,然后用一个全新的 CompletableSubscriber 订阅到第二个 Completable 中。这里我们之所以先清空容器,是为了容纳第二个 Completable 的 Subscription,而如果不清空,一个已经被取消的容器就不能使用了。清空容器的第二个好处是,它也会取消第一个 Completable,以及进行调度的 worker;

Hot Completable?

Hot Completable 仅仅是这里的最后一点思考,我并不确定它(或者一个 published Completable)在此之外是否有实际使用场景,但我们在这里还是看看如何实现它。由于 Completable 并不会发出数据,所以只需要实现一种 CompletableSubject 即可,它不会重放任何数据,只重放终止事件。

首先看看 CompletableSubject 的结构:

  1. public final class CompletableSubject
  2. extends Completable implements CompletableSubscriber {
  3. public static CompletableSubject create() {
  4. State state = new State();
  5. return new CompletableSubject(state);
  6. }
  7. static final class State
  8. implements CompletableOnSubscribe, CompletableSubscriber {
  9. // TODO state fields
  10. boolean add(CompletableSubscriber t) {
  11. // TODO implement
  12. }
  13. void remove(CompletableSubscriber t) {
  14. // TODO implement
  15. }
  16. @Override
  17. public void call(CompletableSubscriber t) {
  18. // TODO implement
  19. }
  20. @Override
  21. public void onSubscribe(Subscription d) {
  22. // TODO implement
  23. }
  24. @Override
  25. public void onCompleted() {
  26. // TODO implement
  27. }
  28. @Override
  29. public void onError(Throwable e) {
  30. // TODO implement
  31. }
  32. }
  33. static final class CompletableSubscription
  34. extends AtomicBoolean implements Subscription {
  35. /** */
  36. private static final long serialVersionUID =
  37. -3940816402954220866L;
  38. final CompletableSubscriber actual;
  39. final State state;
  40. public CompletableSubscription(
  41. CompletableSubscriber actual, State state) {
  42. this.actual = actual;
  43. this.state = state;
  44. }
  45. @Override
  46. public boolean isUnsubscribed() {
  47. return get();
  48. }
  49. @Override
  50. public void unsubscribe() {
  51. if (compareAndSet(false, true)) {
  52. state.remove(this);
  53. }
  54. }
  55. }
  56. final State state;
  57. private CompletableSubject(State state) {
  58. super(state);
  59. this.state = state;
  60. }
  61. @Override
  62. public void onSubscribe(Subscription d) {
  63. state.onSubscribe(d);
  64. }
  65. @Override
  66. public void onCompleted() {
  67. state.onCompleted();
  68. }
  69. @Override
  70. public void onError(Throwable e) {
  71. state.onError(e);
  72. }
  73. }

它的结构和以前的 Subject 别无二致。我们必须通过工厂方法创建 CompletableSubject,并且需要 CompletableSubscriber 的方法把事件委托到共享的 State 对象上。CompletableSubscription 用于追踪每个 CompletableSubscriber,并根据它们的标识管理取消订阅。

State 类需要记录一个终止标记,以及一个可能发生的 Throwable,加上当前的下游 CompletableSubscriber 数组:

  1. Throwable error;
  2. volatile CompletableSubscription[] subscribers = EMPTY;
  3. static final CompletableSubscription[] EMPTY =
  4. new CompletableSubscription[0];
  5. static final CompletableSubscription[] TERMINATED =
  6. new CompletableSubscription[0];
  7. boolean add(CompletableSubscription t) {
  8. if (subscribers == TERMINATED) {
  9. return false;
  10. }
  11. synchronized (this) {
  12. CompletableSubscription[] a = subscribers;
  13. if (a == TERMINATED) {
  14. return false;
  15. }
  16. CompletableSubscription[] b =
  17. new CompletableSubscription[a.length + 1];
  18. System.arraycopy(a, 0, b, 0, a.length);
  19. b[a.length] = t;
  20. subscribers = b;
  21. return true;
  22. }
  23. }
  24. void remove(CompletableSubscription t) {
  25. CompletableSubscription[] a = subscribers;
  26. if (a == EMPTY || a == TERMINATED) {
  27. return;
  28. }
  29. synchronized (this) {
  30. a = subscribers;
  31. if (a == EMPTY || a == TERMINATED) {
  32. return;
  33. }
  34. int j = -1;
  35. for (int i = 0; i < a.length; i++) {
  36. if (a[i] == t) {
  37. j = i;
  38. break;
  39. }
  40. }
  41. if (j < 0) {
  42. return;
  43. }
  44. if (a.length == 1) {
  45. subscribers = EMPTY;
  46. return;
  47. }
  48. CompletableSubscription[] b =
  49. new CompletableSubscription[a.length - 1];
  50. System.arraycopy(a, 0, b, 0, j);
  51. System.arraycopy(a, j + 1, b, j, a.length - j - 1);
  52. subscribers = b;
  53. }
  54. }

add 和 remove 的实现与其他 Subject 的实现一模一样,主要用于维护当前的下游 CompletableSubscriber 数组。

接下来让我们处理新来的 Subscriber:

  1. @Override
  2. public void call(CompletableSubscriber t) {
  3. CompletableSubscription cs =
  4. new CompletableSubscription(t, this);
  5. t.onSubscribe(cs);
  6. if (add(cs)) {
  7. if (cs.isUnsubscribed()) {
  8. remove(cs);
  9. }
  10. } else {
  11. Throwable e = error;
  12. if (e != null) {
  13. t.onError(e);
  14. } else {
  15. t.onCompleted();
  16. }
  17. }
  18. }

我们为每个 CompletableSubscriber 创建一个 CompletableSubscription,用来保存 subscriber 以及当前的状态:如果 child 调用了 unsubscribe,那它就可以把自己从 state 的 subscriber 数组中移除了。注意,这里存在 add 和取消订阅的竞争,这会导致 CompletableSubscriber 被意外保留。所以我们在 add 之后检查是否取消订阅,如果已经取消,我们就再调用 remove。如果 add 返回了 false,这说明 CompletableSubject 已经处于终止状态,读取 error 成员我们可以判断是否发生了异常,并确定向下游发送何种事件。

处理 onXXX 也不复杂:

  1. @Override
  2. public void onSubscribe(Subscription d) {
  3. if (subscribers == TERMINATED) {
  4. d.unsubscribe();
  5. }
  6. }
  7. @Override
  8. public void onCompleted() {
  9. CompletableSubscription[] a;
  10. synchronized (this) {
  11. a = subscribers;
  12. subscribers = TERMINATED;
  13. }
  14. for (CompletableSubscription cs : a) {
  15. cs.actual.onCompleted();
  16. }
  17. }
  18. @Override
  19. public void onError(Throwable e) {
  20. CompletableSubscription[] a;
  21. synchronized (this) {
  22. a = subscribers;
  23. error = e;
  24. subscribers = TERMINATED;
  25. }
  26. for (CompletableSubscription cs : a) {
  27. cs.actual.onError(e);
  28. }
  29. }

onSubscribe 的行为倒是值得商榷,这里我的做法是,如果 CompletableSubject 已经处于终止状态,就将其取消订阅。即便不这样,我们也不能做太多事,因为 CompletableSubject 可能订阅到很多 Completable,任何一个都可能让其进入到终止状态。我们也可以忽略传入的 Subscription,或者把它们都保存到一个容器中。

onError 和 onCompleted 的处理基本一致,我们原子地把 subscriber 数组设置为终止标记,然后向此前的每个 subscriber 发出相应的终止事件。注意,在 onError 中,我们先设置 error 成员,再设置 subscribers 成员,这让我们可以在前面的 call() 函数中得到恰当的可见性。

总结

在本文中,我详细展示了如何实现 Completable 操作符,既包括数据源操作符,也包括转换型操作符,甚至还实现了一个 Subject。

Completable 更像是一个类型工具,它不会发出任何事件,只是表示有“副作用”的代码执行是否成功。简化了的 API 用起来可能比 Observable 更加方便。

实现 Completable 操作符比实现支持 backpressure 的 Observable 操作符更简单,但我们还是需要考虑取消订阅链条、避免 onXXX 的竞争,以及利用好 AtomicXXX 类型以更高效地实现状态管理。

既然我们已经有了更多编写 Subject 的经验,下一个系列让我们看看 ConnectableObservable。