DispatcherSupplier

你可能注意到了一些Dispatcher是单线程的,特别是RingBufferDispatcher和MpscDispatcher。深入来说,根据Reactive Stream的说明,Subscriber和Processor的实现是不允许并发通知的。这一点对Reactor Stream有极其特殊的影响,试图用Stream.dispatchOn(Dispatcher)与Dispatcher来关闭可使用并发的大门。

然而还有一种方式可以解决这种限制,使用Dispatcher pool或者DispatcherSupplier。实际上作为一个Supplier工厂,Supplier.get()方法根据有趣的pooling(XX:池/共享)策略:轮询、最少使用。。等间接提供一个Dispatcher。

Environment提供一个静态帮助类来创建并注册到当前活跃的Environment和Dispatcher pool:一组轮询返回的Dispatcher集合。一旦就绪,supplier就会提供一组可控数量的Dispatcher集合。

不用与Dispatcher集合,Environment提供一站式的管理服务:

  1. Environment.initialize();
  2. //....
  3. //Create an anonymous pool of 2 dispatchers with automatic default settings (same type than default dispatcher, default backlog size...)
  4. DispatcherSupplier supplier = Environment.newCachedDispatchers(2);
  5. Dispatcher d1 = supplier.get();
  6. Dispatcher d2 = supplier.get();
  7. Dispatcher d3 = supplier.get();
  8. Dispatcher d4 = supplier.get();
  9. Assert.isTrue( d1 == d3 && d2 == d4);
  10. supplier.shutdown();
  11. //Create and register a new pool of 3 dispatchers
  12. DispatcherSupplier supplier1 = Environment.newCachedDispatchers(3, "myPool");
  13. DispatcherSupplier supplier2 = Environment.cachedDispatchers("myPool");
  14. Assert.isTrue( supplier1 == supplier2 );
  15. supplier1.shutdown();