序
在上一篇文章中,我们已经知道了 soul-admin 会启动 web socket 服务,当 soul-bootstrap 网关启动就会跟 soul-admin 的 web socket 服务建立连接。
再加上我们自己的接口服务会自动注册到 soul-admin,然后被推送到 soul-bootsrap 网关。经过这样的过程之后,我们就可以使用 soul-bootstrap 来代理我们原本的 http 接口。
今天这篇文章我们就来从代码层面去看看整个 http 接口的数据的流转过程。
1 注册到 soul-admin
接口数据的源头始终是我们自己新建的 http 服务,通过引入的 starter 依赖借用 @SoulSpringMvcClient 注解扫描到需要代理的接口,然后在 Spring Bean 初始化完成之后去调用 http 工具类以 post 方式提交给 soul-admin。
这一块逻辑在前面的文章已经梳理过了,这里不再重复。
那么要如何确定到代码呢,就需要回看之前分析 SpringMvcClientBeanPostProcessor 类的文章了。这里面明确的指定了 http post 提交的接口地址。
soulSpringMvcConfig.getAdminUrl() + "/soul-client/springmvc-register";
下面就是我们找到的 soul-admin 涉及到这部分的代码,同样只关注 http 的注册方式。
@RestController
@RequestMapping("/soul-client")
public class SoulClientController {
private final SoulClientRegisterService soulClientRegisterService;
/**
* Instantiates a new Soul client controller.
*
* @param soulClientRegisterService the soul client register service
*/
public SoulClientController(final SoulClientRegisterService soulClientRegisterService) {
this.soulClientRegisterService = soulClientRegisterService;
}
/**
* Register spring mvc string.
*
* @param springMvcRegisterDTO the spring mvc register dto
* @return the string
*/
@PostMapping("/springmvc-register")
public String registerSpringMvc(@RequestBody final SpringMvcRegisterDTO springMvcRegisterDTO) {
return soulClientRegisterService.registerSpringMvc(springMvcRegisterDTO);
}
}
可以看到这个 Controller 类就是一个简单的调用 service 方法提供 api 接口。
可以猜猜它会干些啥?
- 我猜测它会在 service 去处理传递过来的接口参数,然后把数据存起来。因为 soul-admin 的前端页面是可以访问到这些数据的。
- 数据到了 soul-admin,整个过程还只完成了一半,这些接口数据最终肯定会到 soul-bootstrap 的,结合前面梳理的逻辑,猜测应该会借助 web socket 推送给 soul-bootstrap。
接下来我们继续看代码,验证我们的猜想。
直接到 SoulClientRegisterServiceImpl 实现类,我也移除了多余的代码块,加上了注释。
public class SoulClientRegisterServiceImpl implements SoulClientRegisterService {
/**
* 这里加上了数据库事务的注解,也就是说数据会存储到数据库,可以验证我们的第一个猜想
*/
@Transactional
public String registerSpringMvc(final SpringMvcRegisterDTO dto) {
// 这个是注册元数据,暂时忽略
if (dto.isRegisterMetaData()) {
MetaDataDO exist = metaDataMapper.findByPath(dto.getPath());
if (Objects.isNull(exist)) {
saveSpringMvcMetaData(dto);
}
}
// 注册选择器并且拿到选择器 ID
String selectorId = handlerSpringMvcSelector(dto);
// 注册规则,同时规则需要跟选择器关联
handlerSpringMvcRule(selectorId, dto);
return SoulResultMessage.SUCCESS;
}
}
从上面的代码我们可以看到这里加上了数据库事务的注解,也就是说数据会存储到数据库,可以验证我们的第一个猜想——数据会被存储到数据库。
同时,数据库事务一般都会执行至少两条 SQL 的操作,可以看到上面的代码分别处理了「选择器」和「规则」两个方法。并且因为「规则」的处理还需要传递「选择器」ID,可以猜想「选择器」和「规则」是一对多的关系,实际上也的确是的,这可以从实体类或者数据库字段看出来。
接下来继续深入代码,可以发现不管是处理「选择器」还是「规则」都有一个 DataChangedEvent 事件发布,那就很可能是给 soul-bootstrap 推送数据了吧。
// publish change event.
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATE,
Collections.singletonList(RuleDO.transFrom(ruleDO, pluginDO.getName(), conditionDataList))));
// publish change event.
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
Collections.singletonList(selectorData)));
有事件发布,肯定会有事件监听,接下来找到 DataChangedEvent 事件的监听代码。
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
public void onApplicationEvent(final DataChangedEvent event) {
for (DataChangedListener listener : listeners) {
switch (event.getGroupKey()) {
case APP_AUTH:
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN:
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE:
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR:
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA:
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
}
可以看到这个监听器会处理很多种不同的逻辑,我们只关注 web socket 下面的「选择器」和「规则」的处理。
public class WebsocketDataChangedListener implements DataChangedListener {
@Override
public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
WebsocketData<SelectorData> websocketData =
new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
}
@Override
public void onRuleChanged(final List<RuleData> ruleDataList, final DataEventTypeEnum eventType) {
WebsocketData<RuleData> configData =
new WebsocketData<>(ConfigGroupEnum.RULE.name(), eventType.name(), ruleDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
}
}
可以看到这里拿到了变动的「选择器」列表数据和「规则」列表数据,然后调用了 WebsocketController 类里面的 send 方法,那么发送到哪里去呢?很可能就是 soul-bootstrap 了吧。
public class WebsocketCollector {
/**
* Send.
*
* @param message the message
* @param type the type
*/
public static void send(final String message, final DataEventTypeEnum type) {
if (StringUtils.isNotBlank(message)) {
if (DataEventTypeEnum.MYSELF == type) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
return;
}
for (Session session : SESSION_SET) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
}
}
}
}
虽然直接从代码里看不出来发送到了 soul-bootstrap,但是可以明确一点——这些数据将会发送给连接上了 soul-admin 模块的 web socket 服务,也就是说只要证明 soul-bootstrap 连上了 soul-admin 的 web socket 服务就可以证明数据将会从 soul-admin 到 soul-bootstrap 了。
然而上篇文章我们已经知道 soul-bootstrap 的配置文件的确配置了 soul-admin 的 web socket 服务了,但是它是在哪里连接上的呢?
2 soul-bootstrap 怎么连接 web socket 服务
答案就是数据同步模块,还是老规矩只关注 web socket 方式的数据同步,在 soul-bootstrap 的 pom.xml 文件里依赖了 soul-spring-boot-starter-sync-data-websocket 模块,这个就是 web socket 同步数据用的。
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
<version>${project.version}</version>
</dependency>
继续寻找配置类,因为配置类会读取 soul-bootstrap 配置的 web socket 服务地址,拿到地址之后肯定会去建立连接对吧。
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use websocket sync soul data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
}
// 创建 web socket 连接
public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
/**
* Instantiates a new Websocket sync cache.
*
* @param websocketConfig the websocket config
* @param pluginDataSubscriber the plugin data subscriber
* @param metaDataSubscribers the meta data subscribers
* @param authDataSubscribers the auth data subscribers
*/
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
for (String url : urls) {
try {
clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
log.error("websocket url({}) is error", url, e);
}
}
try {
for (WebSocketClient client : clients) {
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
log.error("websocket connection is error.....");
}
executor.scheduleAtFixedRate(() -> {
try {
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect is successful.....");
} else {
log.error("websocket reconnection is error.....");
}
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}
}
}
尽管看起来代码量挺多,但是做的事情却很少,无非就是从配置类里面拿到 web socket 地址,然后为每一个地址创建一个 socket 连接,之后用线程池去处理每个连接之后的事情。
不过这里对似乎看不出如何对传入的数据进行处理的过程,而是使用了观察者模式去完成数据的订阅处理,如果你跟我一样好奇这里的流程,我们下篇文章再一起深入吧。
总结
总之,到了这里已经知道了 soul-bootstrap 会主动跟 soul-admin 建立 web socket 连接,soul-admin 也会给 soul-bootstrap 推送接口变更数据。
也通过源码验证了文章开头提出的两个问题。
但是又留下了一个新问题:soul-bootstrap 是怎么处理这些接口数据的。
我们不妨再发散下思维,假如 soul-bootstrap 挂掉了重启,接口数据会不会消失?如果消失了要怎么办?
这些问题我们就下次去解决吧。