今天分享的是我们组的一个实习生写的一篇源码解析文章,小伙子实习期间在社区Nacos2.0的基础上对灰度发布的能力进行了增强,并完成了MSE Nacos2.0上从管控到内核的灰度发布能力的研发。以下是他对配置发布流程的代码解析,相信看完之后你会感叹:现在的实习生都有这个水平了吗?
说到灰度发布,就不得不提到阿里的安全生产三板斧:可监控、可灰度、可回滚。在阿里内部,对于安全生产是高度重视的,灰度可以说是发布之前的必备流程。因此,作为阿里的配置中心,Nacos同样支持了配置灰度的功能,可以通过控制台进行配置的灰度推送、回滚,从而实现安全的配置发布。一般来说,我们按照下图所示流程进行配置的安全修改。只有在小规模机器上验证配置按预期生效之后才会正式发布配置,否则就回滚灰度配置。
image.png
发布流程

配置灰度发布流程

社区Nacos的灰度是基于IP的方式进行的,用户需要在控制台,选择需要灰度的配置,然后新建灰度配置,选择灰度机器的IP进行配置推送。整个交互流程如下图所示。
image.png
IP灰度机制
具体的使用方法,如果使用的是自建的社区Nacos,可以访问http://ip:port/nacos进入控制台,在配置管理的编辑页面进行配置灰度发布,如下图。
image.png
社区Nacos控制台
如果使用的是阿里云的MSE微服务引擎,可以查看MSE配置灰度发布帮助文档了解使用方法,目前在Nacos2.0专业版上已经支持灰度功能,在MSE控制台打开Beta按钮即可,如下图所示。
image.png
MSE Beta发布

Nacos灰度原理

Nacos的灰度发布原理其实并不复杂,本质就如同下面这张流程图。
image.png
灰度原理
乍一看,这个流程好复杂,实际上定睛一看,好像也没啥。整个过程就是Client、Server和Console之间的交互。Client端监听Server上的配置,建立长连接并上报自己的客户端信息,例如IP地址。Console负责进行配置灰度的调用,将用户所需要的灰度配置请求发送到Server端。然后Server端根据用户的灰度配置请求中的IP地址,过滤与客户端的长连接,然后将灰度配置定向推送到对应IP的客户端中即可。下面笔者从长连接的建立到配置灰度,进行详细的源码分析。

长连接建立

在Nacos2.0版本之前,Nacos主要采用长轮询的方式在客户端拉取服务端的配置信息。而在Nacos2.0版本中,引入了基于gRPC的长连接模型来提升配置监听的性能,客户端和服务端会建立长连接来监听配置的变更,一旦服务端有配置变更,就会将配置信息推送到客户端中。在Nacos源码中,这一过程主要涉及到两个组件之间的交互,即com.alibaba.nacos.common.remote.client.grpc包下的GrpcSdkClient类和com.alibaba.nacos.core.remote.grpc包下的GrpcBiStreamRequestAcceptor类。然而,GrpcSdkClient中没有定义具体的连接逻辑,其主要逻辑在其父类GrpcClient中。下面这段代码就是客户端连接服务端的核心代码,位于GrpcClient的connectToServer方法。

  1. @Override
  2. public Connection connectToServer(ServerInfo serverInfo) {
  3. try {
  4. // ......
  5. int port = serverInfo.getServerPort() + rpcPortOffset();
  6. // 创建一个Grpc的Stub
  7. RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);
  8. if (newChannelStubTemp != null) {
  9. // 检查服务端是否可用
  10. Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
  11. if (response == null || !(response instanceof ServerCheckResponse)) {
  12. shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
  13. return null;
  14. }
  15. // 创建一个Grpc的Stream
  16. BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
  17. .newStub(newChannelStubTemp.getChannel());
  18. // 创建连接信息,保存Grpc的连接信息,也就是长连接的一个holder
  19. GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
  20. grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
  21. // 创建stream请求同时绑定到当前连接中
  22. StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
  23. // 绑定Grpc相关连接信息
  24. grpcConn.setPayloadStreamObserver(payloadStreamObserver);
  25. grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
  26. grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
  27. // 发送一个初始化连接请求,用于上报客户端的一些信息,例如标签、客户端版本等
  28. ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
  29. conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
  30. conSetupRequest.setLabels(super.getLabels());
  31. conSetupRequest.setAbilities(super.clientAbilities);
  32. conSetupRequest.setTenant(super.getTenant());
  33. grpcConn.sendRequest(conSetupRequest);
  34. // 等待连接建立成功
  35. Thread.sleep(100L);
  36. return grpcConn;
  37. }
  38. return null;
  39. } catch (Exception e) {
  40. LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);
  41. }
  42. return null;
  43. }

