理解线程模型

Reactive StreamsReactive扩展的一个共同目标是通过信号回调这种方式不再武断的遵循线程习惯。它会在现在和某个时刻T执行是Streams的所有。非同步信号也可以保存Subscriber的并发访问(无共享),但是信号和请求可以在两个不对称的线程上执行。

默认情况下,Stream被分配了一个SynchronousDispatcher,并且将会通过Stream.getDispatcher()来通知它直接子级。

多种多样的Stream工厂,BroadcasterStream.dispatchOn和终端的xxxOn方法可能会修改默认的SynchronousDispatcher

Reactor Stream中三大主要可用线程切换的基本了解:

  • Stream.dispatchOn的作用是Stream下唯一可用的方法用于在给定的dispatcher上分发onErroronCompleteonNext信号。
    • Processor的行为不支持并发分发,例如WorkQueueDispatcher
    • request和cancel将会在dispatcher上执行,如果它的上下文准备完毕。否则它将会在当前dispatch执行完毕后执行。
  • Stream.subscribeOn将只会在已经通过的dispatcher上执行。
    • 由于唯一一次通过的Dispatcher被称之为onSubscribe,任何dispatcher可以使用并发分发器,类似WorkQueueDispatcher
    • 第一次请求可能仍然在onSubscribe线程中执行,如Stream.consume()操作的实例。
  • Stream.process附属的Processor实例也会影响线程。类似RingBufferProcessorProcessor将会使用它自己管理的线程来执行Subscriber
    • 如果上下文准备完毕,请求和取消将会在同一个processor上执行。
    • RingBufferWorkProcessor仅最多分发onNext信号到一个Subscriber上,除非它在执行过程中被中断(重播给一个新的Subscriber)。

常规订阅是由onSubscribe开始请求数据,subscribeOn是一种有效的工具来放大Stream,尤其是无界的。如果一个订阅者向onSubscribe请求Long.MAX_VALUE条数据,它将会成为唯一一个执行请求者,并且它会在分配给这个订阅者的diapatcher上运行。这是无界Stream的默认消费行为。

在无限请求的线程中跳转

  1. Streams
  2. .range(1, 100)
  3. .dispatchOn(Environment.sharedDispatcher()) (2)
  4. .subscribeOn(Environment.workDispatcher()) (1)
  5. .consume(); (3)
  1. 分配一个onSubscribe的工作队列dispatcher
  2. 分配onNextonErroronComplete信号的dispatcher
  3. 使用Subscription.request(Long.MAX)来消费Stream onSubscribe

Figure 12. 无界消费者的subscribeOn和dispatchOn/process Figure 12. 无界消费者的subscribeOn和dispatchOn/process

然而,当多于1个请求将会变的复杂时,subscribeOn会变的比较无用,例如在限定Stream.capacity(n)的分步消费中。唯一执行请求的线程可能运行在分配给subscribeOn的第一个dispatcher上。

在有限请求的线程中跳转

  1. Streams
  2. .range(1, 100)
  3. .process(RingBufferProcessor.create()) (2)
  4. .subscribeOn(Environment.workDispatcher()) (1)
  5. .capacity(1); (3)
  6. .consume(); (4)
  1. 分配一个onSubscribe的工作队列dispatcher。请注意它被放置在subscribeOn将运行在用户的ringBuffer线程后,我们希望把它改成工作调度(dispatcher)。
  2. 分配一个用于处理onNextonErroronComplete的异步Processor。类似dispatchOn的行为。
  3. 分配Stream的容量为1,以便下游做适应。
  4. 通过Subscription.request(1)在每次onNext时消费数据。

Figure 13. 有界消费者(N < Long.MAX)的subscribeOn和dispatchOn/process) Figure 13. 有界消费者(N < Long.MAX)的subscribeOn和dispatchOn/process