序
在上一篇文章中,我们已经知道了 soul-admin 会启动 web socket 服务,当 soul-bootstrap 网关启动就会跟 soul-admin 的 web socket 服务建立连接。
再加上我们自己的接口服务会自动注册到 soul-admin,然后被推送到 soul-bootsrap 网关。经过这样的过程之后,我们就可以使用 soul-bootstrap 来代理我们原本的 http 接口。
今天这篇文章我们就来从代码层面去看看整个 http 接口的数据的流转过程。
1 注册到 soul-admin
接口数据的源头始终是我们自己新建的 http 服务,通过引入的 starter 依赖借用 @SoulSpringMvcClient 注解扫描到需要代理的接口,然后在 Spring Bean 初始化完成之后去调用 http 工具类以 post 方式提交给 soul-admin。
这一块逻辑在前面的文章已经梳理过了,这里不再重复。
那么要如何确定到代码呢,就需要回看之前分析 SpringMvcClientBeanPostProcessor 类的文章了。这里面明确的指定了 http post 提交的接口地址。
soulSpringMvcConfig.getAdminUrl() + "/soul-client/springmvc-register";
下面就是我们找到的 soul-admin 涉及到这部分的代码,同样只关注 http 的注册方式。
@RestController@RequestMapping("/soul-client")public class SoulClientController {private final SoulClientRegisterService soulClientRegisterService;/*** Instantiates a new Soul client controller.** @param soulClientRegisterService the soul client register service*/public SoulClientController(final SoulClientRegisterService soulClientRegisterService) {this.soulClientRegisterService = soulClientRegisterService;}/*** Register spring mvc string.** @param springMvcRegisterDTO the spring mvc register dto* @return the string*/@PostMapping("/springmvc-register")public String registerSpringMvc(@RequestBody final SpringMvcRegisterDTO springMvcRegisterDTO) {return soulClientRegisterService.registerSpringMvc(springMvcRegisterDTO);}}
可以看到这个 Controller 类就是一个简单的调用 service 方法提供 api 接口。
可以猜猜它会干些啥?
- 我猜测它会在 service 去处理传递过来的接口参数,然后把数据存起来。因为 soul-admin 的前端页面是可以访问到这些数据的。
- 数据到了 soul-admin,整个过程还只完成了一半,这些接口数据最终肯定会到 soul-bootstrap 的,结合前面梳理的逻辑,猜测应该会借助 web socket 推送给 soul-bootstrap。
接下来我们继续看代码,验证我们的猜想。
直接到 SoulClientRegisterServiceImpl 实现类,我也移除了多余的代码块,加上了注释。
public class SoulClientRegisterServiceImpl implements SoulClientRegisterService {/*** 这里加上了数据库事务的注解,也就是说数据会存储到数据库,可以验证我们的第一个猜想*/@Transactionalpublic String registerSpringMvc(final SpringMvcRegisterDTO dto) {// 这个是注册元数据,暂时忽略if (dto.isRegisterMetaData()) {MetaDataDO exist = metaDataMapper.findByPath(dto.getPath());if (Objects.isNull(exist)) {saveSpringMvcMetaData(dto);}}// 注册选择器并且拿到选择器 IDString selectorId = handlerSpringMvcSelector(dto);// 注册规则,同时规则需要跟选择器关联handlerSpringMvcRule(selectorId, dto);return SoulResultMessage.SUCCESS;}}
从上面的代码我们可以看到这里加上了数据库事务的注解,也就是说数据会存储到数据库,可以验证我们的第一个猜想——数据会被存储到数据库。
同时,数据库事务一般都会执行至少两条 SQL 的操作,可以看到上面的代码分别处理了「选择器」和「规则」两个方法。并且因为「规则」的处理还需要传递「选择器」ID,可以猜想「选择器」和「规则」是一对多的关系,实际上也的确是的,这可以从实体类或者数据库字段看出来。
接下来继续深入代码,可以发现不管是处理「选择器」还是「规则」都有一个 DataChangedEvent 事件发布,那就很可能是给 soul-bootstrap 推送数据了吧。
// publish change event.eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATE,Collections.singletonList(RuleDO.transFrom(ruleDO, pluginDO.getName(), conditionDataList))));// publish change event.eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,Collections.singletonList(selectorData)));
有事件发布,肯定会有事件监听,接下来找到 DataChangedEvent 事件的监听代码。
@Componentpublic class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {public void onApplicationEvent(final DataChangedEvent event) {for (DataChangedListener listener : listeners) {switch (event.getGroupKey()) {case APP_AUTH:listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());break;case PLUGIN:listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());break;case RULE:listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());break;case SELECTOR:listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());break;case META_DATA:listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());break;default:throw new IllegalStateException("Unexpected value: " + event.getGroupKey());}}}}
可以看到这个监听器会处理很多种不同的逻辑,我们只关注 web socket 下面的「选择器」和「规则」的处理。
public class WebsocketDataChangedListener implements DataChangedListener {@Overridepublic void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {WebsocketData<SelectorData> websocketData =new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);}@Overridepublic void onRuleChanged(final List<RuleData> ruleDataList, final DataEventTypeEnum eventType) {WebsocketData<RuleData> configData =new WebsocketData<>(ConfigGroupEnum.RULE.name(), eventType.name(), ruleDataList);WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);}}
可以看到这里拿到了变动的「选择器」列表数据和「规则」列表数据,然后调用了 WebsocketController 类里面的 send 方法,那么发送到哪里去呢?很可能就是 soul-bootstrap 了吧。
public class WebsocketCollector {/*** Send.** @param message the message* @param type the type*/public static void send(final String message, final DataEventTypeEnum type) {if (StringUtils.isNotBlank(message)) {if (DataEventTypeEnum.MYSELF == type) {try {session.getBasicRemote().sendText(message);} catch (IOException e) {log.error("websocket send result is exception: ", e);}return;}for (Session session : SESSION_SET) {try {session.getBasicRemote().sendText(message);} catch (IOException e) {log.error("websocket send result is exception: ", e);}}}}}
虽然直接从代码里看不出来发送到了 soul-bootstrap,但是可以明确一点——这些数据将会发送给连接上了 soul-admin 模块的 web socket 服务,也就是说只要证明 soul-bootstrap 连上了 soul-admin 的 web socket 服务就可以证明数据将会从 soul-admin 到 soul-bootstrap 了。
然而上篇文章我们已经知道 soul-bootstrap 的配置文件的确配置了 soul-admin 的 web socket 服务了,但是它是在哪里连接上的呢?
2 soul-bootstrap 怎么连接 web socket 服务
答案就是数据同步模块,还是老规矩只关注 web socket 方式的数据同步,在 soul-bootstrap 的 pom.xml 文件里依赖了 soul-spring-boot-starter-sync-data-websocket 模块,这个就是 web socket 同步数据用的。
<dependency><groupId>org.dromara</groupId><artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId><version>${project.version}</version></dependency>
继续寻找配置类,因为配置类会读取 soul-bootstrap 配置的 web socket 服务地址,拿到地址之后肯定会去建立连接对吧。
@Configuration@ConditionalOnClass(WebsocketSyncDataService.class)@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")@Slf4jpublic class WebsocketSyncDataConfiguration {@Beanpublic SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {log.info("you use websocket sync soul data.......");return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));}}// 创建 web socket 连接public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {/*** Instantiates a new Websocket sync cache.** @param websocketConfig the websocket config* @param pluginDataSubscriber the plugin data subscriber* @param metaDataSubscribers the meta data subscribers* @param authDataSubscribers the auth data subscribers*/public WebsocketSyncDataService(final WebsocketConfig websocketConfig,final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers,final List<AuthDataSubscriber> authDataSubscribers) {String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));for (String url : urls) {try {clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));} catch (URISyntaxException e) {log.error("websocket url({}) is error", url, e);}}try {for (WebSocketClient client : clients) {boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);if (success) {log.info("websocket connection is successful.....");} else {log.error("websocket connection is error.....");}executor.scheduleAtFixedRate(() -> {try {if (client.isClosed()) {boolean reconnectSuccess = client.reconnectBlocking();if (reconnectSuccess) {log.info("websocket reconnect is successful.....");} else {log.error("websocket reconnection is error.....");}}} catch (InterruptedException e) {log.error("websocket connect is error :{}", e.getMessage());}}, 10, 30, TimeUnit.SECONDS);}/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/} catch (InterruptedException e) {log.info("websocket connection...exception....", e);}}}
尽管看起来代码量挺多,但是做的事情却很少,无非就是从配置类里面拿到 web socket 地址,然后为每一个地址创建一个 socket 连接,之后用线程池去处理每个连接之后的事情。
不过这里对似乎看不出如何对传入的数据进行处理的过程,而是使用了观察者模式去完成数据的订阅处理,如果你跟我一样好奇这里的流程,我们下篇文章再一起深入吧。
总结
总之,到了这里已经知道了 soul-bootstrap 会主动跟 soul-admin 建立 web socket 连接,soul-admin 也会给 soul-bootstrap 推送接口变更数据。
也通过源码验证了文章开头提出的两个问题。
但是又留下了一个新问题:soul-bootstrap 是怎么处理这些接口数据的。
我们不妨再发散下思维,假如 soul-bootstrap 挂掉了重启,接口数据会不会消失?如果消失了要怎么办?
这些问题我们就下次去解决吧。