上面这段代码主要功能有两个,一个是与服务端建立gRPC的长连接,另一个功能主要是初始化连接,后者是实现配置灰度发布的前提。在上文中有提到,配置灰度发布的过程中,需要根据控制台的灰度配置请求中的IP信息过滤长连接,在服务端就是根据连接建立初始化时上报的信息实现的过滤。从上面的代码中可以看到,ConnectionSetupRequest作为一个初始化请求,携带着客户端版本、标签等信息,但是好像并没有携带IP地址的信息。实际上,ConnectionSetupRequest也确实没有携带IP地址信息。因为在Nacos设计中,采用Request来表明客户端的请求信息,而IP地址更像是属于连接层的信息,应该属于连接的元信息,因此并没有放在Request中进行显式的设置,而是在发送请求时自动的作为Metadata信息发送到服务端中。可以看一下com.alibaba.nacos.common.remote.client.grpc包下的GrpcConnection的sendRequest方法,该方法接收一个Request请求作为参数,将请求发送给服务端。

  1. public void sendRequest(Request request) {
  2. // 将request转换为Grpc的Payload
  3. Payload convert = GrpcUtils.convert(request);
  4. // 通过Grpc的流发送请求
  5. payloadStreamObserver.onNext(convert);
  6. }

IP地址的设置,就在com.alibaba.nacos.common.remote.client.grpc包下的GrpcUtils的convert方法中,该方法主要将一个Request转换为gRPC的Payload。

  1. /**
  2. * convert request to payload.
  3. *
  4. * @param request request.
  5. * @return payload.
  6. */
  7. public static Payload convert(Request request) {
  8. // 设置元信息
  9. Metadata newMeta = Metadata.newBuilder().setType(request.getClass().getSimpleName())
  10. .setClientIp(NetUtils.localIP()).putAllHeaders(request.getHeaders()).build();
  11. request.clearHeaders();
  12. // 转换为json
  13. String jsonString = toJson(request);
  14. Payload.Builder builder = Payload.newBuilder();
  15. // 创建Payload
  16. return builder
  17. .setBody(Any.newBuilder().setValue(ByteString.copyFrom(jsonString, Charset.forName(Constants.ENCODE))))
  18. .setMetadata(newMeta).build();
  19. }

可以看到,这里通过NetUtils.localIP()方法获取客户端的IP信息,并存入到Metadata中,跟随Payload一起上报给服务端。到这里,客户端这里的连接过程就暂时完成了,下面介绍一下服务端接收到连接请求的响应过程。
在服务端,主要通过GrpcBiStreamRequestAcceptor的requestBiStream方法接收客户端请求,如下所示。

  1. @Override
  2. public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {
  3. StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() {
  4. final String connectionId = CONTEXT_KEY_CONN_ID.get();
  5. final Integer localPort = CONTEXT_KEY_CONN_LOCAL_PORT.get();
  6. final int remotePort = CONTEXT_KEY_CONN_REMOTE_PORT.get();
  7. String remoteIp = CONTEXT_KEY_CONN_REMOTE_IP.get();
  8. String clientIp = "";
  9. @Override
  10. public void onNext(Payload payload) {
  11. // 获取客户端IP
  12. clientIp = payload.getMetadata().getClientIp();
  13. traceDetailIfNecessary(payload);
  14. Object parseObj;
  15. try {
  16. parseObj = GrpcUtils.parse(payload);
  17. } catch (Throwable throwable) {
  18. Loggers.REMOTE_DIGEST
  19. .warn("[{}]Grpc request bi stream,payload parse error={}", connectionId, throwable);
  20. return;
  21. }
  22. if (parseObj == null) {
  23. Loggers.REMOTE_DIGEST
  24. .warn("[{}]Grpc request bi stream,payload parse null ,body={},meta={}", connectionId,
  25. payload.getBody().getValue().toStringUtf8(), payload.getMetadata());
  26. return;
  27. }
  28. // 处理初始化请求
  29. if (parseObj instanceof ConnectionSetupRequest) {
  30. ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
  31. Map<String, String> labels = setUpRequest.getLabels();
  32. String appName = "-";
  33. if (labels != null && labels.containsKey(Constants.APPNAME)) {
  34. appName = labels.get(Constants.APPNAME);
  35. }
  36. ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
  37. remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
  38. setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
  39. metaInfo.setTenant(setUpRequest.getTenant());
  40. // 服务端的长连接信息holder
  41. Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
  42. connection.setAbilities(setUpRequest.getAbilities());
  43. boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
  44. // 注册connection到connectionManager中
  45. if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
  46. //Not register to the connection manager if current server is over limit or server is starting.
  47. try {
  48. Loggers.REMOTE_DIGEST.warn("[{}]Connection register fail,reason:{}", connectionId,
  49. rejectSdkOnStarting ? " server is not started" : " server is over limited.");
  50. connection.request(new ConnectResetRequest(), 3000L);
  51. connection.close();
  52. } catch (Exception e) {
  53. //Do nothing.
  54. if (connectionManager.traced(clientIp)) {
  55. Loggers.REMOTE_DIGEST
  56. .warn("[{}]Send connect reset request error,error={}", connectionId, e);
  57. }
  58. }
  59. }
  60. } else if (parseObj instanceof Response) {
  61. Response response = (Response) parseObj;
  62. if (connectionManager.traced(clientIp)) {
  63. Loggers.REMOTE_DIGEST
  64. .warn("[{}]Receive response of server request ,response={}", connectionId, response);
  65. }
  66. RpcAckCallbackSynchronizer.ackNotify(connectionId, response);
  67. connectionManager.refreshActiveTime(connectionId);
  68. } else {
  69. Loggers.REMOTE_DIGEST
  70. .warn("[{}]Grpc request bi stream,unknown payload receive ,parseObj={}", connectionId,
  71. parseObj);
  72. }
  73. }
  74. // ......
  75. };
  76. return streamObserver;
  77. }

