layout: post title: Async Iterable/Enumerable vs. Reactive-Streams tags:

  1. - 对比点评

原文 Async Iterable/Enumerable vs. Reactive-Streams{:target=”_blank”}。

介绍

在响应式数据处理流水线中,当两个部分处理数据速度不同时,如果我们不希望缓冲区爆炸或者内存耗尽,那 backpressure 就至关重要了。RxJava 和 Reactive-Streams 设计了一套非阻塞、基于请求协调的处理机制,但可能我们也听说过其他的解决方案。一个经常被提及的概念就是 Async Iterables(Java 中的概念)或者 Async Enumerables(C# 中的概念)了。

实际上 Rx.NET 有一个子项目 Ix.NET(Interactive Extensions 的缩写),其中就使用了 Async Enumerables。它通过 MoveNext()(相当于 hasNext())返回一个 Task(相当于 CompletableFuturePromise)来解决 backpressure 的问题:当 Task 完成时,我们可以消费 Current 的值(相当于 next() 函数),由于我们只能在消费了当前的数据之后才能调用 MoveNext(),所以自然就解决了 backpressure 的问题。

遗憾的是,我没有找到 IAsyncEnumerable 的 Java 实现(不过我也只是用 Google 搜索了几次),所以我决定用 Java 8 自己实现一下,看看让数据在其中流动需要做些什么,以及对比一下它和我当前对响应式数据流的最新理解—— Reactive-Streams-Commons ——的性能差异。

基本 API

由于 Async Enumerables 的设计从源头上考虑了延迟执行,所以基本 API 包括下面两个接口:

  1. interface IAsyncEnumerable<T> {
  2. IAsyncEnumerator<T> enumerator();
  3. }
  4. interface IAsyncEnumerator<T> {
  5. CompletionStage<Boolean> moveNext(CompositeSubscription cancel);
  6. T current();
  7. }

IAsyncEnumerable 相当于是 Iterable,它能发出 IAsyncEnumeratorIAsyncEnumeratormoveNext 函数能返回一个 CompletionStage,它能用于表明当前能否通过 current() 获取数据(true 表示能,false 表示数据流终止)。C# 的 CancellationTokenCompositeSubscription 类似,所以我就用它来实现取消了。

(注:我现在还没想好怎么实现组合取消,Ix.NET 的 IAsyncEnumerator 是一个 IDisposable,而且 Task 也可以取消,不像 CompletionStage 无法取消。不过幸运的是,在这篇博文中,我并不太需要这个特性。)

消费 IAsyncEnumerable

消费一个 IAsyncEnumerable 还是很直观的,尽管不像 C# 的 async/await 那样简单。如果我们只关心单一数据,那我们可以这样实现:

  1. IAsyncEnumerable<T> source = ...
  2. IAsyncEnumerator<T> enumerator = source.enumerator();
  3. enumerator.moveNext(new CompositeSubscription())
  4. .whenComplete((b, e) -> {
  5. if (e != null) {
  6. e.printStackTrace();
  7. } else if (b) {
  8. System.out.println(enumerator.current());
  9. } else {
  10. System.out.println("Empty!");
  11. }
  12. });

当然,基于 CompletionStage 的 API,我们可以这样轻易地处理单一的数据。

但如果要消费 IAsyncEnumerator 中的多个数据,那就要做更多事情了,我们需要递归调用 moveNext,直到数据流终止,或者出现错误:

  1. public void consumeAll(IAsyncEnumerator<T> enumerator, CompositeSubscription csub) {
  2. if (csub == null) {
  3. csub = new CompositeSubscription();
  4. }
  5. CompositeSubscription fcsub = csub;
  6. enumerator.moveNext(new CompositeSubscription())
  7. .whenComplete((b, e) -> {
  8. if (e != null) {
  9. e.printStackTrace();
  10. } else if (b) {
  11. System.out.println(enumerator.current());
  12. // go recursive
  13. consumeAll(enumerator, fcsub);
  14. } else {
  15. System.out.println("Empty!");
  16. }
  17. });
  18. }

不幸的是,如果 CompletionStage 是同步的,那我们可能遇到 StackOverflowError,因为我们在无限调用 consumeAll。因此我们需要一个线程跳转,以确保调用栈不会太深:

  1. public final class AsyncConsumer<T> implements Subscription {
  2. final Consumer<? super T> onNext;
  3. final Consumer<Throwable> onError;
  4. final Runnable onComplete;
  5. final IAsyncEnumerator<T> enumerator;
  6. final AtomicInteger wip;
  7. final Queue<CompletionStage<Boolean>> queue;
  8. final CompositeSubscription csub;
  9. final CountDownLatch cdl;
  10. public AsyncConsumer(
  11. IAsyncEnumerator<T> enumerator,
  12. Consumer<? super T> onNext,
  13. Consumer<Throwable> onError,
  14. Runnable onComplete
  15. ) {
  16. this.enumerator = enumerator;
  17. this.onNext = onNext;
  18. this.onError = onError;
  19. this.onComplete = onComplete;
  20. this.wip = new AtomicInteger();
  21. this.queue = new SpscLinkedArrayQueue<>(16);
  22. this.csub = new CompositeSubscription();
  23. this.cdl = new CountDownLatch();
  24. }
  25. public void consumeAll() {
  26. if (csub.isUnsubscribed()) {
  27. cdl.countDown();
  28. return;
  29. }
  30. CompletionStage<T> stage = enumerator.moveNext(csub);
  31. queue.offer(stage);
  32. if (wip.getAndIncrement() == 0) {
  33. do {
  34. stage = queue.poll();
  35. stage.whenComplete((b, e) -> {
  36. if (csub.isUnsubscribed()) {
  37. cdl.countDown();
  38. return;
  39. } else if (e != null) {
  40. onError.accept(e);
  41. cdl.countDown();
  42. } else if (b) {
  43. onNext.accept(enumerator.current());
  44. consumeAll();
  45. } else {
  46. onComplete.run();
  47. cdl.countDown();
  48. }
  49. });
  50. } while (wip.decrementAndGet() != 0);
  51. }
  52. }
  53. public void await() throws InterruptedException {
  54. cdl.await();
  55. }
  56. public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
  57. return cdl.await(timeout, unit);
  58. }
  59. @Override
  60. public void unsubscribe() {
  61. csub.unsubscribe();
  62. }
  63. @Override
  64. public boolean isUnsubscribed() {
  65. return csub.isUnsubscribed();
  66. }
  67. }

