在上一篇文章中,我们已经知道了 soul-bootstrap 会主动跟 soul-admin 建立连接,而且 soul-admin 也会推送更新的接口数据给连接上 web socket 服务的客户端(也就是 soul 网关)。

但是我们还并不知道数据是怎样被 soul-bootstrap 处理的,所以留下了 2 个问题:

  1. soul-bootstrap 是怎么处理接口数据的
  2. soul-bootstrap 如果挂掉了重启,接口数据会不会消失?

今天我们一起就查看源码了解下这个数据处理的过程。

soul-bootstrap 建立 socket 连接的数据请求

我们先从第二个问题出发,因为挂掉重启和第一次启动的处理过程应该是一致的。

因为第一次启动是肯定不会有接口数据的,但是我们之前的测试的时候发现只要 soul-bootstrap 启动了就可以代理接口提供服务了。

也就是说,soul-bootstrap 启动并建立了 socket 连接之后,就应该会从 soul-admin 那里去取一次数据。

那么切入点也就找到了,当 socket 连接成功之后,肯定有我们想要的答案。

  1. public final class SoulWebsocketClient extends WebSocketClient {
  2. private volatile boolean alreadySync = Boolean.FALSE;
  3. @Override
  4. public void onOpen(final ServerHandshake serverHandshake) {
  5. if (!alreadySync) {
  6. send(DataEventTypeEnum.MYSELF.name());
  7. alreadySync = true;
  8. }
  9. }
  10. }

看到上面的代码了么,简单解读一下:

  1. 当 socket 成功建立连接就会执行这个 onOpen 方法
  2. 然后给 soul-admin 去发送一条 MYSELF 消息
  3. 然后更新 alreadySync 状态

接下来就需要回头去看 soul-admin 接收到 MYSELF 消息之后会做出什么样的响应了。

  1. @ServerEndpoint("/websocket")
  2. public class WebsocketCollector {
  3. @OnMessage
  4. public void onMessage(final String message, final Session session) {
  5. if (message.equals(DataEventTypeEnum.MYSELF.name())) {
  6. WebsocketCollector.session = session;
  7. SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
  8. }
  9. }
  10. }

嗯,再来简答解读下:

  1. 判断是否是 MYSELF 消息
  2. _调用 _SyncDataService 类的 syncAll 方法,这个方法的名字一看就是同步所有数据

那么同步哪些数据呢?结合之前的文章,肯定会有「选择器」和「规则」的数据吧,毕竟这两块就是接口相关的数据。

让我们继续看下去:

  1. @Service("syncDataService")
  2. public class SyncDataServiceImpl implements SyncDataService {
  3. @Override
  4. public boolean syncAll(final DataEventTypeEnum type) {
  5. appAuthService.syncData();
  6. List<PluginData> pluginDataList = pluginService.listAll();
  7. eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
  8. List<SelectorData> selectorDataList = selectorService.listAll();
  9. eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
  10. List<RuleData> ruleDataList = ruleService.listAll();
  11. eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
  12. metaDataService.syncData();
  13. return true;
  14. }
  15. }

嗯,果然不出我们所料,「选择器」和「规则」的数据都在这里,但是还多了一个「插件」的数据,不清楚「插件」的概念也不用慌,继续看下去。

这里是获取到了 3 种数据,然后通通用事件发布模式给发布了出去,老规矩去找到监听的方法。

  1. public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
  2. @Override
  3. public void onApplicationEvent(final DataChangedEvent event) {
  4. for (DataChangedListener listener : listeners) {
  5. switch (event.getGroupKey()) {
  6. // 省略其他的 case...
  7. case PLUGIN:
  8. listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
  9. break;
  10. case RULE:
  11. listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
  12. break;
  13. case SELECTOR:
  14. listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
  15. break;
  16. default:
  17. throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
  18. }
  19. }
  20. }
  21. }

这几个事件的监听处理都是同一个地方,根据数据的类型不同再去执行不同的方法,我们继续看下去。

  1. public class WebsocketDataChangedListener implements DataChangedListener {
  2. @Override
  3. public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
  4. WebsocketData<PluginData> websocketData =
  5. new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
  6. WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
  7. }
  8. @Override
  9. public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
  10. WebsocketData<SelectorData> websocketData =
  11. new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
  12. WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
  13. }
  14. @Override
  15. public void onRuleChanged(final List<RuleData> ruleDataList, final DataEventTypeEnum eventType) {
  16. WebsocketData<RuleData> configData =
  17. new WebsocketData<>(ConfigGroupEnum.RULE.name(), eventType.name(), ruleDataList);
  18. WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
  19. }
  20. }