这里我们主要看onNext方法,其负责处理客户端的请求信息,即Payload信息。如果是初始化连接的请求ConnectionSetupRequest,就会记录与客户端之间的长连接信息,并注册到ConnectionManager中。ConnectionManager是服务端维护所有客户端连接信息的类,持有所有的长连接信息,后续的配置推送等都需要通过ConnectionManager获取长连接信息。可以简单看一下ConnectionManager的源码,在com.alibaba.nacos.core.remote包下,如下所示。

  1. /**
  2. * connect manager.
  3. *
  4. * @author liuzunfei
  5. * @version $Id: ConnectionManager.java, v 0.1 2020年07月13日 7:07 PM liuzunfei Exp $
  6. */
  7. @Service
  8. public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {
  9. // ......
  10. Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
  11. // ......
  12. /**
  13. * register a new connect.
  14. *
  15. * @param connectionId connectionId
  16. * @param connection connection
  17. */
  18. public synchronized boolean register(String connectionId, Connection connection) {
  19. if (connection.isConnected()) {
  20. if (connections.containsKey(connectionId)) {
  21. return true;
  22. }
  23. if (!checkLimit(connection)) {
  24. return false;
  25. }
  26. if (traced(connection.getMetaInfo().clientIp)) {
  27. connection.setTraced(true);
  28. }
  29. // 注册connection
  30. connections.put(connectionId, connection);
  31. connectionForClientIp.get(connection.getMetaInfo().clientIp).getAndIncrement();
  32. clientConnectionEventListenerRegistry.notifyClientConnected(connection);
  33. Loggers.REMOTE_DIGEST
  34. .info("new connection registered successfully, connectionId = {},connection={} ", connectionId,
  35. connection);
  36. return true;
  37. }
  38. return false;
  39. }
  40. // ......
  41. }

可以看到,在ConnectionManager中,维护了一个Map。在调用register方法时,将Connection注册到Map中,以供后续的逻辑使用。这里有一个细节,注册到ConnectionManager中的GrpcConnection与客户端持有的GrpcConnection不是一个类。这里的GrpcConnection位于com.alibaba.nacos.core.remote.grpc包,而客户端的GrpcConnection位于com.alibaba.nacos.common.remote.client.grpc包。事实上与客户端有关的gRPC相关的类都在com.alibaba.nacos.common.remote.client.grpc。com.alibaba.nacos.core.remote.grpc则是服务端的相关实现。
到这里,长连接建立的核心流程已经介绍完了,接下来笔者将详细介绍一下配置灰度的推送过程,由于Nacos在这里使用了发布订阅模式以及异步的方法调用,理解起来可能稍微要麻烦一点。

灰度推送

在Nacos中,提供了一组OpenAPI进行配置的管理,配置灰度发布也是其中一个功能,可以在com.alibaba.nacos.config.server.controller包下的ConfigController中查看,包括了BetaConfig的发布、停止和查询,接下来笔者将会一一介绍他们的原理。

创建BetaConfig

创建BetaConfig的API代码如下,一个简单的Web的API。

  1. /**
  2. * Adds or updates non-aggregated data.
  3. *
  4. * @throws NacosException NacosException.
  5. */
  6. @PostMapping
  7. @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
  8. public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
  9. @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
  10. @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
  11. @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
  12. @RequestParam(value = "appName", required = false) String appName,
  13. @RequestParam(value = "src_user", required = false) String srcUser,
  14. @RequestParam(value = "config_tags", required = false) String configTags,
  15. @RequestParam(value = "desc", required = false) String desc,
  16. @RequestParam(value = "use", required = false) String use,
  17. @RequestParam(value = "effect", required = false) String effect,
  18. @RequestParam(value = "type", required = false) String type,
  19. @RequestParam(value = "schema", required = false) String schema) throws NacosException {
  20. final String srcIp = RequestUtil.getRemoteIp(request);
  21. final String requestIpApp = RequestUtil.getAppName(request);
  22. srcUser = RequestUtil.getSrcUserName(request);
  23. //check type
  24. if (!ConfigType.isValidType(type)) {
  25. type = ConfigType.getDefaultType().getType();
  26. }
  27. // check tenant
  28. ParamUtils.checkTenant(tenant);
  29. ParamUtils.checkParam(dataId, group, "datumId", content);
  30. ParamUtils.checkParam(tag);
  31. Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
  32. MapUtil.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);
  33. MapUtil.putIfValNoNull(configAdvanceInfo, "desc", desc);
  34. MapUtil.putIfValNoNull(configAdvanceInfo, "use", use);
  35. MapUtil.putIfValNoNull(configAdvanceInfo, "effect", effect);
  36. MapUtil.putIfValNoNull(configAdvanceInfo, "type", type);
  37. MapUtil.putIfValNoNull(configAdvanceInfo, "schema", schema);
  38. ParamUtils.checkParam(configAdvanceInfo);
  39. if (AggrWhitelist.isAggrDataId(dataId)) {
  40. LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", RequestUtil.getRemoteIp(request),
  41. dataId, group);
  42. throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
  43. }
  44. final Timestamp time = TimeUtils.getCurrentTime();
  45. // 目标灰度机器的IP地址。
  46. String betaIps = request.getHeader("betaIps");
  47. ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
  48. configInfo.setType(type);
  49. if (StringUtils.isBlank(betaIps)) {
  50. if (StringUtils.isBlank(tag)) {
  51. persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
  52. ConfigChangePublisher
  53. .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
  54. } else {
  55. persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
  56. ConfigChangePublisher.notifyConfigChange(
  57. new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
  58. }
  59. } else {
  60. // 发布Beta 配置
  61. persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
  62. // 通知配置变更
  63. ConfigChangePublisher
  64. .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
  65. }
  66. ConfigTraceService
  67. .logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(),
  68. ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
  69. return true;
  70. }

