上一篇文章我们分析了 soul-admin 使用 http 数据同步方式在项目启动的时候会先刷新内存里面的数据,同时还会开启一个定时器去刷新内存数据,而这些数据全是从数据库里面取出来的。

然后我们留下了一个问题:通过 soul-admin 的可视化界面修改了数据之后,会不会及时的更新到内存里?

今天我们就看看在 soul-admin 里面是怎么去实现的。

修改数据之后的数据同步

我们知道 soul-admin 的可视化操作界面是用接口去处理逻辑的,也就是说只需要找到接口之后一步步去跟踪就知道数据是怎么更新到内存的了。

我们就以「选择器」数据为例。

  1. @RequestMapping("/selector")
  2. public class SelectorController {
  3. @PutMapping("/{id}")
  4. public SoulAdminResult updateSelector(@PathVariable("id") final String id, @RequestBody final SelectorDTO selectorDTO) {
  5. Objects.requireNonNull(selectorDTO);
  6. selectorDTO.setId(id);
  7. Integer updateCount = selectorService.createOrUpdate(selectorDTO);
  8. return SoulAdminResult.success(SoulResultMessage.UPDATE_SUCCESS, updateCount);
  9. }
  10. }

可以看到这个修改「选择器」的接口就是一个简单的调用 service 方法。那么我们就继续去跟踪这个 createOrUpdate 方法。

  1. @Service("selectorService")
  2. public class SelectorServiceImpl implements SelectorService {
  3. @Transactional(rollbackFor = RuntimeException.class)
  4. public int createOrUpdate(final SelectorDTO selectorDTO) {
  5. int selectorCount;
  6. SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
  7. List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
  8. if (StringUtils.isEmpty(selectorDTO.getId())) {
  9. selectorCount = selectorMapper.insertSelective(selectorDO);
  10. selectorConditionDTOs.forEach(selectorConditionDTO -> {
  11. selectorConditionDTO.setSelectorId(selectorDO.getId());
  12. selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
  13. });
  14. } else {
  15. selectorCount = selectorMapper.updateSelective(selectorDO);
  16. //delete rule condition then add
  17. selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
  18. selectorConditionDTOs.forEach(selectorConditionDTO -> {
  19. selectorConditionDTO.setSelectorId(selectorDO.getId());
  20. SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
  21. selectorConditionMapper.insertSelective(selectorConditionDO);
  22. });
  23. }
  24. publishEvent(selectorDO, selectorConditionDTOs);
  25. return selectorCount;
  26. }
  27. }

这个方法的代码看起来挺多的,但是我们稍微区分下就知道主要有 3 块内容:

  1. 构造「选择器」的实体
  2. 区分新增还是修改
  3. 发布数据更新事件

前两个就不用多解释了,一个是作为更新数据的参数,另一个是更新到数据库。我们重点看下第三个,发布数据更新事件。

  1. public class SelectorServiceImpl implements SelectorService {
  2. private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
  3. PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
  4. List<ConditionData> conditionDataList =
  5. selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
  6. // publish change event.
  7. eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
  8. Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
  9. }
  10. }

从代码中可以看到核心的内容就一点,使用了 Spring 的事件发布订阅模式。也就是说,每次更新「选择器」的数据,都会触发两个操作:分别是更新数据库和发布数据更新事件。

找到了事件的发布,我们继续去看事件的监听,也就是对事件的处理,它在 DataChangedEventDispatcher 这个类里面。

  1. @Component
  2. public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
  3. private ApplicationContext applicationContext;
  4. private List<DataChangedListener> listeners;
  5. public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
  6. this.applicationContext = applicationContext;
  7. }
  8. @Override
  9. @SuppressWarnings("unchecked")
  10. public void onApplicationEvent(final DataChangedEvent event) {
  11. for (DataChangedListener listener : listeners) {
  12. switch (event.getGroupKey()) {
  13. case APP_AUTH:
  14. listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
  15. break;
  16. case PLUGIN:
  17. listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
  18. break;
  19. case RULE:
  20. listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
  21. break;
  22. case SELECTOR:
  23. listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
  24. break;
  25. case META_DATA:
  26. listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
  27. break;
  28. default:
  29. throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
  30. }
  31. }
  32. }
  33. @Override
  34. public void afterPropertiesSet() {
  35. Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
  36. this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
  37. }
  38. }

