reactor-stream模块
不,你永远也不要用
Future.get()。— Stephane Maldini
银行业的客户
首先,我们来看看在Java 8中一些Stream工作的例子:
import static reactor.Environment.*;import reactor.rx.Streams;import reactor.rx.BiStreams;//...Environment.initialize()//find the top 10 words used in a list of StringsStreams.from(aListOfString).dispatchOn(sharedDispatcher()).flatMap(sentence ->Streams.from(sentence.split(" ")).dispatchOn(cachedDispatcher()).filter(word -> !word.trim().isEmpty()).observe(word -> doSomething(word))).map(word -> Tuple.of(word, 1)).window(1, TimeUnit.SECONDS).flatMap(words ->BiStreams.reduceByKey(words, (prev, next) -> prev + next).sort((wordWithCountA, wordWithCountB) -> -wordWithCountA.t2.compareTo(wordWithCountB.t2)).take(10).finallyDo(event -> LOG.info("---- window complete! ----"))).consume(wordWithCount -> LOG.info(wordWithCount.t1 + ": " + wordWithCount.t2),error -> LOG.error("", error));
