首先搭建Nacos源码环境,源码环境搭建起来比较轻松,几 乎不会报什么错误
image.pngimage.png
客户端与注册中心服务端的交互,主要集中在服务注册、服务下线、服务发现、订阅某个服务,其实使
用最多的就是服务注册和服务发现在Nacos源码中 nacos-example 中com.alibaba.nacos.example.NamingExample 类分别演示了这4个功能的操作,可以把它当做入口,代码如下:

  1. public class NamingExample {
  2. public static void main(String[] args) throws NacosException {
  3. Properties properties = new Properties();
  4. properties.setProperty("serverAddr", System.getProperty("serverAddr"));
  5. properties.setProperty("namespace", System.getProperty("namespace"));
  6. NamingService naming = NamingFactory.createNamingService(properties);
  7. //服务注册
  8. naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "DEFAULT");
  9. naming.registerInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
  10. //服务发现
  11. System.out.println(naming.getAllInstances("nacos.test.3"));
  12. //服务下线
  13. naming.deregisterInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
  14. System.out.println(naming.getAllInstances("nacos.test.3"));
  15. //服务订阅
  16. Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
  17. new ThreadFactory() {
  18. @Override
  19. public Thread newThread(Runnable r) {
  20. Thread thread = new Thread(r);
  21. thread.setName("test-thread");
  22. return thread;
  23. }
  24. });
  25. naming.subscribe("nacos.test.3", new AbstractEventListener() {
  26. //EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.
  27. //So you can override getExecutor() to async handle event.
  28. @Override
  29. public Executor getExecutor() {
  30. return executor;
  31. }
  32. @Override
  33. public void onEvent(Event event) {
  34. System.out.println(((NamingEvent) event).getServiceName());
  35. System.out.println(((NamingEvent) event).getInstances());
  36. }
  37. });
  38. }
  39. }

客户端工作流程

服务注册

沿着案例中的服务注册方法调用找到 nacos-api 中的 NamingService.registerInstance() 并找 到它的实现类和方法 com.alibaba.nacos.client.naming.NacosNamingService ,代码如下:

NacosNamingService.registerInstance

  1. /***
  2. * 服务注册
  3. * @param serviceName 服务名字
  4. * @param ip 服务IP
  5. * @param port 服务端口
  6. * @param clusterName 集群名字
  7. * @throws NacosException
  8. */
  9. @Override
  10. public void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException {
  11. //调用重载方法
  12. registerInstance(serviceName, Constants.DEFAULT_GROUP, ip, port, clusterName);
  13. }
  1. @Override
  2. public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
  3. throws NacosException {
  4. //设置实例IP:Port,默认权重为1.0
  5. Instance instance = new Instance();
  6. instance.setIp(ip);
  7. instance.setPort(port);
  8. instance.setWeight(1.0);
  9. instance.setClusterName(clusterName);
  10. registerInstance(serviceName, groupName, instance);
  11. }
  1. /**
  2. * 实例注册
  3. */
  4. @Override
  5. public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
  6. NamingUtils.checkInstanceIsLegal(instance);
  7. String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
  8. //该字段表示注册的实例是否是临时实例还是持久化实例。
  9. // 如果是临时实例,则不会在 Nacos 服务端持久化存储,需要通过上报心跳的方式进行包活,
  10. // 如果一段时间内没有上报心跳,则会被 Nacos 服务端摘除。
  11. if (instance.isEphemeral()) {
  12. //为注册服务设置一个定时任务获取心跳信息,默认为5s汇报一次
  13. BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
  14. beatReactor.addBeatInfo(groupedServiceName, beatInfo);
  15. }
  16. //服务注册
  17. serverProxy.registerService(groupedServiceName, groupName, instance);
  18. }

注册主要做了两件事,第一件事:为注册的服务设置一个定时任务,定时拉去服务信息。 第二件事:将
服务注册到服务端。

  1. 1:启动一个定时任务,定时拉取服务信息,时间间隔为5s,如果拉下来服务正常,不做处理,如果不正常,
  2. 重新注册
  3. 2:发送http请求给注册中心服务端,调用服务注册接口,注册服务