该方法接收一个创建配置的请求,包括配置的data-id、content等信息。从代码中可以看出,该方法是通过判断请求的Header中有无betaIps的值来确定是发布正式配置还是Beta配置的。如果betaIps的值不为空,则表明待发布的配置是一个Beta配置。而配置发布的过程,实际上就是把配置插入或者更新到数据库中。在Nacos中,正式配置和灰度配置是分别存储在不同的表中的,一旦发布就会通过ConfigChangePublisher发布一个ConfigDataChangeEvent事件,然后由订阅了该事件的监听者推送配置信息到客户端。ConfigDataChangeEvent的监听者是AsyncNotifyService类,位于com.alibaba.nacos.config.server.service.notify包下,该类主要用作执行集群之间的数据Dump操作。该类在初始化的时候,会向事件中心NotifyCenter注册一个监听者,用以监听数据变更事件并异步执行数据的Dump操作,如下所示。

  1. /**
  2. * Async notify service.
  3. *
  4. * @author Nacos
  5. */
  6. @Service
  7. public class AsyncNotifyService {
  8. private static final Logger LOGGER = LoggerFactory.getLogger(AsyncNotifyService.class);
  9. private final NacosAsyncRestTemplate nacosAsyncRestTemplate = HttpClientManager.getNacosAsyncRestTemplate();
  10. private static final int MIN_RETRY_INTERVAL = 500;
  11. private static final int INCREASE_STEPS = 1000;
  12. private static final int MAX_COUNT = 6;
  13. @Autowired
  14. private DumpService dumpService;
  15. @Autowired
  16. private ConfigClusterRpcClientProxy configClusterRpcClientProxy;
  17. private ServerMemberManager memberManager;
  18. @Autowired
  19. public AsyncNotifyService(ServerMemberManager memberManager) {
  20. this.memberManager = memberManager;
  21. // Register ConfigDataChangeEvent to NotifyCenter.
  22. NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
  23. // Register A Subscriber to subscribe ConfigDataChangeEvent.
  24. NotifyCenter.registerSubscriber(new Subscriber() {
  25. @Override
  26. public void onEvent(Event event) {
  27. // Generate ConfigDataChangeEvent concurrently
  28. if (event instanceof ConfigDataChangeEvent) {
  29. ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
  30. long dumpTs = evt.lastModifiedTs;
  31. String dataId = evt.dataId;
  32. String group = evt.group;
  33. String tenant = evt.tenant;
  34. String tag = evt.tag;
  35. Collection<Member> ipList = memberManager.allMembers();
  36. // In fact, any type of queue here can be
  37. Queue<NotifySingleTask> httpQueue = new LinkedList<NotifySingleTask>();
  38. Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>();
  39. for (Member member : ipList) {
  40. // 判断是否是长轮询
  41. if (!MemberUtil.isSupportedLongCon(member)) {
  42. // 添加一个长轮询的异步dump任务
  43. httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
  44. evt.isBeta));
  45. } else {
  46. // 添加一个长连接的异步dump任务
  47. rpcQueue.add(
  48. new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
  49. }
  50. }
  51. // 判断并执行长轮询的异步dump任务
  52. if (!httpQueue.isEmpty()) {
  53. ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));
  54. }
  55. // 判断并执行长连接的异步dump任务
  56. if (!rpcQueue.isEmpty()) {
  57. ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
  58. }
  59. }
  60. }
  61. @Override
  62. public Class<? extends Event> subscribeType() {
  63. return ConfigDataChangeEvent.class;
  64. }
  65. });
  66. }
  67. }

