The reactor-stream module

Nope, you shall not use Future.get(), ever.

— Stephane Maldini

with a banking-sector customer

First, let's look at an example of Stream at work in Java 8:

    import static reactor.Environment.*;

    import java.util.concurrent.TimeUnit;

    import reactor.Environment;
    import reactor.fn.tuple.Tuple;
    import reactor.rx.Streams;
    import reactor.rx.BiStreams;

    //...

    // Set up the shared Environment (dispatchers, timer) before building streams
    Environment.initialize();

    //find the top 10 words used in a list of Strings
    Streams.from(aListOfString)
      .dispatchOn(sharedDispatcher())
      // split each sentence into a sub-stream of non-empty words
      .flatMap(sentence ->
        Streams
          .from(sentence.split(" "))
          .dispatchOn(cachedDispatcher())
          .filter(word -> !word.trim().isEmpty())
          .observe(word -> doSomething(word))
      )
      // pair each word with an initial count of 1
      .map(word -> Tuple.of(word, 1))
      // group the pairs into one-second windows
      .window(1, TimeUnit.SECONDS)
      .flatMap(words ->
        // sum the counts per word, then keep the 10 most frequent, highest first
        BiStreams.reduceByKey(words, (prev, next) -> prev + next)
          .sort((wordWithCountA, wordWithCountB) -> -wordWithCountA.t2.compareTo(wordWithCountB.t2))
          .take(10)
          .finallyDo(event -> LOG.info("---- window complete! ----"))
      )
      // log each (word, count) pair, and any error raised upstream
      .consume(
        wordWithCount -> LOG.info(wordWithCount.t1 + ": " + wordWithCount.t2),
        error -> LOG.error("", error)
      );
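
For readers without a Reactor environment at hand, the counting and ranking done inside each window can be approximated with the JDK's own java.util.stream API. The following is only a sketch of that part of the pipeline: it runs synchronously on a finite list and has no dispatchers or time windows. The class TopWords and the method topWords are hypothetical names, not part of Reactor. The alphabetical tie-break is an added assumption to make the ordering deterministic.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class TopWords {

    // Hypothetical helper (not a Reactor API): counts the words found in all
    // sentences and returns at most `limit` entries, most frequent first.
    static List<Map.Entry<String, Long>> topWords(List<String> sentences, int limit) {
        // Split on single spaces, drop blank tokens, then count per word.
        Map<String, Long> counts = sentences.stream()
            .flatMap(sentence -> Arrays.stream(sentence.split(" ")))
            .filter(word -> !word.trim().isEmpty())
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        // Sort by count descending; ties are broken alphabetically (assumption).
        return counts.entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                .thenComparing(Map.Entry.<String, Long>comparingByKey()))
            .limit(limit)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList("to be or not to be", "to stream or not");
        topWords(sentences, 10)
            .forEach(e -> System.out.println(e.getKey() + ": " + e.getValue()));
    }
}
```

Unlike the Reactor version, this sketch has to see the whole input before it can emit a result, which is why the reactive pipeline cuts the unbounded stream into one-second windows first.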