layout: post title: 实现操作符时的一些陷阱(三) tags:

  1. - Operator
  2. - Pitfall

原文 Pitfalls of operator implementations (part 3){:target=”_blank”}

介绍

在本系列中,我们聚焦于一些常见的,或者比较常见而且很微妙的一些实现操作符的陷阱。既然我们已经知道了更多关于 Producer,subscription 容器类,以及 scheduler 的知识,现在我们可以看更多的陷阱了。

9,订阅两次

有些操作符,尤其是那些基于 OnSubscribe 的操作符,可能会把它们收到的 Subscriber 订阅到其他的 Observable 上。defer() 就是这样一个例子。

假设你要实现一个操作符,它在把收到的 Subscriber 订阅到另一个 Observable 上之前,会调用一个回调:

  1. public final class OnSubscribeRunAction<T>
  2. implements OnSubscribe<T> {
  3. final Observable actual;
  4. final Action0 action;
  5. public OnSubscribeRunAction(Observable actual, Action0 action) {
  6. this.action = action;
  7. this.actual = actual;
  8. }
  9. @Override
  10. public void call(Subscriber child) {
  11. try {
  12. action.call();
  13. } catch (Throwable e) {
  14. child.onError(e);
  15. return;
  16. }
  17. actual.unsafeSubscribe(child);
  18. }
  19. }
  20. Observable<Integer> source = Observable.create(
  21. new OnSubscribeRunAction<>(Observable.range(1, 3),
  22. () -> {
  23. System.out.println("Subscribing!");
  24. }));
  25. TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
  26. @Override
  27. public void onStart() {
  28. Thread t = new Thread(() -> {
  29. System.out.println("Starting helper thread "
  30. + Thread.currentThread());
  31. });
  32. t.start();
  33. }
  34. };
  35. source.unsafeSubscribe(ts);

如果运行上面的例子,我们会发现 onStart 被调用了两次!问题就出在 RxJava 的 backpressure 相关内容的设计逻辑上:child 被订阅时,它的 onStart() 就会被调用,让它有机会在 onNext 之前执行它的逻辑。通常来说,这时会发出初始的请求量,或者在 subscriber 开始时显示一些 GUI 的内容。

在普通的“终点 subscriber”(end-subscribers)中,这个问题很少表现出来,因为 subscribe() 会把它们包装为一个 SafeSubscriber,而后者不会转发 onStart。但是当和其他操作符打交道时,unsafeSubscribe 是很常见的,因此 onStart 就会被执行多次。

解决办法就是在操作符中把 child 包装在一个不会转发 onStart 的 subscriber 中:

  1. // ...
  2. try {
  3. action.call();
  4. } catch (Throwable e) {
  5. child.onError(e);
  6. return;
  7. }
  8. actual.unsafeSubscribe(new Subscriber<T>(child) {
  9. @Override
  10. public void onNext(T t) {
  11. child.onNext(t);
  12. }
  13. @Override
  14. public void onError(Throwable e) {
  15. child.onError(e);
  16. }
  17. @Override
  18. public void onCompleted() {
  19. child.onCompleted();
  20. }
  21. });
  22. // ...

10,scheduler worker 泄漏

假设我们在被订阅之后,不是立即执行相关的代码,而是延迟一段时间执行。它的一个使用场景是,如果任务没有按时结束,我们可以做一些优化体验的处理(例如显示一个正在处理的对话框)。

  1. public final class OnSubscribeRunActionDelayed<T>
  2. implements OnSubscribe<T> {
  3. final Observable actual;
  4. final Action0 action;
  5. final long delay;
  6. final TimeUnit unit;
  7. final Scheduler scheduler;
  8. public OnSubscribeRunActionDelayed(Observable actual,
  9. Action0 action, long delay,
  10. TimeUnit unit, Scheduler scheduler) {
  11. this.action = action;
  12. this.actual = actual;
  13. this.delay = delay;
  14. this.unit = unit;
  15. this.scheduler = scheduler;
  16. }
  17. @Override
  18. public void call(Subscriber<? super T> child) {
  19. SerializedSubscriber<T> s =
  20. new SerializedSubscriber<>(child);
  21. Worker w = scheduler.createWorker(); // (1)
  22. Subscription cancel = w.schedule(() -> {
  23. try {
  24. action.call();
  25. } catch (Throwable e) {
  26. s.onError(e);
  27. }
  28. }, delay, unit);
  29. actual
  30. .doOnCompleted(cancel::unsubscribe)
  31. .unsafeSubscribe(s);
  32. }
  33. }
  34. Observable<Integer> source = Observable.create(
  35. new OnSubscribeRunActionDelayed<>(Observable
  36. .just(1).delay(1, TimeUnit.SECONDS),
  37. () -> {
  38. System.out.println("Sorry, it takes too long...");
  39. }, 500, TimeUnit.MILLISECONDS, Schedulers.io()));
  40. Subscription s = source.subscribe(System.out::println);
  41. Thread.sleep(250);
  42. s.unsubscribe();
  43. Thread.sleep(1000);
  44. source.subscribe(System.out::println);
  45. Thread.sleep(1500);
  46. for (Thread t : Thread.getAllStackTraces().keySet()) {
  47. if (t.getName().startsWith("RxCached")) {
  48. System.out.println(t);
  49. }
  50. }
  51. }