这个类也很简单,它实现了 Spring 的事件监听接口和 InitializingBean 接口,前者属于 Spring 事件发布订阅模块,简单说就是需要实现这个接口才可以使用自己的事件发布订阅;后者在上一篇文章里已经分析过了,主要就是提供了 Bean 在属性初始化之后的方法,这里的目的是加载并且实例化我们自定义的监听器。

这里多提一下,这些监听器具体是什么呢?其实就是分别针对「插件」「选择」「规则」等这些数据改变时的特殊处理。因为每种数据的处理方式会不一样,所以使用了事件发布订阅模式,交给自己的事件监听器去处理。

我们继续回到上面「选择器」事件发布的处理上面来,还是上面的代码。可以看到这里遍历了我们所有自定义的事件监听器,然后根据事件的 groupKey 去选择对应的监听器处理。

我们还是只关注 http 数据同步这块,忽略 Nacos 和 Zookeeper 的事件监听器。

  1. public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
  2. @Override
  3. public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
  4. if (CollectionUtils.isEmpty(changed)) {
  5. return;
  6. }
  7. this.updateSelectorCache();
  8. this.afterSelectorChanged(changed, eventType);
  9. }
  10. }

这个 AbstractDataChangedListener 类已经是老朋友了,昨天就分析过它会在项目启动时就先从数据库同步数据到内存里,今天又遇到它了。不过这一次它是在修改了「选择器」数据之后触发。

可以看到主要内就两行代码,一是更新「选择器」数据,这一点跟上篇文章分析的一样;二是执行了 afterSelectorChanged 方法,那么这个方法又干了什么呢?

  1. public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
  2. @Override
  3. protected void afterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
  4. scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));
  5. }
  6. }

又是一个老朋友,这个类在上一篇文章中就是开启了一个定时同步数据的定时器。

这一次内容也差不多,使用了线程池去开启了一个 DataChangeTask 线程。那么这个类干了啥呢?

  1. class DataChangeTask implements Runnable {
  2. private final ConfigGroupEnum groupKey;
  3. private final long changeTime = System.currentTimeMillis();
  4. DataChangeTask(final ConfigGroupEnum groupKey) {
  5. this.groupKey = groupKey;
  6. }
  7. @Override
  8. public void run() {
  9. for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
  10. LongPollingClient client = iter.next();
  11. iter.remove();
  12. client.sendResponse(Collections.singletonList(groupKey));
  13. log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
  14. }
  15. }
  16. }

可以看到这个类就是一个 Runnable 的实现类,主要的作用就是从 LongPollingClient 集合里面取出一个用异步的方式返回给请求端改变的 groupKey。

记录一个疑问

这里记录一个问题,我在调试这块代码的时候发现每次修改数据之后, LongPollingClient 集合的数据都是空,也就是说这部分代码并不会被执行,感觉有点多余。

我感觉这里的代码应该跟 http 数据同步到 soul 网关那块逻辑有关系,暂时还不涉及到这一块,所以就先跳过。

总结

我们今天通过修改「选择器」数据结合代码去分析了 soul-admin 是如何利用 Spring 的事件发布订阅模式去更新内存的。

但是到目前为止,我们都只是了解了 soul-admin 模块本身如何保证数据库的数据和内存数据一致的,并没有涉及到数据同步到 soul 网关。

下一篇文章里,我们继续探究 soul-admin 和 soul 网关之间的 http 数据同步。