这是网关在路由匹配的过程中出现的一小段的代码,刚接触起来还是比较难懂的,:(

核心在于理解

  • Flux和Mono的订阅特性,如果没有订阅,那么内部的这段代码是不运行的. 他的花销仅仅在于实例化整个动作
  • Flux和Mono的每次订阅 subscribe 都会触发它们提前设定的内容

    不恰当的比喻

    Mono: 对比成一个普通的对象,例如 UserPerson 实例对象
    Flux: 对比成一个List,一个流水线,是可以中断的。

每次路由都会进入这段代码进行路由(找到路由)

  1. public class Test1 {
  2. final static Logger log = LoggerFactory.getLogger(Test1.class);
  3. private static final String CACHE_KEY = "routes";
  4. private static Flux<String> fetch() {
  5. System.out.println("111111");
  6. return Flux.fromIterable(Arrays.asList("11", "22", "33", "44"));
  7. }
  8. private static final Map<String, List> cache = new ConcurrentHashMap<>();
  9. Flux<String> routes = lookup(cache, CACHE_KEY, String.class).onCacheMissResume(Test1::fetch);
  10. public static <KEY, VALUE> CacheFlux.FluxCacheBuilderMapMiss<VALUE> lookup(
  11. Map<KEY, ? super List> cacheMap, KEY key, Class<VALUE> valueClass) {
  12. return new CacheFlux.FluxCacheBuilderMapMiss<VALUE>() {
  13. @Override
  14. public Flux onCacheMissResume(Supplier<Flux<VALUE>> otherSupplier) {
  15. return Flux.defer(() -> {
  16. Object fromCache = cacheMap.get(key);
  17. if (fromCache == null) {
  18. return otherSupplier.get()
  19. .materialize()
  20. .collectList()
  21. .doOnNext(signals -> cacheMap.put(key, signals))
  22. .flatMapIterable(Function.identity())
  23. .dematerialize();
  24. } else if (fromCache instanceof List) {
  25. try {
  26. @SuppressWarnings("unchecked")
  27. List<Signal<VALUE>> fromCacheSignals = (List<Signal<VALUE>>) fromCache;
  28. return Flux.fromIterable(fromCacheSignals)
  29. .dematerialize();
  30. } catch (Throwable cause) {
  31. return Flux.error(new IllegalArgumentException("Content of cache for key " + key + " cannot be cast to List<Signal>", cause));
  32. }
  33. } else {
  34. return Flux.error(new IllegalArgumentException("Content of cache for key " + key + " is not a List"));
  35. }
  36. });
  37. }
  38. };
  39. }
  40. @Test
  41. public void main() {
  42. Flux.fromIterable(Arrays.asList("55", "66", "77", "88"))
  43. .materialize()
  44. .collectList()
  45. .doOnNext(x -> cache.put(CACHE_KEY, x)).subscribe();
  46. for (int i = 0; i < 10; i++) {
  47. int finalI = i;
  48. //每次都会进入onCacheMissResume
  49. final Mono<String> map = routes
  50. .concatMap(x -> Mono.just(x).filterWhen(r -> {
  51. return Mono.just(r.equals(finalI + "" + finalI));
  52. })
  53. .doOnError(e -> log.error("ff"))
  54. .onErrorResume(e -> Mono.empty()))
  55. .next()
  56. .map(route -> {
  57. return route;
  58. });
  59. map.subscribe(System.out::println);
  60. }
  61. }
  62. }

引申

  • 如何实现动态路由呢?
    • 外接注册中心,例如Nacos
      • 2020年的时候我对SpringCloud整体还不够熟悉,实际上springCloud Gateway已经实现了,详见 ReactiveCompositeDiscoveryClientAutoConfiguration.java
  • 如何快速的实现路由过滤呢
    • 网关本身只是作为 源服务 的一个代理,那么本身的消耗应该是微乎其微的.
    • SpringCloud Gateway是通过List遍历的形式实现的,利用是Java8的 Predicate 函数。

      参考链接