引言

本篇开始研究 Soul 网关 http 数据同步,将分为三篇进行分析:

  • 《Admin通知前处理》
  • 《变更通知机制》
  • 《Bootstrap处理变更通知》

希望三篇完结后能对 Soul 的 http 数据同步策略有所收获。

本篇旨在探究 soul-admin 端在发起变更通知前所做的处理。
不同数据变更的处理模式应当是一致的,故本篇以 selector 配置变更为切入点进行深入。

一、配置变更入口

找到 SelectorController,这是 selector 配置变更的入口

【Soul网关探秘】http数据同步-Admin通知前处理 - 图1

其持有一个 SelectorService 引用,通过 SelectorService 实现 selector 配置变更。

二、配置变更服务

再来看看 SelectorService,实现了配置变更的具体处理。

【Soul网关探秘】http数据同步-Admin通知前处理 - 图2

其内部持有5个 mapper、1个 eventPublisher和1个 upstreamCheckService,对外提供一系列对 selector 的crud方法

注意 createOrUpdate 方法

  1. public int createOrUpdate(final SelectorDTO selectorDTO) {
  2. int selectorCount;
  3. SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
  4. List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
  5. // 数据落库
  6. if (StringUtils.isEmpty(selectorDTO.getId())) {
  7. selectorCount = selectorMapper.insertSelective(selectorDO);
  8. selectorConditionDTOs.forEach(selectorConditionDTO -> {
  9. selectorConditionDTO.setSelectorId(selectorDO.getId());
  10. selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
  11. });
  12. } else {
  13. selectorCount = selectorMapper.updateSelective(selectorDO);
  14. //delete rule condition then add
  15. selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
  16. selectorConditionDTOs.forEach(selectorConditionDTO -> {
  17. selectorConditionDTO.setSelectorId(selectorDO.getId());
  18. SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
  19. selectorConditionMapper.insertSelective(selectorConditionDO);
  20. });
  21. }
  22. // 发布 spring 事件
  23. publishEvent(selectorDO, selectorConditionDTOs);
  24. // 更新 divide 上游服务
  25. updateDivideUpstream(selectorDO);
  26. return selectorCount;
  27. }

处理策略是先落库,再发布 spring 事件,最后更新 divide 上游服务

三、spring 事件通知机制

此处涉及 spring 的事件通知机制,在此简要说明:

ApplicationContext通过ApplicationEvent类和ApplicationListener接口提供事件处理。

如果一个bean实现ApplicationListener接口在容器中,每次一个ApplicationEvent被发布到ApplicationContext中,这类bean就会收到这些通知。

实现Spring事件机制主要有4个类:

  • ApplicationEvent:事件,每个实现类表示一类事件,可携带数据。
  • ApplicationListener:事件监听器,用于接收事件处理时间。
  • ApplicationEventMulticaster:事件管理者,用于事件监听器的注册和事件的广播。
  • ApplicationEventPublisher:事件发布者,委托ApplicationEventMulticaster完成事件发布。

四、soul 实现事件通知

下面我们看看 Soul 是如何使用 spring 的时间通知机制。

事件定义

【Soul网关探秘】http数据同步-Admin通知前处理 - 图3

DataChangedEvent 继承 ApplicationEvent,提供了 DataChangedEvent(groupKey, type, source) 事件构造方法

事件监听器

【Soul网关探秘】http数据同步-Admin通知前处理 - 图4

DataChangedEventDispatcher 实现了 ApplicationListener接口,借助 onApplicationEvent 方法监听事件

  1. public void onApplicationEvent(final DataChangedEvent event) {
  2. for (DataChangedListener listener : listeners) {
  3. switch (event.getGroupKey()) {
  4. case APP_AUTH:
  5. listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
  6. break;
  7. case PLUGIN:
  8. listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
  9. break;
  10. case RULE:
  11. listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
  12. break;
  13. case SELECTOR:
  14. listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
  15. break;
  16. case META_DATA:
  17. listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
  18. break;
  19. default:
  20. throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
  21. }
  22. }
  23. }

该方法内按事件类型分别处理,DataChangedEventDispatcher 同时实现了 InitializingBean 接口,在初始化后完成 listeners 的注入。

五、响应数据变更事件

上面的事件监听处理用到 soul 的 DataChangedListener 接口

【Soul网关探秘】http数据同步-Admin通知前处理 - 图5

DataChangedListener 实现了不同类型事件的事件响应方法用于响应 DataChangedEvent 事件。

1)AbstractDataChangedListener 的 onSelectorChanged 实现:

  1. public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
  2. if (CollectionUtils.isEmpty(changed)) {
  3. return;
  4. }
  5. // 更新 selector 缓存
  6. this.updateSelectorCache();
  7. // selector 变更后处理,实现具体的变更通知
  8. this.afterSelectorChanged(changed, eventType);
  9. }

可以看到 selector 变更处理是先更缓存后发通知。

2)AbstractDataChangedListener 的 updateSelectorCache 实现:

  1. protected void updateSelectorCache() {
  2. this.updateCache(ConfigGroupEnum.SELECTOR, selectorService.listAll());
  3. }

3)AbstractDataChangedListener 的 updateCache 实现:

  1. protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
  2. String json = GsonUtils.getInstance().toJson(data);
  3. ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
  4. ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
  5. log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
  6. }

可以看到最终是创建对应的 ConfigDataCache 存入 CACHE。

总结

本篇梳理了 soul-admin 在真正发出数据变更通知前的处理脉络,其策略是:先写库后更缓存,最后发出数据变更通知。
先写库保证数据不丢,另外在集群部署时,其他 soul-admin 节点也可通过浏览页面时查库保证数据一致。
意外学到 spring 的事件通知机制,soul 中的设计果真小巧精妙。

下篇,将探究 http 同步策略的变更通知机制,期待惊喜。