在上一篇文章中,我们已经知道了 soul-admin 会启动 web socket 服务,当 soul-bootstrap 网关启动就会跟 soul-admin 的 web socket 服务建立连接。

再加上我们自己的接口服务会自动注册到 soul-admin,然后被推送到 soul-bootsrap 网关。经过这样的过程之后,我们就可以使用 soul-bootstrap 来代理我们原本的 http 接口。

今天这篇文章我们就来从代码层面去看看整个 http 接口的数据的流转过程。

1 注册到 soul-admin

接口数据的源头始终是我们自己新建的 http 服务,通过引入的 starter 依赖借用 @SoulSpringMvcClient 注解扫描到需要代理的接口,然后在 Spring Bean 初始化完成之后去调用 http 工具类以 post 方式提交给 soul-admin。

这一块逻辑在前面的文章已经梳理过了,这里不再重复。

那么要如何确定到代码呢,就需要回看之前分析 SpringMvcClientBeanPostProcessor 类的文章了。这里面明确的指定了 http post 提交的接口地址。

  1. soulSpringMvcConfig.getAdminUrl() + "/soul-client/springmvc-register";

下面就是我们找到的 soul-admin 涉及到这部分的代码,同样只关注 http 的注册方式。

  1. @RestController
  2. @RequestMapping("/soul-client")
  3. public class SoulClientController {
  4. private final SoulClientRegisterService soulClientRegisterService;
  5. /**
  6. * Instantiates a new Soul client controller.
  7. *
  8. * @param soulClientRegisterService the soul client register service
  9. */
  10. public SoulClientController(final SoulClientRegisterService soulClientRegisterService) {
  11. this.soulClientRegisterService = soulClientRegisterService;
  12. }
  13. /**
  14. * Register spring mvc string.
  15. *
  16. * @param springMvcRegisterDTO the spring mvc register dto
  17. * @return the string
  18. */
  19. @PostMapping("/springmvc-register")
  20. public String registerSpringMvc(@RequestBody final SpringMvcRegisterDTO springMvcRegisterDTO) {
  21. return soulClientRegisterService.registerSpringMvc(springMvcRegisterDTO);
  22. }
  23. }

可以看到这个 Controller 类就是一个简单的调用 service 方法提供 api 接口。

可以猜猜它会干些啥?

  1. 我猜测它会在 service 去处理传递过来的接口参数,然后把数据存起来。因为 soul-admin 的前端页面是可以访问到这些数据的。
  2. 数据到了 soul-admin,整个过程还只完成了一半,这些接口数据最终肯定会到 soul-bootstrap 的,结合前面梳理的逻辑,猜测应该会借助 web socket 推送给 soul-bootstrap。

接下来我们继续看代码,验证我们的猜想。

直接到 SoulClientRegisterServiceImpl 实现类,我也移除了多余的代码块,加上了注释。

  1. public class SoulClientRegisterServiceImpl implements SoulClientRegisterService {
  2. /**
  3. * 这里加上了数据库事务的注解,也就是说数据会存储到数据库,可以验证我们的第一个猜想
  4. */
  5. @Transactional
  6. public String registerSpringMvc(final SpringMvcRegisterDTO dto) {
  7. // 这个是注册元数据,暂时忽略
  8. if (dto.isRegisterMetaData()) {
  9. MetaDataDO exist = metaDataMapper.findByPath(dto.getPath());
  10. if (Objects.isNull(exist)) {
  11. saveSpringMvcMetaData(dto);
  12. }
  13. }
  14. // 注册选择器并且拿到选择器 ID
  15. String selectorId = handlerSpringMvcSelector(dto);
  16. // 注册规则,同时规则需要跟选择器关联
  17. handlerSpringMvcRule(selectorId, dto);
  18. return SoulResultMessage.SUCCESS;
  19. }
  20. }

从上面的代码我们可以看到这里加上了数据库事务的注解,也就是说数据会存储到数据库,可以验证我们的第一个猜想——数据会被存储到数据库。

同时,数据库事务一般都会执行至少两条 SQL 的操作,可以看到上面的代码分别处理了「选择器」和「规则」两个方法。并且因为「规则」的处理还需要传递「选择器」ID,可以猜想「选择器」和「规则」是一对多的关系,实际上也的确是的,这可以从实体类或者数据库字段看出来。

接下来继续深入代码,可以发现不管是处理「选择器」还是「规则」都有一个 DataChangedEvent 事件发布,那就很可能是给 soul-bootstrap 推送数据了吧。

  1. // publish change event.
  2. eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATE,
  3. Collections.singletonList(RuleDO.transFrom(ruleDO, pluginDO.getName(), conditionDataList))));
  4. // publish change event.
  5. eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
  6. Collections.singletonList(selectorData)));

有事件发布,肯定会有事件监听,接下来找到 DataChangedEvent 事件的监听代码。

  1. @Component
  2. public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
  3. public void onApplicationEvent(final DataChangedEvent event) {
  4. for (DataChangedListener listener : listeners) {
  5. switch (event.getGroupKey()) {
  6. case APP_AUTH:
  7. listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
  8. break;
  9. case PLUGIN:
  10. listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
  11. break;
  12. case RULE:
  13. listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
  14. break;
  15. case SELECTOR:
  16. listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
  17. break;
  18. case META_DATA:
  19. listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
  20. break;
  21. default:
  22. throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
  23. }
  24. }
  25. }
  26. }