在接收到ConfigDataChangeEvent之后,如果Nacos2.0以上的版本,会创建一个RpcTask用以执行配置变更的通知,由内部类AsyncRpcTask执行,AsyncRpcTask具体逻辑如下所示。

  1. class AsyncRpcTask implements Runnable {
  2. private Queue<NotifySingleRpcTask> queue;
  3. public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) {
  4. this.queue = queue;
  5. }
  6. @Override
  7. public void run() {
  8. while (!queue.isEmpty()) {
  9. NotifySingleRpcTask task = queue.poll();
  10. // 创建配置变更请求
  11. ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
  12. syncRequest.setDataId(task.getDataId());
  13. syncRequest.setGroup(task.getGroup());
  14. syncRequest.setBeta(task.isBeta);
  15. syncRequest.setLastModified(task.getLastModified());
  16. syncRequest.setTag(task.tag);
  17. syncRequest.setTenant(task.getTenant());
  18. Member member = task.member;
  19. // 如果是自身的数据变更,直接执行dump操作
  20. if (memberManager.getSelf().equals(member)) {
  21. if (syncRequest.isBeta()) {
  22. // 同步Beta配置
  23. dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
  24. syncRequest.getLastModified(), NetUtils.localIP(), true);
  25. } else {
  26. // 同步正式配置
  27. dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
  28. syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
  29. }
  30. continue;
  31. }
  32. // 通知其他服务端进行dump
  33. if (memberManager.hasMember(member.getAddress())) {
  34. // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
  35. boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());
  36. if (unHealthNeedDelay) {
  37. // target ip is unhealthy, then put it in the notification list
  38. ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
  39. task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
  40. 0, member.getAddress());
  41. // get delay time and set fail count to the task
  42. asyncTaskExecute(task);
  43. } else {
  44. if (!MemberUtil.isSupportedLongCon(member)) {
  45. asyncTaskExecute(
  46. new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,
  47. task.getLastModified(), member.getAddress(), task.isBeta));
  48. } else {
  49. try {
  50. configClusterRpcClientProxy
  51. .syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
  52. } catch (Exception e) {
  53. MetricsMonitor.getConfigNotifyException().increment();
  54. asyncTaskExecute(task);
  55. }
  56. }
  57. }
  58. } else {
  59. //No nothig if member has offline.
  60. }
  61. }
  62. }
  63. }

这里首先创建了一个ConfigChangeClusterSyncRequest,并将配置信息写入。然后获取集群信息,通知相应的Server处理的数据同步请求。同步配置变更信息的核心逻辑由DumpService来执行。我们主要查看同步Beta配置的操作,DumpService的dump方法如下所示。

  1. /**
  2. * Add DumpTask to TaskManager, it will execute asynchronously.
  3. */
  4. public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {
  5. String groupKey = GroupKey2.getKey(dataId, group, tenant);
  6. String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta));
  7. dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));
  8. DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
  9. }

在该方法中,这里会根据配置变更信息,提交一个异步的DumpTask任务,后续会由DumpProcessor类的process方法进行处理,该方法如下所示。

  1. /**
  2. * dump processor.
  3. *
  4. * @author Nacos
  5. * @date 2020/7/5 12:19 PM
  6. */
  7. public class DumpProcessor implements NacosTaskProcessor {
  8. final DumpService dumpService;
  9. public DumpProcessor(DumpService dumpService) {
  10. this.dumpService = dumpService;
  11. }
  12. @Override
  13. public boolean process(NacosTask task) {
  14. final PersistService persistService = dumpService.getPersistService();
  15. DumpTask dumpTask = (DumpTask) task;
  16. String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
  17. String dataId = pair[0];
  18. String group = pair[1];
  19. String tenant = pair[2];
  20. long lastModified = dumpTask.getLastModified();
  21. String handleIp = dumpTask.getHandleIp();
  22. boolean isBeta = dumpTask.isBeta();
  23. String tag = dumpTask.getTag();
  24. ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
  25. .group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
  26. if (isBeta) {
  27. // 更新Beta配置的缓存
  28. ConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant);
  29. build.remove(Objects.isNull(cf));
  30. build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());
  31. build.content(Objects.isNull(cf) ? null : cf.getContent());
  32. return DumpConfigHandler.configDump(build.build());
  33. }
  34. if (StringUtils.isBlank(tag)) {
  35. ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);
  36. build.remove(Objects.isNull(cf));
  37. build.content(Objects.isNull(cf) ? null : cf.getContent());
  38. build.type(Objects.isNull(cf) ? null : cf.getType());
  39. } else {
  40. ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);
  41. build.remove(Objects.isNull(cf));
  42. build.content(Objects.isNull(cf) ? null : cf.getContent());
  43. }
  44. return DumpConfigHandler.configDump(build.build());
  45. }
  46. }

