layout: post title: Operator 并发原语: producers(四),RangeProducer 优化 tags:

  1. - Operator
  2. - Producer

原文 Operator concurrency primitives: producers (part 4){:target=”_blank”}

介绍

在实现了相对复杂的 producer{:target=”_blank”} 之后,现在是时候关注简单一点的内容了。在本文中,我将对最初介绍的 RangeProducer 进行一次优化:在无限请求时增加一个发射快路径。

在 RxJava 中,如果第一次就请求 Long.MAX_VALUE 等同于请求无限的数据,并且会触发很多发射快路径,就像支持 backpressure 之前的远古时代那样。在这种情况下,我们无需响应请求并生产数据了(只需处理好取消订阅即可)。

RangeProducer 的快路径

我只列出 request() 方法的代码,因为其他部分的代码完全没有变化:

  1. // ... same as before
  2. @Override
  3. public void request(long n) {
  4. if (n < 0) {
  5. throw new IllegalArgumentException();
  6. }
  7. if (n == 0) {
  8. return;
  9. }
  10. if (BackpressureUtils.getAndAddRequest(this, n) != 0) {
  11. return;
  12. }
  13. if (n == Long.MAX_VALUE) { // (1)
  14. if (child.isUnsubscribed()) {
  15. return;
  16. }
  17. int i = index; // (2)
  18. int k = remaining;
  19. while (k != 0) {
  20. child.onNext(i);
  21. if (child.isUnsubscribed()) { // (3)
  22. return;
  23. }
  24. i++; // (4)
  25. k--;
  26. }
  27. if (!child.isUnsubscribed()) {
  28. child.onCompleted();
  29. }
  30. return; // (5)
  31. }
  32. long r = n;
  33. for (;;) {
  34. // ... same as before

快路径的工作原理如下:

  1. 如果我们成功把计数从 0 增加到 n,并且 n 为 Long.MAX_VALUE,我们就进入了快路径,如果 n 小于 Long.MAX_VALUE,我们将执行慢路径。
  2. 我们把 producer 的状态读取到局部变量中。注意,如果之前在慢路径中发射过数据,那我们读取到的值将反映出我们继续发射的位置。如果当前这次无限的请求得到了发射的权利(当然得到了,因为现在我们已经进入了快路径)。
  3. 检查 child 是否已经取消了订阅。
  4. 我们递增 i,递减 k
  5. 在所有的数据以及结束事件发射完毕之后,我们就直接退出执行,而不再调整内部的请求计数。这确保了结束之后的请求既不会进入快路径,也不会进入慢路径,因为 BackpressureUtils.getAndAddRequest 永远不会成功。

注意,小量请求后接着一个无限请求这种情况在 RxJava 中不会发生。操作符要么开启了 backpressure,要么没有开启 backpressure,所以我们无需担心,如果无限请求在慢路径循环中和 r = addAndGet(-e); 之间到来并且可能把请求计数递减到 Long.MAX_VALUE 之下,而导致我们被陷在慢路径中。

实现一个基于数组的 producer

RxJava 的 from() 操作符支持传入一个 T 类型的数组,但在其内部实现中,这个数组会在 producer 中被转化为一个列表并进行遍历。这种方式看起来不必要,既然我们拿到的是一个已知长度的数组,那我们就无需 Iterator 而是直接利用下标进行遍历了(你可能会认为 JIT 会对此进行优化,使得 Iterator 在栈上进行分配,但 onNext() 中的代码有可能会阻止此项优化)。另外,由于 from() 不支持基本类型的数组,所以你可能需要自行编写一个支持此类型的操作符。

RangeProducer 的结构是实现这个功能的一个不错的选择:我们可以用 index 来记录当前遍历到数组的下标,然后把它和数组长度进行对比以决定何时退出。

  1. public final class ArrayProducer
  2. extends AtomicLong implements Producer {
  3. /** */
  4. private static final long serialVersionUID = 1L;
  5. final Subscriber child;
  6. final int[] array; // (1)
  7. int index;
  8. public ArrayProducer(Subscriber child,
  9. int[] array) {
  10. this.child = child;
  11. this.array = array;
  12. }
  13. @Override
  14. public void request(long n) {
  15. if (n < 0) {
  16. throw new IllegalArgumentException();
  17. }
  18. if (n == 0) {
  19. return;
  20. }
  21. if (BackpressureUtils.getAndAddRequest(this, n) != 0) {
  22. return;
  23. }
  24. final int[] a = this.array;
  25. final int k = a.length; // (2)
  26. if (n == Long.MAX_VALUE) {
  27. if (child.isUnsubscribed()) {
  28. return;
  29. }
  30. int i = index;
  31. while (i != k) { // (3)
  32. child.onNext(a[i]);
  33. if (child.isUnsubscribed()) {
  34. return;
  35. }
  36. i++;
  37. }
  38. if (!child.isUnsubscribed()) {
  39. child.onCompleted();
  40. }
  41. return;
  42. }
  43. long r = n;
  44. for (;;) {
  45. if (child.isUnsubscribed()) {
  46. return;
  47. }
  48. int i = index;
  49. int e = 0;
  50. while (r > 0 && i != k) {
  51. child.onNext(a[i]);
  52. if (child.isUnsubscribed()) {
  53. return;
  54. }
  55. i++;
  56. if (i == k) { // (4)
  57. child.onCompleted();
  58. return;
  59. }
  60. e++;
  61. r--;
  62. }
  63. index = i;
  64. r = addAndGet(-e);
  65. if (r == 0) {
  66. return;
  67. }
  68. }
  69. }
  70. }
  71. int[] array = new int[200];
  72. Observable<Integer> source = Observable.create(child -> {
  73. if (array.length == 0) {
  74. child.onCompleted();
  75. return;
  76. }
  77. ArrayProducer ap = new ArrayProducer(child, array);
  78. child.setProducer(ap);
  79. });
  80. source.subscribe(System.out::println);
  1. 除了 index 之外,我们还需要 array 来保存待发射的数组,我们无需 remaining 了,因为 index 最多递增到数组的长度。
  2. 结束运行的条件是局部变量 i 递增到 k(数组长度)。注意我们无需递减 k
  3. 在快路径中,在 i 递增到数组长度之前我们都进行循环。
  4. 在慢路径中,每次递增 i 之后,我们立即检查是否已经抵达了数组的末尾,如果抵达末尾就发出 onCompleted()。注意,慢路径中不支持空数组。

总结

在本文中,我展示了如何为简单如 RangeProducer 的 producer 增加一个快路径,并且如何把它转变为一个支持基本类型数组的 producer,避免额外的 Iterator 分配和遍历开销。

到目前为止,我介绍了众多的 producer,包括确切知道应该发射多少数据的 producer,以及不知道或者不关心发射量的 producer。然而,存在一些需要处理来自多种 producer 的多个数据源的操作符,它们还需要处理得 child 只需要处理一种数据源。在下一篇关于 producer 的文章中,我将介绍一种我称之为 producer-arbiter 的 producer,它能在保证 backpressure 的前提下支持不同 producer 之间进行切换。