看到了这里应该就明白了,分别把需要同步的「插件」、「选择器」和「规则」数据用 socket 再发送出去。那么这时候的接收方是谁呢?没错,又回到了 soul-bootstrap,不过这一次带上了 3 种数据。

我们依葫芦画瓢,继续找到 soul-bootstrap 处理的地方,也就是上面 SoulWebsocketClient 类里面的 onMessage 方法(不仅回到了 soul-bootstrap,还回到了同一个类里面):

  1. public final class SoulWebsocketClient extends WebSocketClient {
  2. @Override
  3. public void onMessage(final String result) {
  4. handleResult(result);
  5. }
  6. private void handleResult(final String result) {
  7. WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
  8. ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
  9. String eventType = websocketData.getEventType();
  10. String json = GsonUtils.getInstance().toJson(websocketData.getData());
  11. websocketDataHandler.executor(groupEnum, json, eventType);
  12. }
  13. }

简单解读下这段代码:

  1. 从参数里解析出数据、事件类型等需要的数据
  2. 交给 websocketDataHandler 去处理

又到了我们一步一猜的时候了,猜测会怎么去处理数据?因为数据的种类不一样,肯定需要分开单独去处理吧。

这里一气化三清,数据分别交给了 PluginDataHandler、SelectorDataHandler、RuleDataHandler 刷新数据(其实就是删除数据)之后,又会走到同一个方法里面去处理。

  1. public class CommonPluginDataSubscriber implements PluginDataSubscriber {
  2. private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
  3. Optional.ofNullable(classData).ifPresent(data -> {
  4. if (data instanceof PluginData) {
  5. PluginData pluginData = (PluginData) data;
  6. if (dataType == DataEventTypeEnum.UPDATE) {
  7. BaseDataCache.getInstance().cachePluginData(pluginData);
  8. Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
  9. } else if (dataType == DataEventTypeEnum.DELETE) {
  10. BaseDataCache.getInstance().removePluginData(pluginData);
  11. Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
  12. }
  13. } else if (data instanceof SelectorData) {
  14. SelectorData selectorData = (SelectorData) data;
  15. if (dataType == DataEventTypeEnum.UPDATE) {
  16. BaseDataCache.getInstance().cacheSelectData(selectorData);
  17. Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
  18. } else if (dataType == DataEventTypeEnum.DELETE) {
  19. BaseDataCache.getInstance().removeSelectData(selectorData);
  20. Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
  21. }
  22. } else if (data instanceof RuleData) {
  23. RuleData ruleData = (RuleData) data;
  24. if (dataType == DataEventTypeEnum.UPDATE) {
  25. BaseDataCache.getInstance().cacheRuleData(ruleData);
  26. Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
  27. } else if (dataType == DataEventTypeEnum.DELETE) {
  28. BaseDataCache.getInstance().removeRuleData(ruleData);
  29. Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
  30. }
  31. }
  32. });
  33. }
  34. }

这段代码就是判断数据的类型——区分「插件」、「选择器」和「规则」,另外判断数据操作的类型——更新还是删除。

最后把这些数据写入到对应的内存里面。

其实到了这里,我们也就可以回答开头的两个问题了。

总结

第一,整个代码分析的过程就是 soul-bootstrap 处理接口数据的过程;第二,因为数据存储在内存里面,soul-bootstrap 一旦挂掉数据就会消失,然而每次启动就会从 soul-admin 那里同步数据。

另外,soul-admin 还会定时同步和更新接口数据,推送给 soul-bootstrap,这个过程跟上面 soul-bootstrap 接收到数据的过程是一样的。

到此为止,我们已经从自己的服务开始,把整个 http 接口数据注册到 soul-admin,然后通过 socket 同步到了 soul-bootstrap,也就是说 soul-bootstrap 已经存在了代理我们真实的 http 服务的数据。

但是,新的问题来了—— soul-bootstrap 是怎么完成代理的?

我们下篇文章再分析。