同样,上面代码的执行结果也不符合我们的预期:即便我们取消订阅了第一次 subscription,“Sorry” 还是被打印了出来,而如果我们在最后查看所有的线程,我们会看到两个 RxCachedThreadScheduler,但是由于我们进行了复用,显然只应该有一个。

问题就出在 worker 和 schedule 的返回值没有正确参与到取消订阅中:即便我们需要实际订阅的 Observable 很快执行,但取消订阅时,只有 Observable 被取消订阅了,worker 并没有,所以它的线程永远也不会被重新放入线程池中。

这个问题比较微妙,因为 Schedulers.computation()Schedulers.trampoline() 对 scheduler 的泄漏并不敏感:前者是在一个固定大小的 worker 池中进行调度,后者不会在线程中保持任何资源,所以资源能被正确垃圾回收。但 Schedulers.io()Schedulers.from()newThread() 正好相反,在 worker 被取消订阅之前,线程都不会被复用或者关闭。

解决办法就是把 worker 和调度结果作为资源添加到 child 中,使得 child 被取消订阅时,能取消订阅所有的资源,但是,由于我们只会在这个 worker 上调度一个任务,所以取消订阅 worker 就会取消“所有”正在执行或者等待执行的任务,所以我们就没必要把调度结果也添加到 child 中,只需要添加 worker 就足够了。

  1. // ...
  2. SerializedSubscriber<T> s = new SerializedSubscriber<>(child);
  3. Worker w = scheduler.createWorker();
  4. child.add(w);
  5. w.schedule(() -> {
  6. try {
  7. action.call();
  8. } catch (Throwable e) {
  9. s.onError(e);
  10. }
  11. }, delay, unit);
  12. actual
  13. .doOnCompleted(w::unsubscribe)
  14. .unsafeSubscribe(s);
  15. // ...

把 worker 添加到了 subscriber 中

假设我们需要一个这样的操作符,它在收到一个数据时,会发出一个 Observable,这个 Observable 会延迟一定时间发出这个数据之后立即结束。

  1. public final class ValueDelayer<T>
  2. implements Operator<Observable<T>, T> {
  3. final Scheduler scheduler;
  4. final long delay;
  5. final TimeUnit unit;
  6. public ValueDelayer(long delay,
  7. TimeUnit unit, Scheduler scheduler) {
  8. this.delay = delay;
  9. this.unit = unit;
  10. this.scheduler = scheduler;
  11. }
  12. @Override
  13. public Subscriber<? super T> call(
  14. Subscriber<? super Observable<T>> child) {
  15. Worker w = scheduler.createWorker();
  16. child.add(w);
  17. Subscriber<T> parent = new Subscriber<T>(child, false) {
  18. @Override
  19. public void onNext(T t) {
  20. BufferUntilSubscriber<T> bus =
  21. BufferUntilSubscriber.create();
  22. w.schedule(() -> {
  23. bus.onNext(t);
  24. bus.onCompleted();
  25. }, delay, unit);
  26. child.onNext(bus);
  27. }
  28. @Override
  29. public void onError(Throwable e) {
  30. child.onError(e);
  31. }
  32. @Override
  33. public void onCompleted() {
  34. child.onCompleted();
  35. }
  36. };
  37. child.add(parent);
  38. return parent;
  39. }
  40. }
  41. Observable.range(1, 3)
  42. .lift(new ValueDelayer<>(1, TimeUnit.SECONDS,
  43. Schedulers.computation()))
  44. .take(1)
  45. .doOnNext(v -> v.subscribe(System.out::println))
  46. .subscribe();
  47. Thread.sleep(1500);