看起来很吸引人,除了发出事件的回调以及 IAsyncEnumerator,我们还需要一个 wip 计数器和 queue,就像以前的队列漏一样。我们用 CompositeSubscription 实现取消,用 CountDownLatch 实现阻塞等待。consumeAll() 每次消费一个数据,线程跳转的循环确保了每次只会有一个 whenComplete() 处于运行中。在回调中,我们用 Consumer 发出相应的事件,如果来了数据,那就递归调用 consumeAll。线程的跳转确保了无论是同步调用还是异步调用,我们都不会重入 consumeAll

编写 IAsyncEnumerable 数据源

当然我们需要数据源才能工作,作为所有响应式编程库中最基础、最标准的数据源——range()——可以说是响应式编程领域的 for 循环了。

  1. public final class AsyncRange implements IAsyncEnumerable<Integer> {
  2. final int start;
  3. final int count;
  4. @Override
  5. public IAsyncEnumerator<Integer> enumerator() {
  6. return new AsyncRangeEnumerator(start, count);
  7. }
  8. static final class AsyncRangeEnumerator implements IAsyncEnumerator<Integer> {
  9. final long end;
  10. long index;
  11. static final CompletionStage<Boolean> TRUE =
  12. CompletableFuture.completedFuture(true);
  13. static final CompletionStage<Boolean> FALSE =
  14. CompletableFuture.completedFuture(false);
  15. public AsyncRangeEnumerator(int start, int count) {
  16. this.index = start - 1;
  17. this.end = (long)start + count;
  18. }
  19. @Override
  20. public CompletionStage<Boolean> moveNext(CompositeSubscription csub) {
  21. long i = index + 1;
  22. if (i == end) {
  23. return FALSE;
  24. }
  25. index = i;
  26. return TRUE;
  27. }
  28. @Override
  29. public Integer current() {
  30. return index;
  31. }
  32. }
  33. }