可以看到,如果是Beta配置,则获取最新的Beta配置信息,然后触发DumpConfigHandler的configDump方法。进入configDump可以看到,该方法主要用来更新缓存的配置信息,调用ConfigCacheService的相关操作进行配置的更新。

  1. /**
  2. * Dump config subscriber.
  3. *
  4. * @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
  5. */
  6. public class DumpConfigHandler extends Subscriber<ConfigDumpEvent> {
  7. /**
  8. * trigger config dump event.
  9. *
  10. * @param event {@link ConfigDumpEvent}
  11. * @return {@code true} if the config dump task success , else {@code false}
  12. */
  13. public static boolean configDump(ConfigDumpEvent event) {
  14. final String dataId = event.getDataId();
  15. final String group = event.getGroup();
  16. final String namespaceId = event.getNamespaceId();
  17. final String content = event.getContent();
  18. final String type = event.getType();
  19. final long lastModified = event.getLastModifiedTs();
  20. if (event.isBeta()) {
  21. boolean result = false;
  22. // 删除操作
  23. if (event.isRemove()) {
  24. result = ConfigCacheService.removeBeta(dataId, group, namespaceId);
  25. if (result) {
  26. ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
  27. ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
  28. }
  29. return result;
  30. } else {
  31. // 更新或者发布
  32. result = ConfigCacheService
  33. .dumpBeta(dataId, group, namespaceId, content, lastModified, event.getBetaIps());
  34. if (result) {
  35. ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
  36. ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
  37. content.length());
  38. }
  39. }
  40. return result;
  41. }
  42. // ......
  43. }
  44. @Override
  45. public void onEvent(ConfigDumpEvent event) {
  46. configDump(event);
  47. }
  48. @Override
  49. public Class<? extends Event> subscribeType() {
  50. return ConfigDumpEvent.class;
  51. }
  52. }

在ConfigCacheService中,会对比配置信息,如果配置有变化,则发布事件LocalDataChangeEvent,触发RpcConfigChangeNotifier的configDataChanged方法来推送配置,configDataChanged方法代码如下。

  1. /**
  2. * ConfigChangeNotifier.
  3. *
  4. * @author liuzunfei
  5. * @version $Id: ConfigChangeNotifier.java, v 0.1 2020年07月20日 3:00 PM liuzunfei Exp $
  6. */
  7. @Component(value = "rpcConfigChangeNotifier")
  8. public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
  9. // ......
  10. @Autowired
  11. ConfigChangeListenContext configChangeListenContext;
  12. @Autowired
  13. private RpcPushService rpcPushService;
  14. @Autowired
  15. private ConnectionManager connectionManager;
  16. /**
  17. * adaptor to config module ,when server side config change ,invoke this method.
  18. *
  19. * @param groupKey groupKey
  20. */
  21. public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,
  22. List<String> betaIps, String tag) {
  23. // 获取配置的所有监听者
  24. Set<String> listeners = configChangeListenContext.getListeners(groupKey);
  25. if (CollectionUtils.isEmpty(listeners)) {
  26. return;
  27. }
  28. int notifyClientCount = 0;
  29. // 遍历所有监听者
  30. for (final String client : listeners) {
  31. // 获取长连接信息
  32. Connection connection = connectionManager.getConnection(client);
  33. if (connection == null) {
  34. continue;
  35. }
  36. String clientIp = connection.getMetaInfo().getClientIp();
  37. String clientTag = connection.getMetaInfo().getTag();
  38. // 判断是否是Beta的Ip
  39. if (isBeta && betaIps != null && !betaIps.contains(clientIp)) {
  40. continue;
  41. }
  42. // tag check
  43. if (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) {
  44. continue;
  45. }
  46. // 配置变更推送请求
  47. ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);
  48. // 执行推送任务
  49. RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp,
  50. connection.getMetaInfo().getAppName());
  51. push(rpcPushRetryTask);
  52. notifyClientCount++;
  53. }
  54. Loggers.REMOTE_PUSH.info("push [{}] clients ,groupKey=[{}]", notifyClientCount, groupKey);
  55. }
  56. @Override
  57. public void onEvent(LocalDataChangeEvent event) {
  58. String groupKey = event.groupKey;
  59. boolean isBeta = event.isBeta;
  60. List<String> betaIps = event.betaIps;
  61. String[] strings = GroupKey.parseKey(groupKey);
  62. String dataId = strings[0];
  63. String group = strings[1];
  64. String tenant = strings.length > 2 ? strings[2] : "";
  65. String tag = event.tag;
  66. configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);
  67. }
  68. // ......
  69. }

到这里,基本上就是配置变更推送的最后一个步骤了,如代码中注释所示,通过调用ConnectionManager的getConnection方法,遍历所有监听者的连接,根据其中的Meta信息判断是否是Beta推送的目标,然后执行推送任务,也就是执行push方法,如下所示。

  1. private void push(RpcPushTask retryTask) {
  2. ConfigChangeNotifyRequest notifyRequest = retryTask.notifyRequest;
  3. // 判断是否重试次数达到限制
  4. if (retryTask.isOverTimes()) {
  5. Loggers.REMOTE_PUSH
  6. .warn("push callback retry fail over times .dataId={},group={},tenant={},clientId={},will unregister client.",
  7. notifyRequest.getDataId(), notifyRequest.getGroup(), notifyRequest.getTenant(),
  8. retryTask.connectionId);
  9. // 主动注销连接
  10. connectionManager.unregister(retryTask.connectionId);
  11. } else if (connectionManager.getConnection(retryTask.connectionId) != null) {
  12. // first time :delay 0s; sencond time:delay 2s ;third time :delay 4s
  13. // 尝试执行配置推送
  14. ConfigExecutor.getClientConfigNotifierServiceExecutor()
  15. .schedule(retryTask, retryTask.tryTimes * 2, TimeUnit.SECONDS);
  16. } else {
  17. // client is already offline,ingnore task.
  18. }
  19. }