奇怪的是,上面的例子不会打印任何内容,但我们希望它会在 1s 之后打印 1。问题就出在 take(1) 会在接收到第一个数据之后取消上游 Observable,而这会取消我们延迟执行的任务。

解决这一问题可以有很多种方式,实际上它很依赖于我们的应用场景。显然,这里我们应该取消订阅 worker,但要让内部创建的 Observable 可以被订阅。

一种方式是引入一个原子性的计数器,记录尚未被订阅的内部 Observable 数量,并在计数减到 0 的时候取消订阅 worker。而且,这一方式要求内部的 Observable 一定要被消费。

  1. // ...
  2. Worker w = scheduler.createWorker();
  3. final AtomicBoolean once = new AtomicBoolean();
  4. final AtomicInteger wip = new AtomicInteger(1); // (1)
  5. Subscriber<T> parent = new Subscriber<T>(child, false) {
  6. @Override
  7. public void onNext(T t) {
  8. if (wip.getAndIncrement() == 0) { // (2)
  9. wip.set(0);
  10. return;
  11. }
  12. BufferUntilSubscriber<T> bus =
  13. BufferUntilSubscriber.create();
  14. w.schedule(() -> {
  15. try {
  16. bus.onNext(t);
  17. bus.onCompleted();
  18. } finally {
  19. release(); // (3)
  20. }
  21. }, delay, unit);
  22. child.onNext(bus);
  23. if (child.isUnsubscribed()) {
  24. if (once.compareAndSet(false, true)) { // (4)
  25. release();
  26. }
  27. }
  28. }
  29. @Override
  30. public void onError(Throwable e) {
  31. child.onError(e);
  32. }
  33. @Override
  34. public void onCompleted() {
  35. if (once.compareAndSet(false, true)) {
  36. release(); // (5)
  37. }
  38. child.onCompleted();
  39. }
  40. void release() {
  41. if (wip.decrementAndGet() == 0) {
  42. w.unsubscribe();
  43. }
  44. }
  45. };
  46. parent.add(Subscriptions.create(() -> { // (6)
  47. if (once.compareAndSet(false, true)) {
  48. if (wip.decrementAndGet() == 0) {
  49. w.unsubscribe();
  50. }
  51. }
  52. }));
  53. child.add(parent);
  54. return parent;
  55. }
  56. // ...

这一方案有以下几点值得注意:

  1. 我们需要一个原子性的整数和布尔值。前者用来记录未被消费的内部 Observable,也就是当前还处于活跃状态的上游 T 序列。由于上游可能在很多地方结束,而且可能会结束多次(例如在 onNext 之后就终止了,但上游仍发出了 onCompleted,而这又会导致一次取消订阅)。由于上游只应该被计数一次,所以我们需要利用 once 来保证相应的递减只会发生一次。
  2. 在尝试调度任务之前,我们把“活跃窗口”(open windows)计数加一。然而,由于(6)可能异步地把 wip 减到 0,所以从 0 增加到 1 就意味着 worker 取消订阅之后发生了 onNext,如果这时我们发出一个 Observable,它是没有机会被订阅的,所以我们直接返回,不发出 Observable。
  3. 一旦内部的 Observable 发射完毕,我们就可以释放一个“窗口”了(递减计数)。
  4. 我们在内部的 Observable 被发射出去之后,立即检查 child 是否已经被取消订阅。如果已经被取消订阅,我们尝试取消订阅上游一次。
  5. 如果下游没有被取消订阅,那我们就需要在 onCompleted 中尝试取消订阅上游一次。
  6. 由于 child 的取消订阅可能在任何时刻发生,可能会发生在两次事件发射之间,所以我们依然要在 child 被取消订阅时尝试取消订阅上游一次。

总结

大家可能会觉得这些陷阱是一些特殊情况,但作为操作符的开发者,或者打算向 RxJava 提交 PR,大家还是需要了解这些陷阱。

在本文中,我们探讨了可能错误地导致多次订阅 subscriber 的情况,并且展示了把 worker 添加到或者不添加到 child 中会导致问题的两种情况。