- 配置管理
- Nacos 中的Namespace和Group
- 多环境切换
- 多/共享配置
- 配置中心刷新
- 灰度发布
- 源码分析
- 源码入口
- springcloud 整合nacos文件加载流程
- Nacos客户端的数据的加载流程
- Nacos Server端的配置获取
- 客户端配置的动态感知
- 检查服务端配置
- 服务端配置更新的推送
- Nacos配置中心集群原理及源码分析
- 总结
- 结构化思维
Nacos 提供用于存储配置和其他元数据的 key/value 存储,为分布式系统中的外部化配置提供服务器端
和客户端支持。使用 Spring Cloud Alibaba Nacos Config,您可以在 Nacos Server 集中管理你 Spring
Cloud 应用的外部属性配置。
Spring Cloud Alibaba Nacos Config 是 Config Server 和 Client 的替代方案,客户端和服务器上的概念
与 Spring Environment 和 PropertySource 有着一致的抽象,在特殊的 bootstrap 阶段,配置被加载
到 Spring 环境中。当应用程序通过部署管道从开发到测试再到生产时,您可以管理这些环境之间的配
置,并确保应用程序具有迁移时需要运行的所有内容。
配置管理
可以在Nacos控制台配置项目的配置数据,先打开Nacos控制台,在 命名空间 中添加 新建命名空
间 ,如下图
在配置列表中展示在 配置管理>配置列表 中添加,如下图:
再将项目中的配置内容拷贝到如下表单中,比如我们可以把原来在代码里面的application.properties中的配置填
写到下面表单中,如下图:
注意 Data ID 和服务名字保持一致,作为程序默认加载配置。
工程中先引入依赖包
<properties>
<java.version>11</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.7.RELEASE</spring-boot.version>
<spring-cloud-alibaba.version>2.2.2.RELEASE</spring-cloud-alibaba.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--配置中心-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-dubbo</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>11</source>
<target>11</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.7.RELEASE</version>
<configuration>
<mainClass>com.yehui.springcloudnacosdemo.SpringCloudNacosDemoApplication</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
配置文件中添加配置中心地址,在项目中 的 bootstrap.properties. 中添加如下配置:
# nacos设置配置中心服务端地址
spring.cloud.nacos.config.server-addr=localhost:8848
#Nacos中命名空间的ID
spring.cloud.nacos.config.namespace=6f7b53f3-c083-4230-82a2-26d4e7c85a5d
# Nacos 配置中心的namespace。需要注意,如果使用 public 的 namcespace ,请不要填写这个值,直接留空即可
# spring.cloud.nacos.config.namespace=
spring.cloud.nacos.discovery.server-addr=localhost:8848
#Nacos中命名空间的ID
spring.cloud.nacos.discovery.namespace=6f7b53f3-c083-4230-82a2-26d4e7c85a5d
#Nacos中配置文件的后缀为yaml
spring.cloud.nacos.config.file-extension=properties
@RestController
public class NacosControllerTest {
@Value("${name}")
private String name;
@RequestMapping("/index")
public String index(){
return name+"nacos配置中心测试";
}
}
启动 hailtaxi-driver 服务,默认加载 ${spring.application.name}.${fileextension:properties} 配置,加载完成后,配置数据会生效,并访问
http://localhost:8082/index 测试,效果如下:
如果此时配置文件名字如果和当前服务名字不一致,可以使用 name 属性来指定配置文件名字:
# nacos设置配置中心服务端地址
spring.cloud.nacos.config.server-addr=localhost:8848
#Nacos中命名空间的ID
spring.cloud.nacos.config.namespace=6f7b53f3-c083-4230-82a2-26d4e7c85a5d
# Nacos 配置中心的namespace。需要注意,如果使用 public 的 namcespace ,请不要填写这个值,直接留空即可
# spring.cloud.nacos.config.namespace=
spring.cloud.nacos.discovery.server-addr=localhost:8848
#Nacos中命名空间的ID
spring.cloud.nacos.discovery.namespace=6f7b53f3-c083-4230-82a2-26d4e7c85a5d
#Nacos中配置文件的后缀为yaml
spring.cloud.nacos.config.file-extension=properties
##指定配置文件名字
spring.cloud.nacos.config.name=dataIdTest.properties
Nacos 中的Namespace和Group
在nacos中提供了namespace和group命名空间和分组的机制。,它是Nacos提供的一种数据模型,也
就是我们要去定位到一个配置,需要基于namespace- > group ->dataid来实现。
namespace可以解决多环境以及多租户数据的隔离问题。比如在多套环境下,可以根据指定环境创建
不同的namespace,实现多环境隔离。或者在多租户的场景中,每个用户可以维护自己的
namespace,实现每个用户的配置数据和注册数据的隔离。
group是分组机制,它的纬度是实现服务注册信息或者DataId的分组管理机制,对于group的用法,没
有固定的规则,它也可以实现不同环境下的分组,也可以实现同一个应用下不同配置类型或者不同业务
类型的分组。
官方建议是,namespace用来区分不同环境,group可以专注在业务层面的数据分组。实际上在
使用过程中,最重要的是提前定要统一的口径和规定,避免不同的项目团队混用导致后期维护混
乱的问题。
自定义namespace
在没有明确指定 ${spring.cloud.nacos.config.namespace} 配置的情况下, 默认使用的是 Nacos
上 Public 这个namespae。如果需要使用自定义的命名空间,可以通过以下配置来实现:
spring.cloud.nacos.config.namespace=b3404bc0-d7dc-4855-b519-570ed34b62d7
该配置必须放在 bootstrap.properties 文件中。此外
spring.cloud.nacos.config.namespace 的值是 namespace 对应的 id,id 值可以在 Nacos
的控制台获取。并且在添加配置时注意不要选择其他的 namespae,否则将会导致读取不到正确
的配置。
自定义group
在没有明确指定 ${spring.cloud.nacos.config.group} 配置的情况下, 默认使用的是DEFAULT_GROUP 。如果需要自定义自己的 Group,可以通过以下配置来实现:
spring.cloud.nacos.config.group=DEVELOP_GROUP
该配置必须放在 bootstrap.properties 文件中。并且在添加配置时 Group 的值一定要和
spring.cloud.nacos.config.group 的配置值一致
自定义扩展的DataId
Spring Cloud Alibaba Nacos Config 从 0.2.1 版本后,可支持自定义 Data Id 的配置。关于这部分详细
的设计可参考 这里。 一个完整的配置案例如下所示:
spring.application.name=opensource-service-provider
spring.cloud.nacos.config.server-addr=127.0.0.1:8848
# config external configuration
# 1、Data Id 在默认的组 DEFAULT_GROUP,不支持配置的动态刷新
spring.cloud.nacos.config.extension-configs[0].data-id=ext-configcommon01.properties
# 2、Data Id 不在默认的组,不支持动态刷新
spring.cloud.nacos.config.extension-configs[1].data-id=ext-configcommon02.properties
spring.cloud.nacos.config.extension-configs[1].group=GLOBALE_GROUP
# 3、Data Id 既不在默认的组,也支持动态刷新
spring.cloud.nacos.config.extension-configs[2].data-id=ext-configcommon03.properties
spring.cloud.nacos.config.extension-configs[2].group=REFRESH_GROUP
spring.cloud.nacos.config.extension-configs[2].refresh=true
可以看到:
- 通过 spring.cloud.nacos.config.extension-configs[n].data-id 的配置方式来支持多个Data Id 的配置。
- 通过 spring.cloud.nacos.config.extension-configs[n].group 的配置方式自定义 Data Id所在的组,不明确配置的话,默认是 DEFAULT_GROUP。
- 通过 spring.cloud.nacos.config.extension-configs[n].refresh 的配置方式来控制该Data Id 在配置变更时,是否支持应用中可动态刷新, 感知到最新的配置值。默认是不支持的。
多个 Data Id 同时配置时,他的优先级关系是 spring.cloud.nacos.config.extensionconfigs[n].data-id 其中 n 的值越大,优先级越高。
Note spring.cloud.nacos.config.extension-configs[n].data-id 的值必须带文件扩展名,文件扩展名既可支持 properties,又可以支持 yaml/yml。 此时spring.cloud.nacos.config.file-extension 的配置对自定义扩展配置的 Data Id 文件扩展名没有影响。
通过自定义扩展的 Data Id 配置,既可以解决多个应用间配置共享的问题,又可以支持一个应用有多个
配置文件。
为了更加清晰的在多个应用间配置共享的 Data Id ,你可以通过以下的方式来配置:
通过自定义扩展的 Data Id 配置,既可以解决多个应用间配置共享的问题,又可以支持一个应用有多个
配置文件。
为了更加清晰的在多个应用间配置共享的 Data Id ,你可以通过以下的方式来配置:
# 配置支持共享的 Data Id
spring.cloud.nacos.config.shared-configs[0].data-id=common.yaml
# 配置 Data Id 所在分组,缺省默认 DEFAULT_GROUP
spring.cloud.nacos.config.shared-configs[0].group=GROUP_APP1
# 配置Data Id 在配置变更时,是否动态刷新,缺省默认 false
spring.cloud.nacos.config.shared-configs[0].refresh=true
可以看到:
- 通过 spring.cloud.nacos.config.shared-configs[n].data-id 来支持多个共享 Data Id 的配置。
- 通过 spring.cloud.nacos.config.shared-configs[n].group 来配置自定义 Data Id 所在的
组,不明确配置的话,默认是 DEFAULT_GROUP。
- 通过 spring.cloud.nacos.config.shared-configs[n].refresh 来控制该Data Id在配置变更
时,是否支持应用中动态刷新,默认false。
配置的优先级
Spring Cloud Alibaba Nacos Config 目前提供了三种配置能力从 Nacos 拉取相关的配置。
- A: 通过 spring.cloud.nacos.config.shared-configs[n].data-id 支持多个共享 Data Id 的配置
- B: 通过 spring.cloud.nacos.config.extension-configs[n].data-id 的方式支持多个扩展Data Id 的配置
- C: 通过内部相关规则(应用名、应用名+ Profile )自动生成相关的 Data Id 配置
当三种方式共同使用时,他们的一个优先级关系是:A < B < C
多环境切换
spring-cloud-starter-alibaba-nacos-config 在加载配置的时候,不仅仅加载了以 dataid 为
${spring.application.name}.${file-extension:properties} 为前缀的基础配置,还加载了dataid为 ${spring.application.name}-${profile}.${file-extension:properties} 的基础配置。在日常开发中如果遇到多套环境下的不同配置,可以通过Spring 提供的${spring.profiles.active} 这个配置项来配置。
比如开发环境我们可以在nacos中创建 spring-cloud-nacos-demo-dev.properties ,测试环境可以在配置中创建
spring-cloud-nacos-demo-test.properties ,创建如下:
spring-cloud-nacos-demo-test.properties
spring-cloud-nacos-demo-dev.properties
修改bootstrap.properties配置文件,如下
# nacos设置配置中心服务端地址
spring.cloud.nacos.config.server-addr=localhost:8848
#Nacos中命名空间的ID
spring.cloud.nacos.config.namespace=6f7b53f3-c083-4230-82a2-26d4e7c85a5d
# Nacos 配置中心的namespace。需要注意,如果使用 public 的 namcespace ,请不要填写这个值,直接留空即可
# spring.cloud.nacos.config.namespace=
spring.cloud.nacos.discovery.server-addr=localhost:8848
#Nacos中命名空间的ID
spring.cloud.nacos.discovery.namespace=6f7b53f3-c083-4230-82a2-26d4e7c85a5d
#Nacos中配置文件的后缀为yaml
spring.cloud.nacos.config.file-extension=properties
#Nacos激活配置 对应spring-cloud-nacos-demo-dev.properties
spring.profiles.active=dev
server.port=8082
测试代码
@RestController
public class NacosControllerTest {
@Value("${ip}")
private String ip;
@RequestMapping("/index")
public String index(){
return ip+"nacos配置中心测试";
}
}
测试http://localhost:8082/index 效果如下:
将 active 换成test,效果如下:
多/共享配置
在实际的业务场景中应用和共享配置间的关系可能, Spring Cloud Alibaba Nacos Config 从 0.2.1
版本后,可支持自定义 Data Id 的配置,通过它可以解决配置共享问题。
我们可以先创建一个配置 datasource.properties用于模拟配置数据库连接,如下图:
在 bootstrap.properties中引入配置需要使用 extension-configs 属性,配置如下:
这里 extension-configs[n] 中n值越大,优先级越高,它既能解决一个应用多个配置,同时还能解决
配置共享问题。
测试代码
@RestController
public class NacosControllerTest {
@Value("${ip}")
private String ip;
@RequestMapping("/index")
public String index(){
return ip+"nacos配置中心测试";
}
}
测试http://localhost:8082/index 效果如下:
配置中心刷新
配置自动刷新对程序来说非常重要,Nacos支持配置自动刷新,并且提供了多种刷新机制。
Environment自动刷新
spring-cloud-starter-alibaba-nacos-config 支持配置的动态更新,Environment能实时更新到最
新的配置信息,启动 Spring Boot 应用测试的代码如下:
@SpringBootApplication
@EnableDiscoveryClient
public class SpringCloudNacosDemoApplication {
public static void main(String[] args) throws InterruptedException {
ApplicationContext applicationContext = SpringApplication.run(SpringCloudNacosDemoApplication.class, args);
while (true){
//当动态刷新时,会更新到Environment中,因此这里每隔5s中从Enviroment中获取配置
String ip = applicationContext.getEnvironment().getProperty("ip");
System.out.println("ip:"+ip);
TimeUnit.SECONDS.sleep(5);
}
}
}
测试数据如下:
ip:192.168.211.112
ip:192.168.211.113
@Value刷新
程序中如果写了 @Value 注解,可以采用 @RefreshScope 实现刷新,只需要在指定类上添加该注解即
可,如下代码:
@RestController
@RefreshScope
public class NacosControllerTest {
@Value("${ip}")
private String ip;
@RequestMapping("/index")
public String index(){
return ip+"nacos配置中心测试";
}
}
灰度发布
灰度配置指的是指定部分客户端IP进行新配置的下发,其余客户端配置保持不变,用以验证新配置对客
户端的影响,保证配置的平稳发布。灰度配置是生产环境中一个比较重要的功能,对于保证生产环境的
稳定性非常重要。在1.1.0中,Nacos支持了以IP为粒度的灰度配置,具体使用步骤如下:
在配置列表页面,点击某个配置的“编辑配置”按钮,勾选“Beta发布”,在文本框里填入要下发配置配置的
IP,多个IP用逗号分隔,操作如下:
修改配置内容,点击“发布Beta”按钮,即可完成灰度配置的发布,点击“发布Beta”后,“发布Beta”按钮
变灰,此时可以选择“停止Beta”或者“发布”。“停止Beta”表示取消停止灰度发布,当前灰度发布配置的IP
列表和配置内容都会删除,页面回到正常发布的样式。“发布”表示将灰度配置在所有客户端生效,之前
的配置也会被覆盖,同时页面回到正常发布的样式:
源码分析
源码入口
在jar下面spring-cloud-starter-alibaba-nacos-config-2.2.6.RELEASE.jar下的spring.factories文件下面找到自动配置类
NacosConfigBootstrapConfiguration
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigBootstrapConfiguration {
@Bean
@ConditionalOnMissingBean
public NacosConfigProperties nacosConfigProperties() {
return new NacosConfigProperties();
}
@Bean
@ConditionalOnMissingBean
public NacosConfigManager nacosConfigManager(
NacosConfigProperties nacosConfigProperties) {
return new NacosConfigManager(nacosConfigProperties);
}
@Bean
public NacosPropertySourceLocator nacosPropertySourceLocator(
NacosConfigManager nacosConfigManager) {
return new NacosPropertySourceLocator(nacosConfigManager);
}
}
springcloud 整合nacos文件加载流程
NacosPropertySourceLocator
public class NacosPropertySourceLocator implements PropertySourceLocator {}
有代码可以看到NacosPropertySourceLocator实现了PropertySourceLocator接口,PropertySourceLocator接口的代码如下:
在看一下类的关系图
此时要关注一下NacosPropertySourceLocator在哪里进行初始化的,在NacosPropertySourceLocator.locate方法入口进行的debug,启动spring boot 观察一下调用链路
SpringApplication.run
由调用链路可以知道,在spring boot项目启动时,有一个prepareContext的方法,它会回调所有实现了
ApplicationContextInitializer的实例,来做一些初始化工作。
ApplicationContextInitializer是Spring?框架原有的东西,它的主要作用就是在,ConfigurableApplicationContext类型(或者子类型)的ApplicationContext做refresh之前,允许我们对ConfiurableApplicationContext的实例做进一步的设置和处理。 它可以用在需要对应用程序上下文进行编程初始化的wb应用程序中,比如根据上下文环境来注册propertySource,或者配置文件。而Config的这个配置中心的需求合好需要这样一个机制来完成。
public ConfigurableApplicationContext run(String... args) {
//省略代码...
this.prepareContext(context, environment, listeners, applicationArguments, printedBanner);
//省略代码
}
PropertySourceBootstrapConfiguration.initialize
其中,PropertySourceBootstrapConfiguration就实现了ApplicationContextInitializer,initialize方法代a码如下。
public void initialize(ConfigurableApplicationContext applicationContext) {
List<PropertySource<?>> composite = new ArrayList();
//对propertySourceLocators数组进行排序,根据默认的AnnotationAwareOrderComparator
AnnotationAwareOrderComparator.sort(this.propertySourceLocators);
boolean empty = true;
//获取运行的环境上下文
ConfigurableEnvironment environment = applicationContext.getEnvironment();
Iterator var5 = this.propertySourceLocators.iterator();
while(true) {
Collection source;
do {
do {
if (!var5.hasNext()) {
if (!empty) {
//获取当前Environment中的所有PropertySources.
MutablePropertySources propertySources = environment.getPropertySources();
String logConfig = environment.resolvePlaceholders("${logging.config:}");
LogFile logFile = LogFile.get(environment);
// 遍历移除bootstrapProperty的相关属性
Iterator var15 = environment.getPropertySources().iterator();
while(var15.hasNext()) {
PropertySource<?> p = (PropertySource)var15.next();
if (p.getName().startsWith("bootstrapProperties")) {
propertySources.remove(p.getName());
}
}
//把前面获取到的PropertySource,插入到Environment中的PropertySources中。
this.insertPropertySources(propertySources, composite);
this.reinitializeLoggingSystem(environment, logConfig, logFile);
this.setLogLevels(applicationContext, environment);
this.handleIncludedProfiles(environment);
}
return;
}
//回调所有实现PropertySourceLocator接口实例的locate方法,并收集到source这个集合中。
PropertySourceLocator locator = (PropertySourceLocator)var5.next();
source = locator.locateCollection(environment);
} while(source == null);
} while(source.size() == 0);
//遍历source,把PropertySource包装成BootstrapPropertySource加入到sourceList中。
List<PropertySource<?>> sourceList = new ArrayList();
Iterator var9 = source.iterator();
while(var9.hasNext()) {
PropertySource<?> p = (PropertySource)var9.next();
if (p instanceof EnumerablePropertySource) {
EnumerablePropertySource<?> enumerable = (EnumerablePropertySource)p;
sourceList.add(new BootstrapPropertySource(enumerable));
} else {
sourceList.add(new SimpleBootstrapPropertySource(p));
}
}
logger.info("Located property source: " + sourceList);
//将source添加到数组
composite.addAll(sourceList);
//表示propertysource不为空
empty = false;
}
}
上述代码逻辑说明如下。
- 首先this.propertySourceLocators,表示所有实现了PropertySourceLocators接口的实现类,其中就包括我们前面自定义的GpJsonPropertySourceLocator。
- 根据默认的 AnnotationAwareOrderComparator 排序规则对 propertySourceLocators数组进行排序。
- 获取运行的环境上下文ConfigurableEnvironment
- 遍历propertySourceLocators时
- 调用 locate 方法,传入获取的上下文environment
- 将source添加到PropertySource的链表中
- 设置source是否为空的标识标量empty
- source不为空的情况,才会设置到environment中
- 返回Environment的可变形式,可进行的操作如addFirst、addLast
- 移除propertySources中的bootstrapProperties
- 根据config server覆写的规则,设置propertySources
- 处理多个active profiles的配置信息
注意:this.propertySourceLocatorsi这个集合中的PropertySourceLocator,是通过自动装配机制完成注入的,具体的实现在BootstrapImportSelector这个类中。
NacosPropertySourceLocator
理解了上述基本原理后,再来思考第二个问题。如何从远程服务器上加载配置到Spring的Environment中。 顺着前面的分析思路,很自然的去找PropertySourceLocator的实现类,发现除了 自定义的GpJsonPropertySourceLocator以外,还有另外一个实现类 NacosPropertySourceLocator . 于是,直接来看NacosPropertySourceLocator中的locate方法,代码如下。
public PropertySource<?> locate(Environment env) {
nacosConfigProperties.setEnvironment(env);
//nacosConfigManager是NacosConfigBootstrapConfiguration配置类里面实例化
ConfigService configService = nacosConfigManager.getConfigService();
if (null == configService) {
log.warn("no instance of config service found, can't load config from nacos");
return null;
}
//获取客户端配置的超时时间
long timeout = nacosConfigProperties.getTimeout();
nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,
timeout);
//获取name属性,
String name = nacosConfigProperties.getName();
//在Spring Cloud中,默认的name=spring.application.name。
String dataIdPrefix = nacosConfigProperties.getPrefix();
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = name;
}
if (StringUtils.isEmpty(dataIdPrefix)) {
//获取spring.application.name赋值给dataIdPrefix
dataIdPrefix = env.getProperty("spring.application.name");
}
//创建composite属性源,可以包含多个PropertySource
CompositePropertySource composite = new CompositePropertySource(
NACOS_PROPERTY_SOURCE_NAME);
//加载共享配置
loadSharedConfiguration(composite);
//加载扩展配置
loadExtConfiguration(composite);
//加载自身的配置
loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
return composite;
}
上述代码的实现不难理解
- 获取nacos客户端的配置属性,并生成dataId(这个很重要,要定位nacos的配置)
- 分别调用三个方法从加载配置属性源,保存到composite组合属性源中
loadApplicationConfiguration
可以先不管加载共享配置、扩展配置的方法,最终本质上都是去远程服务上读取配置,只是传入的参数不一样。
- fileExtension,表示配置文件的扩展名
- nacosGroup表示分组。
- 加载dataid=项目名称的配置。
加载dataid=项目名称+扩展名的配置遍历当前配置的激活点(profile),分别循环加载带有profile的dataid配置
private void loadApplicationConfiguration(
CompositePropertySource compositePropertySource, String dataIdPrefix,
NacosConfigProperties properties, Environment environment) {
//默认的扩展名为: properties
String fileExtension = properties.getFileExtension();
//获取group
String nacosGroup = properties.getGroup();
//加载`dataid=项目名称`的配置
loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup,
fileExtension, true);
//加载`dataid=项目名称+扩展名`的配置
loadNacosDataIfPresent(compositePropertySource,
dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension, true);
// 遍历profile(可以有多个),根据profile加载配置
for (String profile : environment.getActiveProfiles()) {
String dataId = dataIdPrefix + SEP1 + profile + DOT + fileExtension;
loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup,
fileExtension, true);
}
}
loadNacosDataIfPresent
调用loadNacosPropertySource加载存在的配置信息。把加载之后的配置属性保存CompositePropertySourcer中。
private void loadNacosDataIfPresent(final CompositePropertySource composite,
final String dataId, final String group, String fileExtension,
boolean isRefreshable) {
//如果dataId为空,或者group为空,则直接跳过
if (null == dataId || dataId.trim().length() < 1) {
return;
}
if (null == group || group.trim().length() < 1) {
return;
}
//从nacos中获取属性源
NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group,
fileExtension, isRefreshable);
//把属性源保存到compositePropertySource中
this.addFirstPropertySource(composite, propertySource, false);
}
loadNacosPropertySource
�
private NacosPropertySource loadNacosPropertySource(final String dataId,
final String group, String fileExtension, boolean isRefreshable) {
if (NacosContextRefresher.getRefreshCount() != 0) {
//是否支持自动刷新,// 如果不支持自动刷新配置则自动从缓存获取返回(不从远程服务器加载)
if (!isRefreshable) {
return NacosPropertySourceRepository.getNacosPropertySource(dataId,
group);
}
}
//构造器从配置中心获取数据
return nacosPropertySourceBuilder.build(dataId, group, fileExtension,
isRefreshable);
}
上述代码,是Spring Cloud 集成Nacos实现远程配置加载并保存到NacosPropertySource 中的。 其中,会根据NacosContextRefresher.getRefreshCount来判断是否要从本地读取配置。
- NacosContextRefresher.getRefreshCount() 这个表示Nacos上下文中设置的动态刷 新的监听数量,如果大于0,说明有配置发生了变更,则需要进一步判断是否需要获取最 新配置项
isRefreshable , 这个是对应properties文件中的 spring.cloud.nacos.config.refresh-enabled=false这个属性。如果设置为 false,表示不会从远程服务去获取最新的值。
动态刷新到本地
NacosContextRefresher
在Spring Cloud Nacos中,设置了一个本地缓存 NacosPropertySourceRepository,它会缓存所有配置项对应的NacosPropertySource实 例。那么这个缓存是在哪里更新的呢? 在com.alibaba.cloud.nacos.refresh这个包中,有一个NacosContextRefresher 上下 文刷新对象,它在启动时,会监听ApplicationReadyEvent事件
public class NacosContextRefresher
implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware {
NacosContextRefresher.onApplicationEvent
�
public void onApplicationEvent(ApplicationReadyEvent event) {
// many Spring context
if (this.ready.compareAndSet(false, true)) {//cas占位
this.registerNacosListenersForApplications(); //针对应用列表建立Nacos监听
}
}
registerNacosListenersForApplications
private void registerNacosListenersForApplications() {
if (isRefreshEnabled()) {//开启自动配置
for (NacosPropertySource propertySource : NacosPropertySourceRepository
.getAll()) {
if (!propertySource.isRefreshable()) {
continue;
}
String dataId = propertySource.getDataId();
registerNacosListener(propertySource.getGroup(), dataId);
}
}
}
�该方法是注册应用监听的具体实现。
如果当前的自动刷新机制是开启状态,则遍历本地缓存中的所有NacosPropertySource
- 针对每个PropertySource,判断是否开启了自动刷新,如果没有则退出
- 否则,针对该dataId注册监听
registerNacosListener
private void registerNacosListener(final String groupKey, final String dataKey) {
String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
Listener listener = listenerMap.computeIfAbsent(key,
lst -> new AbstractSharedListener() {
@Override
public void innerReceive(String dataId, String group,
String configInfo) {//针对该key绑定监听事件
refreshCountIncrement(); //递增刷新数量的原子变量(表示当前有数据变更)
nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo); //添加
到刷新的历史记录中,提供了EndPoint访问
//发布一个刷新事件,用来同步@Value注解的值
applicationContext.publishEvent(
new RefreshEvent(this, null, "Refresh Nacos config"));
if (log.isDebugEnabled()) {
log.debug(String.format(
"Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
group, dataId, configInfo));
}
}
});
try {
//注册事件监听
configService.addListener(dataKey, groupKey, listener);
}
catch (NacosException e) {
log.warn(String.format(
"register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
groupKey), e);
}
}
RefreshEventListener
当收到Nacos数据变更事件后,在spring-cloud-context这个包下的 RefreshEventListener就会收到RefreshEvent事件。 这个事件不用猜测也能知道,一定是要修改Environment中的PropertySource属性值,以及 刷新@Value注解的值
public class RefreshEventListener implements SmartApplicationListener {
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ApplicationReadyEvent) {
this.handle((ApplicationReadyEvent)event);
} else if (event instanceof RefreshEvent) {
this.handle((RefreshEvent)event);
}
}
}
开始处理RefreshEvent事件
public void handle(RefreshEvent event) {
if (this.ready.get()) {
log.debug("Event received " + event.getEventDesc());
Set<String> keys = this.refresh.refresh();
log.info("Refresh keys changed: " + keys);
}
}
ContextRefresher.refresh
ContextRefresher刷新方通过构建上下文context获取最新的环境配置和处理@RefreshScope
- 这个方法需要做两个事情
- 刷新Environment中的上下文属性配置。
- 刷新bean中属性对应的@Value值
public synchronized Set<String> refresh() {
// 1.构建一个上下文Context刷新Environment配置数据
Set<String> keys = this.refreshEnvironment();
// 2.调用RefreshScope 处理注解@RefreshScope
this.scope.refreshAll();
return keys;
}
Context刷新Environment配置数据
SpringBoot项目启动的时候会创建一个上下文Context,同时NacosConfig的配置也会从远程服务读取加载到本地。
所以这里模拟了SpringBoot的启动,通过SpringApplicationBuilder构建一个上下文帮助获取最新的配置
执行链:refreshEnvironment()->this.addConfigFilesToEnvironment()
addConfigFilesToEnvironment
ConfigurableApplicationContext addConfigFilesToEnvironment() {
// 创建一个新的上下文
ConfigurableApplicationContext capture = null;
boolean var15 = false;
try {
var15 = true;
// 拷贝现有的Environment用来构建新的上下文
StandardEnvironment environment = this.copyEnvironment(this.context.getEnvironment());
// 上下文构建器
SpringApplicationBuilder builder = (new SpringApplicationBuilder(new Class[]{ContextRefresher.Empty.class})).bannerMode(Mode.OFF).web(WebApplicationType.NONE).environment(environment);
// 添加启动监听器
builder.application().setListeners(Arrays.asList(new BootstrapApplicationListener(), new ConfigFileApplicationListener()));
// 启动上下文
capture = builder.run(new String[0]);
if (environment.getPropertySources().contains("refreshArgs")) {
environment.getPropertySources().remove("refreshArgs");
}
// 获取最新的配置信息
MutablePropertySources target = this.context.getEnvironment().getPropertySources();
} finally {
// 最终关闭临时的上下文
if (var15) {
for(ConfigurableApplicationContext closeable = capture; closeable != null; closeable = (ConfigurableApplicationContext)closeable.getParent()) {
try {
closeable.close();
} catch (Exception var16) {
}
if (!(closeable.getParent() instanceof ConfigurableApplicationContext)) {
break;
}
}
}
}
for(ConfigurableApplicationContext closeable = capture; closeable != null; closeable = (ConfigurableApplicationContext)closeable.getParent()) {
try {
closeable.close();
} catch (Exception var17) {
}
if (!(closeable.getParent() instanceof ConfigurableApplicationContext)) {
break;
}
}
return capture;
}
通过新的上下文capture获取Environment再合并到现有的Environment,并关闭capture。
ConfigurationPropertiesRebinder
EnvironmentChangeEvent事件监听的实现,在ConfigurationPropertiesRebinder中,代码如下
public class ConfigurationPropertiesRebinder implements ApplicationContextAware, ApplicationListener<EnvironmentChangeEvent> {
public void onApplicationEvent(EnvironmentChangeEvent event) {
if (this.applicationContext.equals(event.getSource()) || event.getKeys().equals(event.getSource())) {
this.rebind();
}
}
}
rebind
遍历所有的属性类,重新绑定所有的配置属性对象的属性值
public void rebind() {
this.errors.clear();
Iterator var1 = this.beans.getBeanNames().iterator();
while(var1.hasNext()) {
String name = (String)var1.next();
this.rebind(name);
}
}
rebind
可以看到它的处理逻辑,就是把其内部存储的 ConfigurationPropertiesBeans 依次执行销毁逻辑,再执行初始化逻辑实现属性的重新绑定。
public boolean rebind(String name) {
if (!this.beans.getBeanNames().contains(name)) {
return false;
} else {
if (this.applicationContext != null) {
try {
Object bean = this.applicationContext.getBean(name);
if (AopUtils.isAopProxy(bean)) {
bean = ProxyUtils.getTargetObject(bean);
}
if (bean != null) {
if (this.getNeverRefreshable().contains(bean.getClass().getName())) {
return false;
}
this.applicationContext.getAutowireCapableBeanFactory().destroyBean(bean);//执行销毁
this.applicationContext.getAutowireCapableBeanFactory().initializeBean(bean, name);
//初始化bean,重新绑定新的属性
return true;
}
} catch (RuntimeException var3) {
this.errors.put(name, var3);
throw var3;
} catch (Exception var4) {
this.errors.put(name, var4);
throw new IllegalStateException("Cannot rebind to " + name, var4);
}
}
return false;
}
}
RefreshAll
public void refreshAll() {
super.destroy();
this.context.publishEvent(new RefreshScopeRefreshedEvent());
}
销毁所有@RefreshScope注解修饰的bean对象。
public void destroy() {
List<Throwable> errors = new ArrayList();
Collection<GenericScope.BeanLifecycleWrapper> wrappers = this.cache.clear();
Iterator var3 = wrappers.iterator();
while(var3.hasNext()) {
GenericScope.BeanLifecycleWrapper wrapper = (GenericScope.BeanLifecycleWrapper)var3.next();
try {
Lock lock = ((ReadWriteLock)this.locks.get(wrapper.getName())).writeLock();
lock.lock();
try {
wrapper.destroy();
} finally {
lock.unlock();
}
} catch (RuntimeException var10) {
errors.add(var10);
}
}
if (!errors.isEmpty()) {
throw wrapIfNecessary((Throwable)errors.get(0));
} else {
this.errors.clear();
}
}
NacosPropertySourceBuilder.build
NacosPropertySource build(String dataId, String group, String fileExtension,
boolean isRefreshable) {
//调用loadNacosData加载远程数据
List<PropertySource<?>> propertySources = loadNacosData(dataId, group,
fileExtension);
//构造NacosPropertySource(这个是Nacos自定义扩展的PropertySource,和前面演示的自定义
PropertySource类似)。
// 相当于把从远程服务器获取的数据保存到NacosPropertySource中。
NacosPropertySource nacosPropertySource = new NacosPropertySource(propertySources,
group, dataId, new Date(), isRefreshable);
//把属性缓存到本地缓存
NacosPropertySourceRepository.collectNacosPropertySource(nacosPropertySource);
return nacosPropertySource;
}
NacosPropertySourceBuilder.loadNacosData
这个方法,就是连接远程服务器去获取配置数据的实现,关键代码是 configService.getConfig
private List<PropertySource<?>> loadNacosData(String dataId, String group,
String fileExtension) {
String data = null;
try {
data = configService.getConfig(dataId, group, timeout);
if (StringUtils.isEmpty(data)) {
log.warn(
"Ignore the empty nacos configuration and get it based on dataId[{}] & group[{}]",
dataId, group);
return Collections.emptyList();
}
if (log.isDebugEnabled()) {
log.debug(String.format(
"Loading nacos data, dataId: '%s', group: '%s', data: %s", dataId,
group, data));
}
//对加载的数据进行解析,保存到List<PropertySource>集合。
return NacosDataParserHandler.getInstance().parseNacosData(dataId, data,
fileExtension);
}
catch (NacosException e) {
log.error("get data from Nacos error,dataId:{} ", dataId, e);
}
catch (Exception e) {
log.error("parse data from Nacos error,dataId:{},data:{}", dataId, data, e);
}
return Collections.emptyList();
}
阶段性总结
通过上述分析,知道了Spring Cloud集成Nacos时的关键路径,并且知道在启动时, Spring Cloud会从Nacos Server中加载动态数据保存到Environment集合。 从而实现动态配置的自动注入。
Nacos客户端的数据的加载流程
配置数据的最终加载,是基于configService.getConfig,Nacos提供的SDK来实现的。
public String getConfig(String dataId, String group, long timeoutMs) throws
NacosException
关于Nacos SDK的使用教程: https://nacos.io/zh-cn/docs/sdk.html
也就是说,接下来我们的源码分析,直接进入到Nacos这个范畴。
NacosConfigService.getConfig
public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
return getConfigInner(namespace, dataId, group, timeoutMs);
}
NacosConfigService.getConfigInner
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
//获取group,如果为空,则为default-group
group = this.blank2defaultGroup(group);
//校验参数
ParamUtils.checkKeyParam(dataId, group);
//设置响应结果
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setTenant(tenant);
cr.setGroup(group);
//使用本地缓存
String content = LocalConfigInfoProcessor.getFailover(this.agent.getName(), dataId, group, tenant);
//如果本地缓存中的内容不为空
String encryptedDataKey;
if (content != null) {
LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", new Object[]{this.agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content)});
cr.setContent(content); //把内容设置到cr中。
//获取容灾配置的encryptedDataKey
encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(this.agent.getName(), dataId, group, tenant);
cr.setEncryptedDataKey(encryptedDataKey); //保存到cr
this.configFilterChainManager.doFilter((IConfigRequest)null, cr); //执行过滤(目前好像没有实现)
content = cr.getContent();//返回文件content
return content;
} else {
try {
//如果本地文件中不存在相关内容,则发起远程调用
ConfigResponse response = this.worker.getServerConfig(dataId, group, tenant, timeoutMs);
//把响应内容返回
cr.setContent(response.getContent());
cr.setEncryptedDataKey(response.getEncryptedDataKey());
this.configFilterChainManager.doFilter((IConfigRequest)null, cr);
content = cr.getContent();
return content;
} catch (NacosException var9) {
if (403 == var9.getErrCode()) {
throw var9;
} else {
//如果出现NacosException,且不是403异常,则尝试通过本地的快照文件去获取配置进行返回。
LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}", new Object[]{this.agent.getName(), dataId, group, tenant, var9.toString()});
LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", new Object[]{this.agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content)});
content = LocalConfigInfoProcessor.getSnapshot(this.agent.getName(), dataId, group, tenant);
cr.setContent(content);
encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(this.agent.getName(), dataId, group, tenant);
cr.setEncryptedDataKey(encryptedDataKey);
this.configFilterChainManager.doFilter((IConfigRequest)null, cr);
content = cr.getContent();
return content;
}
}
}
}
getConfigInner方法,主要有几个逻辑
- 先从本地磁盘中加载配置,因为应用在启动时,会加载远程配置缓存到本地,如果本地文件的内容
不为空,直接返回。
- 如果本地文件的内容为空,则调用worker.getServerConfig加载远程配置
- 如果出现异常,则调用本地快照文件加载配置
本地文件缓存读取配置
默认情况下,nacos先从本地缓存的配置中读取文件:
C:\Users\mayn\nacos\config\fixed-192.168.8.133_8848-6a382560-ed4c-414c-
a5e2-9d72c48f1a0e_nacos
如果本地缓存内容存在,则返回内容数据,否则返回空值。
public static String getFailover(String serverName, String dataId, String group, String tenant) {
File localPath = getFailoverFile(serverName, dataId, group, tenant);
if (localPath.exists() && localPath.isFile()) {
try {
return readFile(localPath);
} catch (IOException var6) {
LOGGER.error("[" + serverName + "] get failover error, " + localPath, var6);
return null;
}
} else {
return null;
}
}
从指定文件目录下读取文件内容。
static File getFailoverFile(String serverName, String dataId, String group, String tenant) {
File tmp = new File(LOCAL_SNAPSHOT_PATH, serverName + "_nacos");
tmp = new File(tmp, "data");
if (StringUtils.isBlank(tenant)) {
tmp = new File(tmp, "config-data");
} else {
tmp = new File(tmp, "config-data-tenant");
tmp = new File(tmp, tenant);
}
return new File(new File(tmp, group), dataId);
}
ClientWorker.getServerConfig
ClientWorker,表示客户端的一个工作类,它负责和服务端交互
通过agent.httpGet发起http请求,获取远程服务的配置。
public ConfigResponse getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException {
ConfigResponse configResponse = new ConfigResponse();
if (StringUtils.isBlank(group)) {//如果group为空,则返回默认group
group = "DEFAULT_GROUP";
}
HttpRestResult result = null;
String encryptedDataKey;
try {
//构建参数
Map<String, String> params = new HashMap(3);
if (StringUtils.isBlank(tenant)) {
params.put("dataId", dataId);
params.put("group", group);
} else {
params.put("dataId", dataId);
params.put("group", group);
params.put("tenant", tenant);
}
//发起远程调用
result = this.agent.httpGet("/v1/cs/configs", (Map)null, params, this.agent.getEncode(), readTimeout);
} catch (Exception var10) {
encryptedDataKey = String.format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", this.agent.getName(), dataId, group, tenant);
LOGGER.error(encryptedDataKey, var10);
throw new NacosException(500, var10);
}
//根据响应结果实现不同的处理
switch(result.getCode()) {
case 200: //如果响应成功,保存快照到本地,并返回响应内容
LocalConfigInfoProcessor.saveSnapshot(this.agent.getName(), dataId, group, tenant, (String)result.getData());
configResponse.setContent((String)result.getData());
String configType;
//配置文件的类型,如text、json、yaml等
if (result.getHeader().getValue("Config-Type") != null) {
configType = result.getHeader().getValue("Config-Type");
} else {
configType = ConfigType.TEXT.getType();
}
//设置到configResponse中,后续要根据文件类型实现不同解析策略
//获取加密数据的key
configResponse.setConfigType(configType);
encryptedDataKey = result.getHeader().getValue("Encrypted-Data-Key");
//保存
LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(this.agent.getName(), dataId, group, tenant, encryptedDataKey);
configResponse.setEncryptedDataKey(encryptedDataKey);
return configResponse;
case 403: //如果返回404, 清空本地快照
LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", new Object[]{this.agent.getName(), dataId, group, tenant});
throw new NacosException(result.getCode(), result.getMessage());
case 404:
LocalConfigInfoProcessor.saveSnapshot(this.agent.getName(), dataId, group, tenant, (String)null);
LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(this.agent.getName(), dataId, group, tenant, (String)null);
return configResponse;
case 409:
LOGGER.error("[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, tenant={}", new Object[]{this.agent.getName(), dataId, group, tenant});
throw new NacosException(409, "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
default:
LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", new Object[]{this.agent.getName(), dataId, group, tenant, result.getCode()});
throw new NacosException(result.getCode(), "http error, code=" + result.getCode() + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
}
}
在NacosConfigService的构造方法中,当这个类被实例化以后,有做一些事情
- 初始化一个HttpAgent,这里又用到了装饰起模式,实际工作的类是ServerHttpAgent,
MetricsHttpAgent内部也是调用了ServerHttpAgent的方法,增加了监控统计的信息
- ClientWorker, 客户端的一个工作类,agent作为参数传入到clientworker,可以基本猜测到里面
会用到agent做一些远程相关的事情
public NacosConfigService(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
this.encode = Constants.ENCODE;
} else {
this.encode = encodeTmp.trim();
}
initNamespace(properties);
//agent远程通信代理
this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
this.agent.start();
this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
MetricsHttpAgent.httpGet
�发起远程调用实现
public HttpRestResult<String> httpGet(String path, Map<String, String> headers, Map<String, String> paramValues, String encode, long readTimeoutMs) throws Exception {
Timer timer = MetricsMonitor.getConfigRequestMonitor("GET", path, "NA");
HttpRestResult result;
try {
result = this.httpAgent.httpGet(path, headers, paramValues, encode, readTimeoutMs);
} catch (IOException var13) {
throw var13;
} finally {
timer.observeDuration();
}
return result;
}
ServerHttpAgent.httpGet
public HttpRestResult<String> httpGet(String path, Map<String, String> headers, Map<String, String> paramValues, String encode, long readTimeoutMs) throws Exception {
long endTime = System.currentTimeMillis() + readTimeoutMs;
//注入安全信息
this.injectSecurityInfo(paramValues);
//获取当前服务器地址
String currentServerAddr = this.serverListMgr.getCurrentServerAddr();
//获取最大重试次数,默认3次
int maxRetry = this.maxRetry;
//配置HttpClient的属性,默认的readTimeOut超时时间是3s
HttpClientConfig httpConfig = HttpClientConfig.builder().setReadTimeOutMillis(Long.valueOf(readTimeoutMs).intValue()).setConTimeOutMillis(ConfigHttpClientManager.getInstance().getConnectTimeoutOrDefault(100)).build();
do {
try {
//设置header
Header newHeaders = this.getSpasHeaders(paramValues, encode);
if (headers != null) {
newHeaders.addAll(headers);
}
//构建query查询条件
Query query = Query.newInstance().initParams(paramValues);
//发起http请求,http://192.168.8.133:8848/nacos/v1/cs/configs
HttpRestResult<String> result = NACOS_RESTTEMPLATE.get(this.getUrl(currentServerAddr, path), httpConfig, newHeaders, query, String.class);
if (!this.isFail(result)) { //如果请求失败,
this.serverListMgr.updateCurrentServerAddr(currentServerAddr);
return result;
}
LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}", this.serverListMgr.getCurrentServerAddr(), result.getCode());
} catch (ConnectException var15) {
LOGGER.error("[NACOS ConnectException httpGet] currentServerAddr:{}, err : {}", this.serverListMgr.getCurrentServerAddr(), var15.getMessage());
} catch (SocketTimeoutException var16) {
LOGGER.error("[NACOS SocketTimeoutException httpGet] currentServerAddr:{}, err : {}", this.serverListMgr.getCurrentServerAddr(), var16.getMessage());
} catch (Exception var17) {
LOGGER.error("[NACOS Exception httpGet] currentServerAddr: " + this.serverListMgr.getCurrentServerAddr(), var17);
throw var17;
}
//如果服务端列表有多个,并且当前请求失败,则尝试用下一个地址进行重试
if (this.serverListMgr.getIterator().hasNext()) {
currentServerAddr = (String)this.serverListMgr.getIterator().next();
} else {
--maxRetry; //重试次数递减
if (maxRetry < 0) {
throw new ConnectException("[NACOS HTTP-GET] The maximum number of tolerable server reconnection errors has been reached");
}
this.serverListMgr.refreshCurrentServerAddr();
}
} while(System.currentTimeMillis() <= endTime);
LOGGER.error("no available server");
throw new ConnectException("no available server");
}
Nacos Server端的配置获取
客户端向服务端加载配置,调用的接口是:/nacos/v1/cs/configs,于是,在Nacos的源码中找到该接口
定位到Nacos源码中的ConfigController.getConfig中的方法,代码如下:
@GetMapping
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void getConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId") String dataId, @RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "tag", required = false) String tag)
throws IOException, ServletException, NacosException {
// check tenant
ParamUtils.checkTenant(tenant);
//租户,也就是namespaceid
tenant = NamespaceUtil.processNamespaceParameter(tenant);
//检查请求参数是否为空
ParamUtils.checkParam(dataId, group, "datumId", "content");
ParamUtils.checkParam(tag);
final String clientIp = RequestUtil.getRemoteIp(request);
//加载配置
inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);
}
Inner.doGetConfig
�
public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,
String tenant, String tag, String clientIp) throws IOException, ServletException {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
String autoTag = request.getHeader("Vipserver-Tag");
//获取请求段应用名称
String requestIpApp = RequestUtil.getAppName(request);
//尝试获取当前请求配置的读锁(避免读写冲突)
int lockResult = tryConfigReadLock(groupKey);
//请求端的ip
final String requestIp = RequestUtil.getRemoteIp(request);
boolean isBeta = false;
//lockResult>0 ,表示CacheItem(也就是缓存的配置项)不为空,并且已经加了读锁,意味着这个缓存数据不能被删除。
//lockResult=0 ,表示cacheItem为空,不需要加读锁
//lockResult=01 , 表示加锁失败,存在冲突。
//下面这个if,就是针对这三种情况进行处理。
if (lockResult > 0) {
// LockResult > 0 means cacheItem is not null and other thread can`t delete this cacheItem
FileInputStream fis = null;
try {
String md5 = Constants.NULL;
long lastModified = 0L;
//从本地缓存中,根据groupKey获取CacheItem
CacheItem cacheItem = ConfigCacheService.getContentCache(groupKey);
//判断是否是beta发布,也就是测试版本
if (cacheItem.isBeta() && cacheItem.getIps4Beta().contains(clientIp)) {
isBeta = true;
}
//获取配置文件的类型
final String configType =
(null != cacheItem.getType()) ? cacheItem.getType() : FileTypeEnum.TEXT.getFileType();
response.setHeader("Config-Type", configType);
//返回文件类型的枚举对象
FileTypeEnum fileTypeEnum = FileTypeEnum.getFileTypeEnumByFileExtensionOrFileType(configType);
String contentTypeHeader = fileTypeEnum.getContentType();
response.setHeader(HttpHeaderConsts.CONTENT_TYPE, contentTypeHeader);
File file = null;
ConfigInfoBase configInfoBase = null;
PrintWriter out = null;
//如果是测试配置
if (isBeta) {
md5 = cacheItem.getMd54Beta();
lastModified = cacheItem.getLastModifiedTs4Beta();
if (PropertyUtil.isDirectRead()) {
configInfoBase = persistService.findConfigInfo4Beta(dataId, group, tenant);
} else {
//从磁盘中获取文件,得到的是一个完整的File
file = DiskUtil.targetBetaFile(dataId, group, tenant);
}
response.setHeader("isBeta", "true");
} else {
if (StringUtils.isBlank(tag)) { //判断tag标签是否为空,tag对应的是nacos配置中心的标签选项
if (isUseTag(cacheItem, autoTag)) {
if (cacheItem.tagMd5 != null) {
md5 = cacheItem.tagMd5.get(autoTag);
}
if (cacheItem.tagLastModifiedTs != null) {
lastModified = cacheItem.tagLastModifiedTs.get(autoTag);
}
if (PropertyUtil.isDirectRead()) {
configInfoBase = persistService.findConfigInfo4Tag(dataId, group, tenant, autoTag);
} else {
//从磁盘中获取文件,得到的是一个完整的File
file = DiskUtil.targetTagFile(dataId, group, tenant, autoTag);
}
response.setHeader("Vipserver-Tag",
URLEncoder.encode(autoTag, StandardCharsets.UTF_8.displayName()));
} else {//直接走这个逻辑(默认不会配置tag属性)
md5 = cacheItem.getMd5(); //获取缓存的md5
lastModified = cacheItem.getLastModifiedTs(); //获取最后更新时间
if (PropertyUtil.isDirectRead()) {//判断是否是stamdalone模式且
使用的是derby数据库,如果是,则从derby数据库加载数据
configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
} else {
//否则,如果是数据库或者集群模式,先从本地磁盘得到文件
file = DiskUtil.targetFile(dataId, group, tenant);
}
//如果本地磁盘文件为空,并且configInfoBase为空,则表示配置数据不存在,直接返回null
if (configInfoBase == null && fileNotExist(file)) {
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp);
// pullLog.info("[client-get] clientIp={}, {},
// no data",
// new Object[]{clientIp, groupKey});
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.getWriter().println("config data not exist");
return HttpServletResponse.SC_NOT_FOUND + "";
}
}
} else {
//如果tag不为空,说明配置文件设置了tag标签
if (cacheItem.tagMd5 != null) {
md5 = cacheItem.tagMd5.get(tag);
}
if (cacheItem.tagLastModifiedTs != null) {
Long lm = cacheItem.tagLastModifiedTs.get(tag);
if (lm != null) {
lastModified = lm;
}
}
if (PropertyUtil.isDirectRead()) {
configInfoBase = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);
} else {
file = DiskUtil.targetTagFile(dataId, group, tenant, tag);
}
if (configInfoBase == null && fileNotExist(file)) {
// FIXME CacheItem
// No longer exists. It is impossible to simply calculate the push delayed. Here, simply record it as - 1.
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp);
// pullLog.info("[client-get] clientIp={}, {},
// no data",
// new Object[]{clientIp, groupKey});
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.getWriter().println("config data not exist");
return HttpServletResponse.SC_NOT_FOUND + "";
}
}
}
//把获取的数据结果设置到response中返回
response.setHeader(Constants.CONTENT_MD5, md5);
// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
//如果是单机模式,直接把数据写回到客户端
if (PropertyUtil.isDirectRead()) {
response.setDateHeader("Last-Modified", lastModified);
} else {
//否则,通过trasferTo
fis = new FileInputStream(file);
response.setDateHeader("Last-Modified", file.lastModified());
}
if (PropertyUtil.isDirectRead()) {
out = response.getWriter();
out.print(configInfoBase.getContent());
out.flush();
out.close();
} else {
fis.getChannel()
.transferTo(0L, fis.getChannel().size(), Channels.newChannel(response.getOutputStream()));
}
LogUtil.PULL_CHECK_LOG.warn("{}|{}|{}|{}", groupKey, requestIp, md5, TimeUtils.getCurrentTimeStr());
final long delayed = System.currentTimeMillis() - lastModified;
// TODO distinguish pull-get && push-get
/*
Otherwise, delayed cannot be used as the basis of push delay directly,
because the delayed value of active get requests is very large.
*/
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, lastModified,
ConfigTraceService.PULL_EVENT_OK, delayed, requestIp);
} finally {
//释放锁
releaseConfigReadLock(groupKey);
IoUtils.closeQuietly(fis);
}
} else if (lockResult == 0) {//缓存为空
// FIXME CacheItem No longer exists. It is impossible to simply calculate the push delayed. Here, simply record it as - 1.
ConfigTraceService
.logPullEvent(dataId, group, tenant, requestIpApp, -1, ConfigTraceService.PULL_EVENT_NOTFOUND, -1,
requestIp);
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.getWriter().println("config data not exist");
return HttpServletResponse.SC_NOT_FOUND + "";
} else {
PULL_LOG.info("[client-get] clientIp={}, {}, get data during dump", clientIp, groupKey);
response.setStatus(HttpServletResponse.SC_CONFLICT);
response.getWriter().println("requested file is being modified, please try later.");
return HttpServletResponse.SC_CONFLICT + "";
}
return HttpServletResponse.SC_OK + "";
}
persistService.findConfigInfo4Beta
�从derby数据库中获取数据内容,这个就是一个基本的数据查询操作。
public ConfigInfo4Beta findConfigInfo4Beta(final String dataId, final String group, final String tenant) {
String tenantTmp = StringUtils.isBlank(tenant) ? StringUtils.EMPTY : tenant;
try {
return this.jt.queryForObject(
"SELECT ID,data_id,group_id,tenant_id,app_name,content,beta_ips FROM config_info_beta WHERE data_id=? AND group_id=? AND tenant_id=?",
new Object[] {dataId, group, tenantTmp}, CONFIG_INFO4BETA_ROW_MAPPER);
} catch (EmptyResultDataAccessException e) { // Indicates that the data does not exist, returns null.
return null;
} catch (CannotGetJdbcConnectionException e) {
LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e);
throw e;
}
}
DiskUtil.targetBetaFile
从磁盘目录中获取目标文件,直接根据dataId/group/tenant ,查找指定目录下的文件即可
public static File targetBetaFile(String dataId, String group, String tenant) {
File file = null;
if (StringUtils.isBlank(tenant)) {
file = new File(EnvUtil.getNacosHome(), BETA_DIR);
} else {
file = new File(EnvUtil.getNacosHome(), TENANT_BETA_DIR);
file = new File(file, tenant);
}
file = new File(file, group);
file = new File(file, dataId);
return file;
}
客户端配置的动态感知
Nacos采用长轮训机制来实现数据变更的同步,原理如下!
整体工作流程如下:
- 客户端发起长轮训请求
- 服务端收到请求以后,先比较服务端缓存中的数据是否相同,如果不通,则直接返回
- 如果相同,则通过schedule延迟29.5s之后再执行比较
- 为了保证当服务端在29.5s之内发生数据变化能够及时通知给客户端,服务端采用事件订
阅的方式来监听服务端本地数据变化的事件,一旦收到事件,则触发DataChangeTask的 通知,并且遍历allStubs队列中的ClientLongPolling,把结果写回到客户端,就完成 了一次数据的推送
- 如果 DataChangeTask 任务完成了数据的 “推送” 之后,ClientLongPolling 中的调 度任务又开始执行了怎么办呢? 很简单,只要在进行 “推送” 操作之前,先将原来等待 执行的调度任务取消掉就可以了,这样就防止了推送操作写完响应数据之后,调度任务又 去写响应数据,这时肯定会报错的。所以,在ClientLongPolling方法中,最开始的一 个步骤就是删除订阅事件
在NacosConfigService的构造方法中,当这个类被实例化以后,有做一些事情
- 初始化一个HttpAgent,这里又用到了装饰起模式,实际工作的类是ServerHttpAgent,
MetricsHttpAgent内部也是调用了ServerHttpAgent的方法,增加了监控统计的信息
- ClientWorker, 客户端的一个工作类,agent作为参数传入到clientworker,可以基本猜测到里面
会用到agent做一些远程相关的事情
public NacosConfigService(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
this.encode = Constants.ENCODE;
} else {
this.encode = encodeTmp.trim();
}
initNamespace(properties);
//agent远程通信代理
this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
this.agent.start();
this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
ClientWorker
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
final Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
// Initialize the timeout parameter
//初始化配置
init(properties);
//初始化一个定时调度的线程池,重写了threadfactory方法
this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
//初始化一个定时调度的线程池,从里面的name名字来看,似乎和长轮训有关系。而这个长轮训应该是和nacos服务端的长轮训
this.executorService = Executors
.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
//设置定时任务的执行频率,并且调用checkConfigInfo这个方法,猜测是定时去检测配置是否发生了变化
//首次执行延迟时间为1毫秒、延迟时间为10毫秒
this.executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
初始化一个HttpAgent,这里又用到了装饰起模式,实际工作的类是ServerHttpAgent,MetricsHttpAgent内部也是调用了ServerHttpAgent的方法,增加了监控统计的信息可以看到 ClientWorker 除了将 HttpAgent 维持在自己内部,还创建了两个线程池:
- 第一个线程池是只拥有一个线程用来执行定时任务的 executor,executor 每隔 10ms 就会执行一次
checkConfigInfo() 方法,从方法名上可以知道是每 10 ms 检查一次配置信息。
第二个线程池是一个普通的线程池,从 ThreadFactory 的名称可以看到这个线程池是做长轮询的。
checkConfigInfo
这个方法主要的目的是用来检查服务端的配置信息是否发生了变化。如果有变化,则触发listener通知
cacheMap: AtomicReference
- Value是对应存储在nacos服务器上的配置文件的内容。默认情况下,每个长轮训LongPullingRunnable任务默认处理3000个监听配置集。
如果超过3000, 则需要启动多个LongPollingRunnable去执行。currentLongingTaskCount保存已启动的LongPullingRunnable任务数
public void checkConfigInfo() { //分发任务 int listenerSize = cacheMap.size(); //向上取整为批数,监听的配置数量除以3000,得到一个整数,代表长轮训任务的数量 int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题 executorService.execute(new LongPollingRunnable(i)); } //更新当前长轮训人数数量 currentLongingTaskCount = longingTaskCount; } }
LongPollingRunnable.run
初始化new LongPollingRunnable()丢给 executorService线程池来处理,所以我们可以找到LongPollingRunnable里面的run方法这个方法传递了一个taskid, tasked用来区分cacheMap中的任务批次, 保存到cacheDatas这个集合中cacheData.isUseLocalConfigInfo 这个值的变化来自于checkLocalConfig这个方法
public void run() { List<CacheData> cacheDatas = new ArrayList<CacheData>(); List<String> inInitializingCacheList = new ArrayList<String>(); try { // tasked用来区分cacheMap中的任务批次, 保存到cacheDatas这个集合中 for (CacheData cacheData : cacheMap.values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { //通过本地文件中缓存的数据和cacheData集合中的数据进行比对,判断是否出现数据变化 checkLocalConfig(cacheData); if (cacheData.isUseLocalConfigInfo()) {//这里表示数据有变化,需要通知监听器 cacheData.checkListenerMd5(); } } catch (Exception e) { LOGGER.error("get local config info error", e); } } } // check server config List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); if (!CollectionUtils.isEmpty(changedGroupKeys)) { LOGGER.info("get changedGroupKeys:" + changedGroupKeys); } for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { String[] ct = getServerConfig(dataId, group, tenant, 3000L); CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(ct[0]); if (null != ct[1]) { cache.setType(ct[1]); } LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]); } catch (NacosException ioe) { String message = String .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); } catch (Throwable e) { // If the rotation training task is abnormal, the next execution time of the task will be punished LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } } }
checkLocalConfig
检查本地配置,这里面有三种情况
如果isUseLocalConfigInfo为false,但是本地缓存路径的文件是存在的,那么把isUseLocalConfigInfo设置为true,并且更新cacheData的内容以及文件的更新时间
- 如果isUseLocalCOnfigInfo为true,但是本地缓存文件不存在,则设置为false,不通知监听器
- isUseLocalConfigInfo为true,并且本地缓存文件也存在,但是缓存的的时间和文件的更新时间不
一致,则更新cacheData中的内容,并且isUseLocalConfigInfo设置为true
private void checkLocalConfig(CacheData cacheData) {
final String dataId = cacheData.dataId;
final String group = cacheData.group;
final String tenant = cacheData.tenant;
File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
//没有->有
if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
cacheData.setUseLocalConfigInfo(true);
cacheData.setLocalConfigInfoVersion(path.lastModified());
cacheData.setContent(content);
LOGGER.warn(
"[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
return;
}
// 有 -> 没有。不通知业务监听器,从server拿到配置后通知。
if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
cacheData.setUseLocalConfigInfo(false);
LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),
dataId, group, tenant);
return;
}
// 有变更
if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path
.lastModified()) {
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
cacheData.setUseLocalConfigInfo(true);
cacheData.setLocalConfigInfoVersion(path.lastModified());
cacheData.setContent(content);
LOGGER.warn(
"[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
}
}
CacheData.checkListenerMd5
遍历用户自己添加的监听器,如果发现数据的md5值不同,则发送通知
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
safeNotifyListener(dataId, group, content, type, md5, wrap);
}
}
}
检查服务端配置
在LongPollingRunnable.run中,先通过本地配置的读取和检查来判断数据是否发生变化从而实现变化
的通知接着,当前的线程还需要去远程服务器上获得最新的数据,检查哪些数据发生了变化
- 通过checkUpdateDataIds获取远程服务器上数据变更的dataid
- 遍历这些变化的集合,然后调用getServerConfig从远程服务器获得对应的内容
- 更新本地的cache,设置为服务器端返回的内容
- 最后遍历cacheDatas,找到变化的数据进行通知
//从服务端获取发生变化的数据的DataID列表,保存在List<String>集合中
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
if (!CollectionUtils.isEmpty(changedGroupKeys)) {
LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
}
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
//遍历有变换的groupkey,发起远程请求获得指定groupkey的内容
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
//把获取到的内容设置到CacheData中
CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);
if (null != ct[1]) {
cache.setType(ct[1]);
}
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(ct[0]), ct[1]);
} catch (NacosException ioe) {
String message = String
.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
//再遍历CacheData这个集合,找到发生变化的数据进行通知
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
//继续传递当前线程进行轮询
executorService.execute(this);
checkUpdateDataIds
- 首先从cacheDatas集合中找到isUseLocalConfigInfo为false的缓存
调用checkUpdateConfigStr
/** * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回 NULL。 */ List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception { StringBuilder sb = new StringBuilder(); for (CacheData cacheData : cacheDatas) { if (!cacheData.isUseLocalConfigInfo()) { sb.append(cacheData.dataId).append(WORD_SEPARATOR); sb.append(cacheData.group).append(WORD_SEPARATOR); if (StringUtils.isBlank(cacheData.tenant)) { sb.append(cacheData.getMd5()).append(LINE_SEPARATOR); } else { sb.append(cacheData.getMd5()).append(WORD_SEPARATOR); sb.append(cacheData.getTenant()).append(LINE_SEPARATOR); } // cacheData 首次出现在cacheMap中&首次check更新 if (cacheData.isInitializing()) { // It updates when cacheData occours in cacheMap by first time. inInitializingCacheList .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant)); } } } boolean isInitializingCacheList = !inInitializingCacheList.isEmpty(); return checkUpdateConfigStr(sb.toString(), isInitializingCacheList); }
checkUpdateConfigStr
/** * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。 */ List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception { Map<String, String> params = new HashMap<String, String>(2); params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); Map<String, String> headers = new HashMap<String, String>(2); headers.put("Long-Pulling-Timeout", "" + timeout); // told server do not hang me up if new initializing cacheData added in if (isInitializingCacheList) { headers.put("Long-Pulling-Timeout-No-Hangup", "true"); } if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); } try { long readTimeoutMs = timeout + (long) Math.round(timeout >> 1); HttpRestResult<String> result = agent .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), readTimeoutMs); if (result.ok()) { setHealthServer(true); return parseUpdateDataIdResponse(result.getData()); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.getCode()); } } catch (Exception e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); }
getServerConfig
根据dataId、group、tenant等信息,使用http请求从远程服务器上获得配置信息,读取到数据之后缓
存到本地文件中public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException { String[] ct = new String[2]; if (StringUtils.isBlank(group)) { group = Constants.DEFAULT_GROUP; } HttpRestResult<String> result = null; try { Map<String, String> params = new HashMap<String, String>(3); if (StringUtils.isBlank(tenant)) { params.put("dataId", dataId); params.put("group", group); } else { params.put("dataId", dataId); params.put("group", group); params.put("tenant", tenant); } //发起远程请求 result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout); } catch (Exception ex) { String message = String .format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ex); throw new NacosException(NacosException.SERVER_ERROR, ex); } switch (result.getCode()) { case HttpURLConnection.HTTP_OK: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.getData()); ct[0] = result.getData(); if (result.getHeader().getValue(CONFIG_TYPE) != null) { ct[1] = result.getHeader().getValue(CONFIG_TYPE); } else { ct[1] = ConfigType.TEXT.getType(); } return ct; case HttpURLConnection.HTTP_NOT_FOUND: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null); return ct; case HttpURLConnection.HTTP_CONFLICT: { LOGGER.error( "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, " + "tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(NacosException.CONFLICT, "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } case HttpURLConnection.HTTP_FORBIDDEN: { LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(result.getCode(), result.getMessage()); } default: { LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId, group, tenant, result.getCode()); throw new NacosException(result.getCode(), "http error, code=" + result.getCode() + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } } }
客户端缓存配置长轮训机制总结
整体实现的核心点就一下几个部分的配置
对本地缓存的配置做任务拆分,每一个批次是3000条
- 针对每3000条创建一个线程去执行
- 先把每一个批次的缓存和本地磁盘文件中的数据进行比较,
- 如果和本地配置不一致,则表示该缓存发生了更新,直接通知客户端监听
- 如果本地缓存和磁盘数据一致,则需要发起远程请求检查配置变化
- 先以tenent/groupId/dataId拼接成字符串,发送到服务端进行检查,返回发生了变更
- 客户端收到变更配置列表,再逐项遍历发送到服务端获取配置内容。
服务端配置更新的推送
分析完客户端之后,随着好奇心的驱使,服务端是如何处理客户端的请求的?那么同样,我们 需要思考几个问题 服务端是如何实现长轮训机制的 客户端的超时时间为什么要设置30s 客户端发起的请求地址是:/v1/cs/configs/listener,于是找到这个接口进行查看,代码 如下。
ConfigController
nacos是使用spring mvc提供的rest api。这里面会调用inner.doPollingConfig进行处理
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
String probeModify = request.getParameter("Listening-Configs");
if (StringUtils.isBlank(probeModify)) {
throw new IllegalArgumentException("invalid probeModify");
}
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
Map<String, String> clientMd5Map;
try {
//解析客户端传递过来的可能发生变化的配置项目,转化为Map集合(key=dataId,value=md5)
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
} catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}
//开始执行长轮训。
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
doPollingConfig
这个方法主要是用来做长轮训和短轮询的判断
- 如果是长轮训,直接走addLongPollingClient方法
如果是短轮询,直接比较服务端的数据,如果存在md5不一致,直接把数据返回。
/** * 轮询接口. */ public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException { //长轮训 if (LongPollingService.isSupportLongPolling(request)) { longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); return HttpServletResponse.SC_OK + ""; } // else 兼容短轮询逻辑 List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map); // 兼容短轮询result String oldResult = MD5Util.compareMd5OldResult(changedGroups); String newResult = MD5Util.compareMd5ResultString(changedGroups); String version = request.getHeader(Constants.CLIENT_VERSION_HEADER); if (version == null) { version = "2.0.0"; } int versionNum = Protocol.getVersionNumber(version); // Befor 2.0.4 version, return value is put into header. if (versionNum < START_LONG_POLLING_VERSION_NUM) { response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult); response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult); } else { request.setAttribute("content", newResult); } Loggers.AUTH.info("new content:" + newResult); //禁用缓存 response.setHeader("Pragma", "no-cache"); response.setDateHeader("Expires", 0); response.setHeader("Cache-Control", "no-cache,no-store"); response.setStatus(HttpServletResponse.SC_OK); return HttpServletResponse.SC_OK + ""; }
longPollingService.addLongPollingClient
从方法名字上可以推测出,这个方法应该是把客户端的长轮训请求添加到某个任务中去。
- 获得客户端传递过来的超时时间,并且进行本地计算,提前500ms返回响应,这就能解释为什么
客户端响应超时时间是29.5+了。当然如果 isFixedPolling=true 的情况下,不会提前返回响应
- 根据客户端请求过来的md5和服务器端对应的group下对应内容的md5进行比较,如果不一致,则
通过 generateResponse 将结果返回
- 如果配置文件没有发生变化,则通过 scheduler.execute 启动了一个定时任务,将客户端的长轮
询请求封装成一个叫 ClientLongPolling 的任务,交给 scheduler 去执行
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
//str表示超时时间,也就是timeout
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
//不允许断开的标记
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
//应用名称
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
/**
* 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动 add
delay time for LoadBalance
*/
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
// Do nothing but set fix polling timeout.
} else {
//根据客户端请求过来的md5和服务器端对应的group下对应内容的md5进行比较,如果不一致,则通过`generateResponse`将结果返回
long start = System.currentTimeMillis();
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(req, rsp, changedGroups);
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
//一定要由HTTP线程调用,否则离开后容器会立即发送响应
final AsyncContext asyncContext = req.startAsync();
// AsyncContext是Servlet3.0中提供的对象,调用startAsync获得AsyncContext对象之后,这个请求的响应会被延后,并释放容器分配的线程。
// 这样就可以实现长轮询的机制.
// AsyncContext.setTimeout()的超时时间不准,所以只能自己控制
asyncContext.setTimeout(0L);
ConfigExecutor.executeLongPolling(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
ClientLongPolling
接下来我们来分析一下,clientLongPolling到底做了什么操作。或者说我们可以先猜测一下应该会做什
么事情
- 这个任务要阻塞29.5s才能执行,因为立马执行没有任何意义,毕竟前面已经执行过一次了
- 如果在29.5s+之内,数据发生变化,需要提前通知。需要有一种监控机制
基于这些猜想,我们可以看看它的实现过程
从代码粗粒度来看,它的实现似乎和我们的猜想一致,在run方法中,通过scheduler.schedule实现了
一个定时任务,它的delay时间正好是前面计算的29.5s。在这个任务中,会通过MD5Util.compareMd5
来进行计算那另外一个,当数据发生变化以后,肯定不能等到29.5s之后才通知呀,那怎么办呢?我们发现有一个
allSubs 的东西,它似乎和发布订阅有关系。那是不是有可能当前的clientLongPolling订阅了数据变化的事件呢?
class ClientLongPolling implements Runnable {
public void run() {
asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
@Override
public void run() {
try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
/**
* 删除订阅关系
*/
allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) {
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",
RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
"polling", clientMd5Map.size(), probeRequestSize);
List<String> changedGroups = MD5Util
.compareMd5((HttpServletRequest) asyncContext.getRequest(),
(HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
if (changedGroups.size() > 0) {
sendResponse(changedGroups);
} else {
sendResponse(null);
}
} else {
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
"polling", clientMd5Map.size(), probeRequestSize);
sendResponse(null);
}
} catch (Throwable t) {
LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
}
}
}, timeoutTime, TimeUnit.MILLISECONDS);
allSubs.add(this);
}
}
allSubs
allSubs是一个队列,队列里面放了ClientLongPolling这个对象。这个队列似乎和配置变更有某种关联
关系。 那么这里必须要实现的是,当用户在nacos 控制台修改了配置之后,必须要从这个订阅关系中取出关注的客户端长连接,然后把变更的结果返回。于是我们去看LongPollingService的构造方法查找订阅关系
/**
* 长轮询订阅关系
*/
final Queue<ClientLongPolling> allSubs;
allSubs.add(this);
LongPollingService
在LongPollingService的构造方法中,使用了一个NotifyCenter订阅了一个事件,其中不难发现,如果
这个事件的实例是LocalDataChangeEvent,也就是服务端数据发生变更的时间,就会执行一个DataChangeTask 的线程。
public LongPollingService() {
allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
// Register LocalDataChangeEvent to NotifyCenter.
NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
//注册LocalDataChangeEvent订阅事件
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
if (isFixedPolling()) {
// Ignore.
} else {
if (event instanceof LocalDataChangeEvent) {//如果触发了LocalDataChangeEvent,则执行下面的代码
LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
}
@Override
public Class<? extends Event> subscribeType() {
return LocalDataChangeEvent.class;
}
});
}
DataChangeTask.run
从名字可以看出来,这个是数据变化的任务,最让人兴奋的应该是,它里面有一个循环迭代器,从
allSubs里面获得ClientLongPolling,最后通过clientSub.sendResponse把数据返回到客户端。所以,这也就能够理解为何数据变化能够实时触发更新了。
public void run() {
try {
ConfigCacheService.getContentBetaMd5(groupKey);
//遍历所有订阅事件表
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();//得到ClientLongPolling
//判断当前的ClientLongPolling中,请求的key是否包含当前修改的groupKey
if (clientSub.clientMd5Map.containsKey(groupKey)) {
//如果是beta方式且betaIps不包含当前客户端ip,直接返回
if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
continue;
}
//如果配置了tag标签且不包含当前客户端的tag,直接返回
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
continue;
}
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); // Delete subscribers' relationships.
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
RequestUtil
.getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
"polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
clientSub.sendResponse(Arrays.asList(groupKey));
}
}
} catch (Throwable t) {
LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
}
}
Nacos配置中心集群原理及源码分析
Nacos集群工作原理
Nacos作为配置中心的集群结构中,是一种无中心化节点的设计,由于没有主从节点,也没有选举机制,所以为了能够实现热备,就需要增加虚拟IP(VIP)。
Nacos的数据存储分为两部分
- Mysql数据库存储,所有Nacos节点共享同一份数据,数据的副本机制由Mysql本身的主从方案来解决,从而保证数据的可靠性。
- 每个节点的本地磁盘,会保存一份全量数据,具体路径:/data/program/nacos-1/data/config-data/${GROUP}.在Nacos的设计中,Mysql是一个中心数据仓库,且认为在Mysql中的数据是绝对正确的。 除此之外,Nacos在启动时会把Mysql中的数据写一份到本地磁盘。
这么设计的好处是可以提高性能,当客户端需要请求某个配置项时,服务端会想Ian从磁盘中读取对应文件返回,而磁盘的读取效率要比数据库效率高。
当配置发生变更时:
- Nacos会把变更的配置保存到数据库,然后再写入本地文件。
- 接着发送一个HTTP请求,给到集群中的其他节点,其他节点收到事件后,从Mysql中dump刚刚写入的数据到本地文件中。另外,NacosServer启动后,会同步启动一个定时任务,每隔6小时,会dump一次全量数据到本地文件
配置变更同步入口
当配置发生修改、删除、新增操作时,通过发布一个notifyConfigChange事件。
@DeleteMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean deleteConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId") String dataId, //
@RequestParam("group") String group, //
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "tag", required = false) String tag) throws NacosException {
// check tenant
ParamUtils.checkTenant(tenant);
ParamUtils.checkParam(dataId, group, "datumId", "rm");
ParamUtils.checkParam(tag);
String clientIp = RequestUtil.getRemoteIp(request);
String srcUser = RequestUtil.getSrcUserName(request);
if (StringUtils.isBlank(tag)) {
persistService.removeConfigInfo(dataId, group, tenant, clientIp, srcUser);
} else {
persistService.removeConfigInfoTag(dataId, group, tenant, tag, clientIp, srcUser);
}
final Timestamp time = TimeUtils.getCurrentTime();
ConfigTraceService.logPersistenceEvent(dataId, group, tenant, null, time.getTime(), clientIp,
ConfigTraceService.PERSISTENCE_EVENT_REMOVE, null);
ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
return true;
}
根据ConfigDataChangeEvent全局搜索,找到**AsyncNotifyService监听器
AsyncNotifyService
@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
this.memberManager = memberManager;
// Register ConfigDataChangeEvent to NotifyCenter.
NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
// Register A Subscriber to subscribe ConfigDataChangeEvent.
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
// Generate ConfigDataChangeEvent concurrently
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
Collection<Member> ipList = memberManager.allMembers(); //得到集群中的ip列表
// 构建NotifySingleTask,并添加到队列中。
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
for (Member member : ipList) { //遍历集群中的每个节点
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
evt.isBeta));
}
//异步执行任务 AsyncTask
ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
}
AsyncTask
@Override
public void run() {
executeAsyncInvoke();
}
private void executeAsyncInvoke() {
while (!queue.isEmpty()) {//遍历队列中的数据,直到数据为空
NotifySingleTask task = queue.poll(); //获取task
String targetIp = task.getTargetIP(); //获取目标ip
if (memberManager.hasMember(targetIp)) { //如果集群中的ip列表包含目标ip
// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
//判断目标ip的健康状态
boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); //
if (unHealthNeedDelay) { //如果目标服务是非健康,则继续添加到队列中,延后再执行。
// target ip is unhealthy, then put it in the notification list
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
0, task.target);
// get delay time and set fail count to the task
asyncTaskExecute(task);
} else {
//构建header
Header header = Header.newInstance();
header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
if (task.isBeta) {
header.addParam("isBeta", "true");
}
AuthHeaderUtil.addIdentityToHeader(header);
//通过restTemplate发起远程调用,如果调用成功,则执行AsyncNotifyCallBack的回调方法
restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
}
}
}
}
目标节点接收请求
数据同步的请求地址为,task.url=http://localhost:8848/nacos/v1/cs/communication/dataChange?dataId=log.yaml&group=DEFAULT_GROUP
@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
@RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "tag", required = false) String tag) {
dataId = dataId.trim();
group = group.trim();
String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
String isBetaStr = request.getHeader("isBeta");
if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
} else {
//
dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
}
return true;
}
dumpService.dump用来实现配置的更新,代码如下
当前任务会被添加到DumpTaskMgr中管理。
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
boolean isBeta) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}
TaskManager.addTask, 先调用父类去完成任务添加。
@Override
public void addTask(Object key, AbstractDelayTask newTask) {
super.addTask(key, newTask);
MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
}
在这种场景设计中,一般都会采用生产者消费者模式来完成,因此这里不难猜测到,任务会被保存到一个队列中,然后有另外一个线程来执行。
NacosDelayTaskExecuteEngine
TaskManager的父类是NacosDelayTaskExecuteEngine,
这个类中有一个成员属性protected final ConcurrentHashMap
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
super(logger);
tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
processingExecutor
.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
ProcessRunnable
private class ProcessRunnable implements Runnable {
@Override
public void run() {
try {
processTasks();
} catch (Throwable e) {
getEngineLog().error(e.toString(), e);
}
}
}
processTasks
protected void processTasks() {
//获取所有的任务
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
//获取任务处理器,这里返回的是DumpProcessor
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// ReAdd task if process failed
//执行具体任务
if (!processor.process(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + e.toString(), e);
retryFailedTask(taskKey, task);
}
}
}
DumpProcessor.process
读取数据库的最新数据,然后更新本地缓存和磁盘。
总结
- Environment -> PropertySource
- Spring Cloud PropertySourceLocator
- 自定义了一个外部化配置的加载实现
- Nacos的外部化配置的实现? NacosPropertySourceLocator
- Spring Cloud Nacos配置的加载
- Spring Cloud Nacos配置变更
- @RefreshScope -Spring Cloud中提供的能力, 在Spring中只提供了@Scope
- Nacos Confi core
- 客户端配置的加载
- 客户端配置的动态刷新
- 客户端配置的本地快照
- 服务端配置存储(存储到内存/数据库)
- 数据动态变更比较(客户端对请求的数据进行分片, 通过本地比较+探测+最终
- 服务端数据的获取)
- 数据同步 , 基于广播的方式发送数据变更事件
结构化思维
开放性
请你简述一下最近做的项目?
项目背景
项目采用的技术架构
项目的亮点
…
applicationContext.publishEvent(
new RefreshEvent(this, null, “Refresh Nacos config”));请你说一下你对Spring Boot的理解? [宽泛的问题]
什么是Spring Boot
Spring Boot有什么好处
Spring Boot的核心组件
请你说一下Spring Boot自动装配的好处 [具体的问题就是一个检索的关键词]
如何设计一个千万级并发的架构?