这里实际上也是一个异步执行的过程,推送任务RpcPushTask会被提交到ClientConfigNotifierServiceExecutor来计划执行,第一次会立即推送配置,即调用RpcPushTask的run方法,如果失败则延迟重试次数x2的秒数再次执行,直到超过重试次数,主动注销当前连接。其中,RpcPushTask的定义如下。

  1. class RpcPushTask implements Runnable {
  2. ConfigChangeNotifyRequest notifyRequest;
  3. int maxRetryTimes = -1;
  4. int tryTimes = 0;
  5. String connectionId;
  6. String clientIp;
  7. String appName;
  8. public RpcPushTask(ConfigChangeNotifyRequest notifyRequest, int maxRetryTimes, String connectionId,
  9. String clientIp, String appName) {
  10. this.notifyRequest = notifyRequest;
  11. this.maxRetryTimes = maxRetryTimes;
  12. this.connectionId = connectionId;
  13. this.clientIp = clientIp;
  14. this.appName = appName;
  15. }
  16. public boolean isOverTimes() {
  17. return maxRetryTimes > 0 && this.tryTimes >= maxRetryTimes;
  18. }
  19. @Override
  20. public void run() {
  21. tryTimes++;
  22. if (!tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH, connectionId, clientIp)) {
  23. push(this);
  24. } else {
  25. // 推送配置
  26. rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) {
  27. @Override
  28. public void onSuccess() {
  29. tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_SUCCESS, connectionId, clientIp);
  30. }
  31. @Override
  32. public void onFail(Throwable e) {
  33. tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_FAIL, connectionId, clientIp);
  34. Loggers.REMOTE_PUSH.warn("Push fail", e);
  35. push(RpcPushTask.this);
  36. }
  37. }, ConfigExecutor.getClientConfigNotifierServiceExecutor());
  38. }
  39. }
  40. }

可以看到,在RpcPushTask的run方法中,调用了RpcPushService的pushWithCallback方法,如下所示。

  1. /**
  2. * push response to clients.
  3. *
  4. * @author liuzunfei
  5. * @version $Id: PushService.java, v 0.1 2020年07月20日 1:12 PM liuzunfei Exp $
  6. */
  7. @Service
  8. public class RpcPushService {
  9. @Autowired
  10. private ConnectionManager connectionManager;
  11. /**
  12. * push response with no ack.
  13. *
  14. * @param connectionId connectionId.
  15. * @param request request.
  16. * @param requestCallBack requestCallBack.
  17. */
  18. public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack,
  19. Executor executor) {
  20. Connection connection = connectionManager.getConnection(connectionId);
  21. if (connection != null) {
  22. try {
  23. // 执行配置推送
  24. connection.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) {
  25. @Override
  26. public Executor getExecutor() {
  27. return executor;
  28. }
  29. @Override
  30. public void onResponse(Response response) {
  31. if (response.isSuccess()) {
  32. requestCallBack.onSuccess();
  33. } else {
  34. requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage()));
  35. }
  36. }
  37. @Override
  38. public void onException(Throwable e) {
  39. requestCallBack.onFail(e);
  40. }
  41. });
  42. } catch (ConnectionAlreadyClosedException e) {
  43. connectionManager.unregister(connectionId);
  44. requestCallBack.onSuccess();
  45. } catch (Exception e) {
  46. Loggers.REMOTE_DIGEST
  47. .error("error to send push response to connectionId ={},push response={}", connectionId,
  48. request, e);
  49. requestCallBack.onFail(e);
  50. }
  51. } else {
  52. requestCallBack.onSuccess();
  53. }
  54. }
  55. }