range 操作本身是同步的,在我看来,CompletableFutureAsyncSubject 一样,而由于我们需要返回的是 true 或者 false 常量,因此我们可以使用共享的、已结束的 Future 对象。由于 moveNext() 会在 current() 之前调用,所以 index 初始值是 start - 1,并在 moveNext() 中把 index 加一。如果 index == end,那我们就返回 false,否则我们把 index 加一然后返回 true

异步化

当然我们需要的是异步,所以让我们实现一下 observeOnsubscribeOn 操作符。前者确保 CompletionStage<Boolean> 的代码在指定的线程执行,后者确保 moveNext() 的代码执行在指定的线程(所以我们就可以在 moveNext() 或者 enumerator() 中进行阻塞 IO 操作了)。此外,编写这两个操作符我们已经很熟练了,对吧?

observeOn

如果大家有些不熟悉的操作符要实现,那我建议看看 Erik Meijer 在 Channel 9 的视频:follow the types。我们知道,我们有一个上游数据源,以及一些异步数据源。为了简洁起见,我们直接使用 Executor,因为 CompletionStageXXXAsync 函数和 Executor 几乎一模一样。

  1. public final class AsyncObserveOn<T> implements IAsyncEnumerable<T> {
  2. final IAsyncEnumerable<T> source;
  3. final Executor executor;
  4. public AsyncObserveOn(IAsyncEnumerable<T> source, Executor executor) {
  5. this.source = source;
  6. this.executor = executor;
  7. }
  8. @Override
  9. public IAsyncEnumerator<T> enumerator() {
  10. return new AsyncObserveOnEnumerator<>(source.enumerator(), executor);
  11. }
  12. // ...
  13. }

这个套路大家应该都很熟悉了,脏活累活都被 AsyncObserveOnEnumerator 包揽了:

  1. static final class AsyncObserveOnEnumerator<T> implements IAsyncEnumerator<T> {
  2. final IAsyncEnumerator<T> enumerator;
  3. final Executor executor;
  4. public AsyncObserveOnEnumerator(IAsyncEnumerator<T> enumerator, Executor executor) {
  5. this.enumerator = enumerator;
  6. this.executor = executor;
  7. }
  8. @Override
  9. public CompletionStage<Boolean> moveNext(CompositeSubscription csub) {
  10. return enumerator.moveNext(csub).thenApplyAsync(v -> v, executor);
  11. }
  12. @Override
  13. public T current() {
  14. return enumerator.current();
  15. }
  16. }

我承认我对 CompletionStage 不是很熟悉,所以我觉得 thenApplyAsync 是把数据送到指定 Executor 最简单的办法了。其他的代码看起来都很直观了,而且比咱们 Rx 风格的 observeOn() 简短得多。

subscribeOn

由于我们不知道 IAsyncEnumerable 是否会在 moveNext() 中实现异步,我们必须想个办法(subscribeOn),确保 moveNext() 发生在另外的线程:

  1. public final class AsyncSubscribeOn<T> implements IAsyncEnumerable<T> {
  2. final IAsyncEnumerable<T> source;
  3. final Executor executor;
  4. public AsyncSubscribeOn(IAsyncEnumerable<T> source, Executor executor) {
  5. this.source = source;
  6. this.executor = executor;
  7. }
  8. @Override
  9. public IAsyncEnumerator<T> enumerator() {
  10. AxSubscribeOnEnumerator<t> enumerator = new AxSubscribeOnEnumerator<>(executor);
  11. executor.execute(() -> {
  12. IAsyncEnumerator<T> ae = source.enumerator();
  13. enumerator.setEnumerator(ae);
  14. });
  15. return enumerator;
  16. }
  17. // ...
  18. }

