reactor-core模块

You should never do your asynchronous work alone.

— Jon Brisbin

完成Reactor 1后写到

You should never do your asynchronous work alone.

— Stephane Maldini

完成Reactor 2后写到

首先,我们来用一个Groovy的示例来展示Core模块的功能:

  1. //初始化上下文,获取默认dispather
  2. Environment.initialize()
  3. //使用默认8192 slots的RingBufferDispatcher
  4. def dispatcher = Environment.sharedDispatcher()
  5. //创建一个回调
  6. Consumer<Integer> c = { data ->
  7. println "some data arrived: $data"
  8. }
  9. //创建一个异常回调
  10. Consumer<Throwable> errorHandler = { it.printStackTrace }
  11. //异步调度数据
  12. dispatcher.dispatch(1234, c, errorHandler)
  13. Environment.terminate()

然后,用Reactive Streams的方式来实现

  1. //独立的异步processor
  2. def processor = RingBufferProcessor.<Integer>create()
  3. //发送数据,将会保持数据安全直到subscriber连接到processor为止
  4. processor.onNext(1234)
  5. processor.onNext(5678)
  6. //消费整型数据
  7. processor.subscribe(new Subscriber<Integer>(){
  8. void onSubscribe(Subscription s){
  9. //无界限的subscriber
  10. s.request Long.MAX
  11. }
  12. void onNext(Integer data){
  13. println data
  14. }
  15. void onError(Throwable err){
  16. err.printStackTrace()
  17. }
  18. void onComplete(){
  19. println 'done!'
  20. }
  21. }
  22. //关闭内部线程,调用complete函数
  23. processor.onComplete()