理解线程模型
Reactive Streams
和Reactive
扩展的一个共同目标是通过信号回调这种方式不再武断的遵循线程习惯。它会在现在和某个时刻T
执行是Streams
的所有。非同步信号也可以保存Subscriber
的并发访问(无共享),但是信号和请求可以在两个不对称的线程上执行。
默认情况下,Stream
被分配了一个SynchronousDispatcher
,并且将会通过Stream.getDispatcher()
来通知它直接子级。
多种多样的
Stream
工厂,Broadcaster
,Stream.dispatchOn
和终端的xxxOn
方法可能会修改默认的SynchronousDispatcher
Reactor Stream中三大主要可用线程切换的基本了解:
Stream.dispatchOn
的作用是Stream下唯一可用的方法用于在给定的dispatcher上分发onError
,onComplete
和onNext
信号。- Processor的行为不支持并发分发,例如
WorkQueueDispatcher
。 - request和cancel将会在dispatcher上执行,如果它的上下文准备完毕。否则它将会在当前dispatch执行完毕后执行。
- Processor的行为不支持并发分发,例如
Stream.subscribeOn
将只会在已经通过的dispatcher上执行。- 由于唯一一次通过的Dispatcher被称之为
onSubscribe
,任何dispatcher可以使用并发分发器,类似WorkQueueDispatcher
。 - 第一次请求可能仍然在
onSubscribe
线程中执行,如Stream.consume()
操作的实例。
- 由于唯一一次通过的Dispatcher被称之为
Stream.process
附属的Processor
实例也会影响线程。类似RingBufferProcessor
的Processor
将会使用它自己管理的线程来执行Subscriber
。- 如果上下文准备完毕,请求和取消将会在同一个processor上执行。
RingBufferWorkProcessor
仅最多分发onNext
信号到一个Subscriber
上,除非它在执行过程中被中断(重播给一个新的Subscriber
)。
常规订阅是由onSubscribe
开始请求数据,subscribeOn
是一种有效的工具来放大Stream
,尤其是无界的。如果一个订阅者向onSubscribe
请求Long.MAX_VALUE
条数据,它将会成为唯一一个执行请求者,并且它会在分配给这个订阅者的diapatcher
上运行。这是无界Stream
的默认消费行为。
在无限请求的线程中跳转
Streams
.range(1, 100)
.dispatchOn(Environment.sharedDispatcher()) (2)
.subscribeOn(Environment.workDispatcher()) (1)
.consume(); (3)
- 分配一个
onSubscribe
的工作队列dispatcher
。 - 分配
onNext
,onError
,onComplete
信号的dispatcher
。 - 使用
Subscription.request(Long.MAX)
来消费Stream
onSubscribe
Figure 12. 无界消费者的subscribeOn和dispatchOn/process
然而,当多于1个请求将会变的复杂时,subscribeOn
会变的比较无用,例如在限定Stream.capacity(n)
的分步消费中。唯一执行请求的线程可能运行在分配给subscribeOn
的第一个dispatcher
上。
在有限请求的线程中跳转
Streams
.range(1, 100)
.process(RingBufferProcessor.create()) (2)
.subscribeOn(Environment.workDispatcher()) (1)
.capacity(1); (3)
.consume(); (4)
- 分配一个
onSubscribe
的工作队列dispatcher
。请注意它被放置在subscribeOn将运行在用户的ringBuffer线程后,我们希望把它改成工作调度(dispatcher)。 - 分配一个用于处理
onNext
,onError
,onComplete
的异步Processor
。类似dispatchOn
的行为。 - 分配
Stream
的容量为1,以便下游做适应。 - 通过
Subscription.request(1)
在每次onNext
时消费数据。
Figure 13. 有界消费者(N < Long.MAX)的subscribeOn和dispatchOn/process