上面代码我们可以看到定时任务添加,但并未完全看到远程请求, serverProxy.registerService()方法如下,会先封装请求参数,接下来调用 reqApi() 而 reqApi() 最后会调用 callServer() ,代码如下:

NamingProxy.registerService

  1. public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
  2. NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
  3. instance);
  4. //参数封装-
  5. final Map<String, String> params = new HashMap<String, String>(16);
  6. params.put(CommonParams.NAMESPACE_ID, namespaceId);
  7. params.put(CommonParams.SERVICE_NAME, serviceName);
  8. params.put(CommonParams.GROUP_NAME, groupName);
  9. params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
  10. params.put("ip", instance.getIp());
  11. params.put("port", String.valueOf(instance.getPort()));
  12. params.put("weight", String.valueOf(instance.getWeight()));
  13. params.put("enable", String.valueOf(instance.isEnabled()));
  14. params.put("healthy", String.valueOf(instance.isHealthy()));
  15. params.put("ephemeral", String.valueOf(instance.isEphemeral()));
  16. params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
  17. //执行http请求
  18. reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
  19. }

NamingProxy.reqApi

  1. public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
  2. //调用重载方法
  3. return reqApi(api, params, Collections.EMPTY_MAP, method);
  4. }
  5. public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
  6. throws NacosException {
  7. return reqApi(api, params, body, getServerList(), method);
  8. }
  9. public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
  10. String method) throws NacosException {
  11. params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
  12. if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
  13. throw new NacosException(NacosException.INVALID_PARAM, "no server available");
  14. }
  15. NacosException exception = new NacosException();
  16. if (StringUtils.isNotBlank(nacosDomain)) {
  17. for (int i = 0; i < maxRetry; i++) {
  18. try {
  19. return callServer(api, params, body, nacosDomain, method);
  20. } catch (NacosException e) {
  21. exception = e;
  22. if (NAMING_LOGGER.isDebugEnabled()) {
  23. NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
  24. }
  25. }
  26. }
  27. } else {
  28. Random random = new Random(System.currentTimeMillis());
  29. int index = random.nextInt(servers.size());
  30. for (int i = 0; i < servers.size(); i++) {
  31. String server = servers.get(index);
  32. try {
  33. //执行远程调用
  34. return callServer(api, params, body, server, method);
  35. } catch (NacosException e) {
  36. exception = e;
  37. if (NAMING_LOGGER.isDebugEnabled()) {
  38. NAMING_LOGGER.debug("request {} failed.", server, e);
  39. }
  40. }
  41. index = (index + 1) % servers.size();
  42. }
  43. }
  44. NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
  45. exception.getErrMsg());
  46. throw new NacosException(exception.getErrCode(),
  47. "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
  48. }

NamingProxy.callServer

  1. /**
  2. * 执行远程调用
  3. *
  4. * @param api api
  5. * @param params parameters
  6. * @param body body
  7. * @param curServer ?
  8. * @param method http method
  9. * @return result
  10. * @throws NacosException nacos exception
  11. */
  12. public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
  13. String method) throws NacosException {
  14. long start = System.currentTimeMillis();
  15. long end = 0;
  16. injectSecurityInfo(params);
  17. //封装请求头
  18. Header header = builderHeader();
  19. //请求头是http还是https协议
  20. String url;
  21. if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
  22. url = curServer + api;
  23. } else {
  24. if (!IPUtil.containsPort(curServer)) {
  25. curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
  26. }
  27. url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
  28. }
  29. try {
  30. //执行远程请求,并获取结果集
  31. HttpRestResult<String> restResult = nacosRestTemplate
  32. .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
  33. end = System.currentTimeMillis();
  34. MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
  35. .observe(end - start);
  36. //结果解析
  37. if (restResult.ok()) {
  38. return restResult.getData();
  39. }
  40. if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
  41. return StringUtils.EMPTY;
  42. }
  43. throw new NacosException(restResult.getCode(), restResult.getMessage());
  44. } catch (Exception e) {
  45. NAMING_LOGGER.error("[NA] failed to request", e);
  46. throw new NacosException(NacosException.SERVER_ERROR, e);
  47. }
  48. }

服务发现

案例中的服务发现方法调用找到 nacos-api 中的 NamingService.getAllInstances() 并找到它的实现类和方法

NamingService.getAllInstances