First, set up the Nacos source environment. The setup is fairly painless and rarely produces errors.

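The client interacts with the registry server mainly through four operations: service registration, service deregistration, service discovery, and subscribing to a service. Registration and discovery are by far the most used. In the Nacos source, the class com.alibaba.nacos.example.NamingExample in the nacos-example module demonstrates all four operations, so it makes a good entry point. The code is as follows: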
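import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.AbstractEventListener;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.NamingEvent;

import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class NamingExample {

    public static void main(String[] args) throws NacosException {

        // Both values must be supplied as JVM system properties (-DserverAddr=... -Dnamespace=...);
        // Properties.setProperty rejects null values.
        Properties properties = new Properties();
        properties.setProperty("serverAddr", System.getProperty("serverAddr"));
        properties.setProperty("namespace", System.getProperty("namespace"));

        NamingService naming = NamingFactory.createNamingService(properties);

        // Service registration
        naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "DEFAULT");
        naming.registerInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");

        // Service discovery
        System.out.println(naming.getAllInstances("nacos.test.3"));

        // Service deregistration
        naming.deregisterInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
        System.out.println(naming.getAllInstances("nacos.test.3"));

        // Service subscription
        Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("test-thread");
                        return thread;
                    }
                });

        naming.subscribe("nacos.test.3", new AbstractEventListener() {

            // EventListener.onEvent is handled synchronously. If processing in onEvent is too slow,
            // it may block other onEvent callbacks.
            // Override getExecutor() to handle events asynchronously.
            @Override
            public Executor getExecutor() {
                return executor;
            }

            @Override
            public void onEvent(Event event) {
                System.out.println(((NamingEvent) event).getServiceName());
                System.out.println(((NamingEvent) event).getInstances());
            }
        });
    }
}
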
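Client workflow

Service registration

Following the registration call in the example leads to NamingService.registerInstance() in nacos-api, and from there to its implementation class, com.alibaba.nacos.client.naming.NacosNamingService. The code is as follows:
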
NacosNamingService.registerInstance
/**
 * Service registration.
 *
 * @param serviceName service name
 * @param ip          service IP
 * @param port        service port
 * @param clusterName cluster name
 * @throws NacosException
 */
@Override
public void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException {
    // Delegate to the overload, using the default group
    registerInstance(serviceName, Constants.DEFAULT_GROUP, ip, port, clusterName);
}

@Override
public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
        throws NacosException {
    // Set the instance IP and port; the default weight is 1.0
    Instance instance = new Instance();
    instance.setIp(ip);
    instance.setPort(port);
    instance.setWeight(1.0);
    instance.setClusterName(clusterName);
    registerInstance(serviceName, groupName, instance);
}

/**
 * Instance registration.
 */
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // The ephemeral flag says whether the instance is ephemeral or persistent.
    // An ephemeral instance is not persisted on the Nacos server and must be kept alive by
    // reporting heartbeats. If no heartbeat arrives for a while, the server removes it.
    if (instance.isEphemeral()) {
        // Schedule a heartbeat task for the registered instance; it reports every 5s by default
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    // Register the service
    serverProxy.registerService(groupedServiceName, groupName, instance);
}

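Registration does two things. First, for an ephemeral instance, it sets up a scheduled heartbeat task. Second, it registers the service with the server.

1: Start a scheduled task that reports a heartbeat to the server at a default interval of 5s. If the server reports the instance as healthy, nothing else happens; if not, the client registers the instance again.
2: Send an HTTP request to the registry server, calling the service registration API to register the service.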
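The periodic task described above can be pictured with a minimal sketch built on the JDK scheduler. This is not the BeatReactor code from Nacos: the class HeartbeatSketch, the Registry interface, and the method names are all hypothetical, and the rule "re-register when the server no longer knows the instance" is taken from the description above rather than from the Nacos implementation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HeartbeatSketch {

    /** Hypothetical stand-in for the registry server; not a Nacos type. */
    public interface Registry {
        /** Returns false when the server does not know the instance. */
        boolean beat(String instanceKey);

        void register(String instanceKey);
    }

    // Single daemon thread, so the sketch never keeps the JVM alive on its own.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "heartbeat-sketch");
                t.setDaemon(true);
                return t;
            });

    /** One heartbeat round: report, and register again if the instance is unknown. */
    public static void beatOnce(Registry registry, String instanceKey) {
        if (!registry.beat(instanceKey)) {
            registry.register(instanceKey);
        }
    }

    /** Repeats beatOnce with a fixed delay between rounds. */
    public void start(Registry registry, String instanceKey, long periodMillis) {
        scheduler.scheduleWithFixedDelay(
                () -> beatOnce(registry, instanceKey),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        scheduler.shutdownNow();
    }
}
```

Calling start(registry, key, 5000) would mirror the 5s default mentioned above. Keeping beatOnce as a separate static method makes the re-registration rule testable without waiting on a timer.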
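The code above shows the scheduled task being added, but not the remote request itself. The serverProxy.registerService() method, shown below, first packs the request parameters and then calls reqApi(), which in turn ends up calling callServer(). The code is as follows:
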
NamingProxy.registerService
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
            instance);

    // Pack the request parameters
    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));

    // Send the HTTP request
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}

NamingProxy.reqApi
public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
    // Delegate to the overload
    return reqApi(api, params, Collections.EMPTY_MAP, method);
}

public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
        throws NacosException {
    return reqApi(api, params, body, getServerList(), method);
}

public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
        String method) throws NacosException {

    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

    if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
        throw new NacosException(NacosException.INVALID_PARAM, "no server available");
    }

    NacosException exception = new NacosException();

    if (StringUtils.isNotBlank(nacosDomain)) {
        // A single domain is configured: retry against it up to maxRetry times
        for (int i = 0; i < maxRetry; i++) {
            try {
                return callServer(api, params, body, nacosDomain, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                }
            }
        }
    } else {
        // A server list is configured: start at a random server and try each one in turn
        Random random = new Random(System.currentTimeMillis());
        int index = random.nextInt(servers.size());

        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                // Perform the remote call
                return callServer(api, params, body, server, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", server, e);
                }
            }
            index = (index + 1) % servers.size();
        }
    }

    NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
            exception.getErrMsg());

    throw new NacosException(exception.getErrCode(),
            "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
}

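The server-list branch of reqApi() picks a random starting index and then walks the list once, wrapping around, until a call succeeds. The sketch below isolates that pattern from the HTTP details. The class FailoverSketch and the method callWithFailover are hypothetical names, and the Random is passed in only so the behaviour can be tested deterministically; the real method creates its own.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Function;

public class FailoverSketch {

    /**
     * Tries each server at most once, starting at a random position and wrapping around.
     * Returns the first successful result, or throws after every server has failed.
     */
    public static <T> T callWithFailover(List<String> servers, Function<String, T> call, Random random) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException last = null;
        int index = random.nextInt(servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                // Remember the failure and move on to the next server
                last = e;
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("failed after all servers (" + servers + ") tried", last);
    }
}
```

The random starting point spreads load across the cluster, while the wrap-around loop guarantees that one unreachable node does not fail the request as long as another node answers.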
NamingProxy.callServer
/**
 * Perform the remote call.
 *
 * @param api       api
 * @param params    parameters
 * @param body      body
 * @param curServer ?
 * @param method    http method
 * @return result
 * @throws NacosException nacos exception
 */
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
        String method) throws NacosException {
    long start = System.currentTimeMillis();
    long end = 0;
    injectSecurityInfo(params);
    // Build the request headers
    Header header = builderHeader();

    // Build the URL: use the server address as-is if it already carries an http or https scheme
    String url;
    if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
        url = curServer + api;
    } else {
        if (!IPUtil.containsPort(curServer)) {
            curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
        }
        url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
    }

    try {
        // Send the remote request and fetch the result
        HttpRestResult<String> restResult = nacosRestTemplate
                .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
        end = System.currentTimeMillis();

        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
                .observe(end - start);

        // Interpret the result
        if (restResult.ok()) {
            return restResult.getData();
        }
        if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
            return StringUtils.EMPTY;
        }
        throw new NacosException(restResult.getCode(), restResult.getMessage());
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to request", e);
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }
}

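The URL-building step at the top of callServer() is easy to miss, so here is a simplified sketch of it. ServerUrlSketch and buildUrl are hypothetical names. The sketch treats any address containing ":" as already having a port and always uses "http://" as the prefix, which is cruder than the IPUtil and NamingHttpClientManager helpers used by the real code. The port and API path in the usage note are illustrative values.

```java
public class ServerUrlSketch {

    /**
     * Builds the request URL for one server.
     * An address that already has a scheme is used as-is; otherwise a default port
     * is appended when none is present and an "http://" prefix is added.
     */
    public static String buildUrl(String server, String api, int defaultPort) {
        if (server.startsWith("https://") || server.startsWith("http://")) {
            return server + api;
        }
        // Simplification: assumes an IPv4 address or host name, where ":" can only mean a port
        if (!server.contains(":")) {
            server = server + ":" + defaultPort;
        }
        return "http://" + server + api;
    }
}
```

For example, buildUrl("127.0.0.1", "/nacos/v1/ns/instance", 8848) gives "http://127.0.0.1:8848/nacos/v1/ns/instance", while an address such as "https://nacos.example.com" is left untouched apart from the appended API path.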
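Service discovery

Following the service discovery call in the example leads to NamingService.getAllInstances() in nacos-api, and from there to its implementation class and method.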