我们把 source.enumerator() 的调用转移到了 Executor 中,而不是直接调用,在调用 enumerator() 时我们将会有一个合法的 IAsyncEnumerator(这一操作在消费者看来被推迟了),不过我们仍需要返回一个 IAsyncEnumerator。这里的困难之处就在于如何允许在没有合法的上游 Enumerator 时调用 moveNext()。幸好,CompletionStage 能帮我们解决这一难题:

  1. static final class AxSubscribeOnEnumerator<T> implements IAsyncEnumerator<T> {
  2. final Executor executor;
  3. final CompletableFuture<IAsyncEnumerator<T>> onEnumerator;
  4. public AxSubscribeOnEnumerator(Executor executor) {
  5. this.executor = executor;
  6. this.onEnumerator = new CompletableFuture<>();
  7. }
  8. void setEnumerator(IAsyncEnumerator<T> enumerator) {
  9. onEnumerator.complete(enumerator);
  10. }
  11. @Override
  12. public CompletionStage<Boolean> moveNext(CompositeSubscription token) {
  13. return onEnumerator.thenComposeAsync(ae -> ae.moveNext(token), executor);
  14. }
  15. @Override
  16. public T current() {
  17. IAsyncEnumerator<T> ae = onEnumerator.getNow(null);
  18. return ae != null ? ae.current() : null;
  19. }
  20. }

我们准备了一个 onEnumerator CompletableFuture,当 setEnumerator 调用时,我们将获得实际的上游 IAsyncEnumerator,这时 Future 将会完成。这里最大的技巧在于,当 onEnumerator 的值到达时,我们立即在 Executor 上调用上游的 moveNext() 方法。thenComposeAsync 操作符和 flatMap 差不多。current() 函数则需要一些额外的逻辑,我们先尝试获取上游 Enumerator,如果此时还没有,那就直接返回 null,如果相应的 CompletionStage 还没有发出之前,我们不能调用 current()

基准测试

既然我们有了这三个最基础的操作符,那就为 IAsyncEnumerable 和最新的 Reactive-Streams-Commons 做一个基准测试的对比吧。相关的源码可以从 GitHub 获取。为了方便,我把上述操作符实现为了流式 API,基本类型是 Ax:Async Extensions。

下面是基准测试的结果(数值越大越好):(i7 4790, Windows 7 x64, Java 8u92)

- 对比点评 - 图1

range 测试的是 range(1, count)rangeAsync 测试的是 range(1, count).observeOn(executor)rangePipeline 测试的是 range(1, count).subscribeOn(executor1).observeOn(executor2)ax 代表我实现的 IAsyncEnumerablepx 代表 Reactive-Streams-Commons (Rsc) 的 Publisher Extensions。由于 Rsc 实现了操作符熔合,因此 rangeAsync() 在启用和禁用操作符熔合的情况下分别作了测试,pxf 代表启用了操作符熔合的 Rsc。

评估

看起来 Rsc 完爆了我实现的 IAsyncEnumerable,无论是同步模式还是异步模式。由于没有第三方的实现可以做对比,我只能猜测 IAsyncEnumerable 开销如此之大的原因了。我有限的 CompletionStage 使用经验告诉我一些原因,但我怀疑这并非根源所在。由于两个库都是使用了单线程 Executor,所以可以排除 Executor 的开销。

剩下的就是架构上、概念上的差异了:

  • 每个数据都需要创建一个 CompletionStage,以及一个消费者;Rsc 没有任何内存分配;
  • CompletionStage 的行为介于 hot 和 cold 之间,就像 AsyncSubject 那样,当我们把消费者连接上去的时候,它可能仍在运行,也可能早已停止;这项检查增加了额外开销;Rsc 则是尽可能直接调用 onNext
  • 调用链越长,就需要越多的 CompletionStage,这就意味着内存分配和单独的任务调度;Rsc 则在异步边界上实现了批处理机制;

总结

我觉得这里的 IAsyncEnumerable 就是响应式数据流的一个特例,它每次都调用 request(1),也存在一些内存分配的开销,使得和高度优化的响应式数据流相比,开销高得多。

当然它看起来更精简,操作符的实现也更简单,但我不得不问,它和 Reactive-Streams 相比,有何优势?

如果大家对优化 IAsyncEnumerable 有任何建议,或者能推荐第三方的实现,我将很乐意重新做一次基准测试,并调整我对这个话题的观点。