可以看到这个监听器会处理很多种不同的逻辑,我们只关注 web socket 下面的「选择器」和「规则」的处理。

  1. public class WebsocketDataChangedListener implements DataChangedListener {
  2. @Override
  3. public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
  4. WebsocketData<SelectorData> websocketData =
  5. new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
  6. WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
  7. }
  8. @Override
  9. public void onRuleChanged(final List<RuleData> ruleDataList, final DataEventTypeEnum eventType) {
  10. WebsocketData<RuleData> configData =
  11. new WebsocketData<>(ConfigGroupEnum.RULE.name(), eventType.name(), ruleDataList);
  12. WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
  13. }
  14. }

可以看到这里拿到了变动的「选择器」列表数据和「规则」列表数据,然后调用了 WebsocketController 类里面的 send 方法,那么发送到哪里去呢?很可能就是 soul-bootstrap 了吧。

  1. public class WebsocketCollector {
  2. /**
  3. * Send.
  4. *
  5. * @param message the message
  6. * @param type the type
  7. */
  8. public static void send(final String message, final DataEventTypeEnum type) {
  9. if (StringUtils.isNotBlank(message)) {
  10. if (DataEventTypeEnum.MYSELF == type) {
  11. try {
  12. session.getBasicRemote().sendText(message);
  13. } catch (IOException e) {
  14. log.error("websocket send result is exception: ", e);
  15. }
  16. return;
  17. }
  18. for (Session session : SESSION_SET) {
  19. try {
  20. session.getBasicRemote().sendText(message);
  21. } catch (IOException e) {
  22. log.error("websocket send result is exception: ", e);
  23. }
  24. }
  25. }
  26. }
  27. }

虽然直接从代码里看不出来发送到了 soul-bootstrap,但是可以明确一点——这些数据将会发送给连接上了 soul-admin 模块的 web socket 服务,也就是说只要证明 soul-bootstrap 连上了 soul-admin 的 web socket 服务就可以证明数据将会从 soul-admin 到 soul-bootstrap 了。

然而上篇文章我们已经知道 soul-bootstrap 的配置文件的确配置了 soul-admin 的 web socket 服务了,但是它是在哪里连接上的呢?

2 soul-bootstrap 怎么连接 web socket 服务

答案就是数据同步模块,还是老规矩只关注 web socket 方式的数据同步,在 soul-bootstrap 的 pom.xml 文件里依赖了 soul-spring-boot-starter-sync-data-websocket 模块,这个就是 web socket 同步数据用的。

  1. <dependency>
  2. <groupId>org.dromara</groupId>
  3. <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
  4. <version>${project.version}</version>
  5. </dependency>

继续寻找配置类,因为配置类会读取 soul-bootstrap 配置的 web socket 服务地址,拿到地址之后肯定会去建立连接对吧。

  1. @Configuration
  2. @ConditionalOnClass(WebsocketSyncDataService.class)
  3. @ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
  4. @Slf4j
  5. public class WebsocketSyncDataConfiguration {
  6. @Bean
  7. public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
  8. final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
  9. log.info("you use websocket sync soul data.......");
  10. return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
  11. metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
  12. }
  13. }
  14. // 创建 web socket 连接
  15. public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
  16. /**
  17. * Instantiates a new Websocket sync cache.
  18. *
  19. * @param websocketConfig the websocket config
  20. * @param pluginDataSubscriber the plugin data subscriber
  21. * @param metaDataSubscribers the meta data subscribers
  22. * @param authDataSubscribers the auth data subscribers
  23. */
  24. public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
  25. final PluginDataSubscriber pluginDataSubscriber,
  26. final List<MetaDataSubscriber> metaDataSubscribers,
  27. final List<AuthDataSubscriber> authDataSubscribers) {
  28. String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
  29. executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
  30. for (String url : urls) {
  31. try {
  32. clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
  33. } catch (URISyntaxException e) {
  34. log.error("websocket url({}) is error", url, e);
  35. }
  36. }
  37. try {
  38. for (WebSocketClient client : clients) {
  39. boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
  40. if (success) {
  41. log.info("websocket connection is successful.....");
  42. } else {
  43. log.error("websocket connection is error.....");
  44. }
  45. executor.scheduleAtFixedRate(() -> {
  46. try {
  47. if (client.isClosed()) {
  48. boolean reconnectSuccess = client.reconnectBlocking();
  49. if (reconnectSuccess) {
  50. log.info("websocket reconnect is successful.....");
  51. } else {
  52. log.error("websocket reconnection is error.....");
  53. }
  54. }
  55. } catch (InterruptedException e) {
  56. log.error("websocket connect is error :{}", e.getMessage());
  57. }
  58. }, 10, 30, TimeUnit.SECONDS);
  59. }
  60. /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
  61. } catch (InterruptedException e) {
  62. log.info("websocket connection...exception....", e);
  63. }
  64. }
  65. }

尽管看起来代码量挺多,但是做的事情却很少,无非就是从配置类里面拿到 web socket 地址,然后为每一个地址创建一个 socket 连接,之后用线程池去处理每个连接之后的事情。

不过这里对似乎看不出如何对传入的数据进行处理的过程,而是使用了观察者模式去完成数据的订阅处理,如果你跟我一样好奇这里的流程,我们下篇文章再一起深入吧。

总结

总之,到了这里已经知道了 soul-bootstrap 会主动跟 soul-admin 建立 web socket 连接,soul-admin 也会给 soul-bootstrap 推送接口变更数据。

也通过源码验证了文章开头提出的两个问题。

但是又留下了一个新问题:soul-bootstrap 是怎么处理这些接口数据的。

我们不妨再发散下思维,假如 soul-bootstrap 挂掉了重启,接口数据会不会消失?如果消失了要怎么办?

这些问题我们就下次去解决吧。