DispatcherSupplier
你可能注意到了一些Dispatcher是单线程的,特别是RingBufferDispatcher和MpscDispatcher。深入来说,根据Reactive Stream的说明,Subscriber和Processor的实现是不允许并发通知的。这一点对Reactor Stream有极其特殊的影响,试图用Stream.dispatchOn(Dispatcher)与Dispatcher来关闭可使用并发的大门。
然而还有一种方式可以解决这种限制,使用Dispatcher pool或者DispatcherSupplier。实际上作为一个Supplier工厂,Supplier.get()方法根据有趣的pooling(XX:池/共享)策略:轮询、最少使用。。等间接提供一个Dispatcher。
Environment提供一个静态帮助类来创建并注册到当前活跃的Environment和Dispatcher pool:一组轮询返回的Dispatcher集合。一旦就绪,supplier就会提供一组可控数量的Dispatcher集合。
不用与Dispatcher集合,Environment提供一站式的管理服务:
Environment.initialize();//....//Create an anonymous pool of 2 dispatchers with automatic default settings (same type than default dispatcher, default backlog size...)DispatcherSupplier supplier = Environment.newCachedDispatchers(2);Dispatcher d1 = supplier.get();Dispatcher d2 = supplier.get();Dispatcher d3 = supplier.get();Dispatcher d4 = supplier.get();Assert.isTrue( d1 == d3 && d2 == d4);supplier.shutdown();//Create and register a new pool of 3 dispatchersDispatcherSupplier supplier1 = Environment.newCachedDispatchers(3, "myPool");DispatcherSupplier supplier2 = Environment.cachedDispatchers("myPool");Assert.isTrue( supplier1 == supplier2 );supplier1.shutdown();
