这是网关在路由匹配的过程中出现的一小段的代码,刚接触起来还是比较难懂的,:(
核心在于理解
- Flux和Mono的订阅特性,如果没有订阅,那么内部的这段代码是不运行的. 他的花销仅仅在于实例化整个动作
- Flux和Mono的每次订阅
subscribe
都会触发它们提前设定的内容不恰当的比喻
Mono: 对比成一个普通的对象,例如User
,Person
实例对象
Flux: 对比成一个List,一个流水线,是可以中断的。
每次路由都会进入这段代码进行路由(找到路由)
public class Test1 {
final static Logger log = LoggerFactory.getLogger(Test1.class);
private static final String CACHE_KEY = "routes";
private static Flux<String> fetch() {
System.out.println("111111");
return Flux.fromIterable(Arrays.asList("11", "22", "33", "44"));
}
private static final Map<String, List> cache = new ConcurrentHashMap<>();
Flux<String> routes = lookup(cache, CACHE_KEY, String.class).onCacheMissResume(Test1::fetch);
public static <KEY, VALUE> CacheFlux.FluxCacheBuilderMapMiss<VALUE> lookup(
Map<KEY, ? super List> cacheMap, KEY key, Class<VALUE> valueClass) {
return new CacheFlux.FluxCacheBuilderMapMiss<VALUE>() {
@Override
public Flux onCacheMissResume(Supplier<Flux<VALUE>> otherSupplier) {
return Flux.defer(() -> {
Object fromCache = cacheMap.get(key);
if (fromCache == null) {
return otherSupplier.get()
.materialize()
.collectList()
.doOnNext(signals -> cacheMap.put(key, signals))
.flatMapIterable(Function.identity())
.dematerialize();
} else if (fromCache instanceof List) {
try {
@SuppressWarnings("unchecked")
List<Signal<VALUE>> fromCacheSignals = (List<Signal<VALUE>>) fromCache;
return Flux.fromIterable(fromCacheSignals)
.dematerialize();
} catch (Throwable cause) {
return Flux.error(new IllegalArgumentException("Content of cache for key " + key + " cannot be cast to List<Signal>", cause));
}
} else {
return Flux.error(new IllegalArgumentException("Content of cache for key " + key + " is not a List"));
}
});
}
};
}
@Test
public void main() {
Flux.fromIterable(Arrays.asList("55", "66", "77", "88"))
.materialize()
.collectList()
.doOnNext(x -> cache.put(CACHE_KEY, x)).subscribe();
for (int i = 0; i < 10; i++) {
int finalI = i;
//每次都会进入onCacheMissResume
final Mono<String> map = routes
.concatMap(x -> Mono.just(x).filterWhen(r -> {
return Mono.just(r.equals(finalI + "" + finalI));
})
.doOnError(e -> log.error("ff"))
.onErrorResume(e -> Mono.empty()))
.next()
.map(route -> {
return route;
});
map.subscribe(System.out::println);
}
}
}
引申
- 如何实现动态路由呢?
- 外接注册中心,例如Nacos
- 2020年的时候我对SpringCloud整体还不够熟悉,实际上springCloud Gateway已经实现了,详见
ReactiveCompositeDiscoveryClientAutoConfiguration.java
- 2020年的时候我对SpringCloud整体还不够熟悉,实际上springCloud Gateway已经实现了,详见
- 外接注册中心,例如Nacos
- 如何快速的实现路由过滤呢