其持有ConnectionManager对象,当需要推送配置到客户端时,会获取相应的Connection,然后执行asyncRequest将配置推送到客户端中。如果连接已经关闭,则注销连接。在asyncRequest底层即是调用Grpc建立的Stream的onNext方法,将配置推送给客户端,如下。

  1. /**
  2. * grpc connection.
  3. *
  4. * @author liuzunfei
  5. * @version $Id: GrpcConnection.java, v 0.1 2020年07月13日 7:26 PM liuzunfei Exp $
  6. */
  7. public class GrpcConnection extends Connection {
  8. private StreamObserver streamObserver;
  9. private Channel channel;
  10. public GrpcConnection(ConnectionMeta metaInfo, StreamObserver streamObserver, Channel channel) {
  11. super(metaInfo);
  12. this.streamObserver = streamObserver;
  13. this.channel = channel;
  14. }
  15. @Override
  16. public void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException {
  17. sendRequestInner(request, requestCallBack);
  18. }
  19. private DefaultRequestFuture sendRequestInner(Request request, RequestCallBack callBack) throws NacosException {
  20. final String requestId = String.valueOf(PushAckIdGenerator.getNextId());
  21. request.setRequestId(requestId);
  22. DefaultRequestFuture defaultPushFuture = new DefaultRequestFuture(getMetaInfo().getConnectionId(), requestId,
  23. callBack, () -> RpcAckCallbackSynchronizer.clearFuture(getMetaInfo().getConnectionId(), requestId));
  24. RpcAckCallbackSynchronizer.syncCallback(getMetaInfo().getConnectionId(), requestId, defaultPushFuture);
  25. sendRequestNoAck(request);
  26. return defaultPushFuture;
  27. }
  28. private void sendRequestNoAck(Request request) throws NacosException {
  29. try {
  30. //StreamObserver#onNext() is not thread-safe,synchronized is required to avoid direct memory leak.
  31. synchronized (streamObserver) {
  32. Payload payload = GrpcUtils.convert(request);
  33. traceIfNecessary(payload);
  34. streamObserver.onNext(payload);
  35. }
  36. } catch (Exception e) {
  37. if (e instanceof StatusRuntimeException) {
  38. throw new ConnectionAlreadyClosedException(e);
  39. }
  40. throw e;
  41. }
  42. }
  43. }

主要推送逻辑的代码如上所示,调用asyncRequest之后,会将请求交给sendRequestInner处理,sendRequestInner又会调用sendRequestNoAck将推送请求推入gRPC的流中,客户端收到配置更新的请求,就会更新客户端的配置了。至此,一个灰度配置就发布成功了。

删除/查询BetaConfig

删除和查询BetaConfig的方法都很简单,都是简单的操作数据库即可。如果是删除配置,则会触发ConfigDataChangeEvent来告知客户端更新配置,这里笔者就不多加赘述了。

  1. /**
  2. * Execute to remove beta operation.
  3. *
  4. * @param dataId dataId string value.
  5. * @param group group string value.
  6. * @param tenant tenant string value.
  7. * @return Execute to operate result.
  8. */
  9. @DeleteMapping(params = "beta=true")
  10. @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
  11. public RestResult<Boolean> stopBeta(@RequestParam(value = "dataId") String dataId,
  12. @RequestParam(value = "group") String group,
  13. @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant) {
  14. try {
  15. persistService.removeConfigInfo4Beta(dataId, group, tenant);
  16. } catch (Throwable e) {
  17. LOGGER.error("remove beta data error", e);
  18. return RestResultUtils.failed(500, false, "remove beta data error");
  19. }
  20. ConfigChangePublisher
  21. .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, System.currentTimeMillis()));
  22. return RestResultUtils.success("stop beta ok", true);
  23. }
  24. /**
  25. * Execute to query beta operation.
  26. *
  27. * @param dataId dataId string value.
  28. * @param group group string value.
  29. * @param tenant tenant string value.
  30. * @return RestResult for ConfigInfo4Beta.
  31. */
  32. @GetMapping(params = "beta=true")
  33. @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
  34. public RestResult<ConfigInfo4Beta> queryBeta(@RequestParam(value = "dataId") String dataId,
  35. @RequestParam(value = "group") String group,
  36. @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant) {
  37. try {
  38. ConfigInfo4Beta ci = persistService.findConfigInfo4Beta(dataId, group, tenant);
  39. return RestResultUtils.success("stop beta ok", ci);
  40. } catch (Throwable e) {
  41. LOGGER.error("remove beta data error", e);
  42. return RestResultUtils.failed("remove beta data error");
  43. }
  44. }

总结

Nacos2.0使用长连接代替了短连接的长轮询,性能几乎提升了10倍。在阿里内部,也在逐渐推进Nacos2作为统一的配置中心。目前在微服务引擎(Micro Service Engine,简称 MSE),Nacos作为注册配置中心,提供了纯托管的服务,只需要购买Nacos专业版即可享受到10倍的性能提升。
此外,MSE微服务引擎顾名思义,是一个面向业界主流开源微服务生态的一站式微服务平台, 帮助微服务用户更稳定、更便捷、更低成本的使用开源微服务技术构建微服务体系。不但提供注册中心、配置中心全托管(兼容 Nacos/ZooKeeper/Eureka),而且提供网关(兼容 Ingress/Enovy)和无侵入的开源增强服务治理能力。
在阿里,MSE微服务引擎已经被大规模的接入使用,经历阿里内部生产考验以及反复淬炼,其中微服务服务治理能力支撑了大量的微服务系统,对包括Spring Cloud、Dubbo等微服务框架的治理功能增强,提供了无损上下线、金丝雀发布、离群摘除以及无损滚动升级的功能。
如果有快速搭建高性能微服务以及大规模服务治理的需求,相比于从零搭建和运维,MSE微服务引擎是一个不错的选择。