:::info 开源示例参考:https://github.com/jboner/lagom-service-locator-consul :::
添加consul依赖
在build.sbt文件中,添加consul的相关依赖,GitHub示例中的时间较早,版本API已经废弃,本例中使用较新的版本,并调整了API的新用法。
val consulApi = "com.ecwid.consul" % "consul-api" % "1.4.5"
Consul组件
ConsulConfig
用于读取配置文件中,配置的consul中心地址,希望将自身服务注册到哪个consul服务,需要配置必要的信息,包括Consul服务的IP地址和服务端口,uri方式和路由策略。代码如下:
trait ConsulConfig {def agentHostname: Stringdef agentPort: Intdef scheme: Stringdef routingPolicy: RoutingPolicy}object ConsulConfig {class ConsulConfigImpl (config: Configuration) extends ConsulConfig {override val agentHostname = config.get[String]("server.discovery.consul.agent-hostname")override val agentPort = config.get[Int]("server.discovery.consul.agent-port")override val scheme = config.get[String]("server.discovery.consul.uri-scheme")override val routingPolicy = RoutingPolicy(config.get[String]("server.discovery.consul.routing-policy"))}}
对应的需要在resource/application.conf文件中,添加对应的服务信息,配置信息如下:
##服务发现使用consul机制,用于表示consul集群地址信息lagom {discovery {consul {agent-hostname = "127.0.0.1" # hostname or IP-address for the Consul agentagent-port = 8500 # port for the Consul agenturi-scheme = "http" # for example: http or httpsrouting-policy = "round-robin" # valid routing policies: first, random, round-robin}}}
路由策略
一般在consul服务中心注册的业务服务一般由多个实例构成的一个集群服务,对外统一暴露一个服务名称,但可能存在多个不同的服务实例节点,来响应服务请求。对于一个服务的不同实例的选取过程称为路由,路由策略有多种,不同的路由策略来不同程度的来保障不同实例之间的负载均衡,这里简单实现以下三种:
object RoutingPolicy {def apply(policy: String): RoutingPolicy = policy match {case "first" => Firstcase "random" => Randomcase "round-robin" => RoundRobincase unknown => throw new IllegalArgumentException(s"discovery.consul.routing-policy, [${unknown}] is not a valid routing algorithm")}}sealed trait RoutingPolicycase object First extends RoutingPolicycase object Random extends RoutingPolicycase object RoundRobin extends RoutingPolicy
Consul组件定义
主要能力定义:
- 将当前的服务注册到Consul服务中心
- 将当前服务从Consul服务中心注销
- 验证查看服务名在Consul是否已经注册
Consul组件代码如下所示:
trait ConsulOpsComponents {private final val log: Logger = LoggerFactory.getLogger("ConsulOpsComponents")private val roundRobinIndexFor: Map[String, Int] = TrieMap.empty[String, Int]private val config = Configuration.load(Environment.simple())val appConfig = configval consulClient = new ConsulClient(consulAgentHost, consulAgentPort)final lazy val consulConfig = new ConsulConfig.ConsulConfigImpl(appConfig)/*** 根据配置文件参数获取Consul主机地址** @return*/def consulAgentHost = consulConfig.agentHostname/*** 根据配置文件参数获取Consul服务端口** @return*/def consulAgentPort = consulConfig.agentPort/*** 向Consul中心注册服务** @param service 想要注册的服务信息* @return*/def registerService(service: NewService) = {consulClient.agentServiceRegister(service)}def registerServiceToConsul = registerService(getSelfServiceInfo)/*** 向Consul中心取消注册的服务** @param serviceId 注册时使用的服务标识* @return*/def deregisterService(serviceId: String) = consulClient.agentServiceDeregister(serviceId)/*** 根据服务名获取Consul注册中心对应的URI** @param serviceName 服务名称* @return serviceURI*/def getURI(serviceName: String) = {val service = consulClient.getCatalogService(serviceName, CatalogServiceRequest.newBuilder().setQueryParams(QueryParams.DEFAULT).build()).getValue.asScala.toListval serviceURI = pickRoundRobinInstance(serviceName, service)serviceURI}/*** 验证查看服务名在Consul是否已经注册,并打印服务URI*/def lookRegisterServiceInConsul = {val name = config.get[String]("service.registry.service-name")val uri = getURI(name).toStringlog.info(s"[${name}] service has registered,the uri is {}", uri)}/*** 构造服务信息** @return*/def getSelfServiceInfo: NewService = {val id = config.get[String]("service.registry.service-id")val name = config.get[String]("service.registry.service-name")val port = config.get[Int]("service.registry.service-port")val hostIp = config.get[String]("service.registry.service-ip")val service = new NewServiceservice.setName(name)service.setPort(port)service.setId(id)service.setAddress(hostIp)service}private[consul] def pickRoundRobinInstance(name: String, services: List[CatalogService]): URI = {if (services.isEmpty) throw new IllegalStateException("List of services should not be empty")roundRobinIndexFor.putIfAbsent(name, 0)val sortedServices = toURIs(services).sortedval currentIndex = roundRobinIndexFor(name)val nextIndex =if (sortedServices.size > currentIndex + 1) currentIndex + 1else 0roundRobinIndexFor.replace(name, nextIndex)sortedServices.apply(currentIndex)}private[consul] def pickFirstInstance(services: List[CatalogService]): URI = {if (services.isEmpty) throw new IllegalStateException("List of services should not be empty")toURIs(services).sorted.head}private[consul] def pickRandomInstance(services: List[CatalogService]): URI = {if (services.isEmpty) throw new IllegalStateException("List of services should not be empty")toURIs(services).sorted.apply(ThreadLocalRandom.current.nextInt(services.size - 1))}private[consul] def toURIs(services: List[CatalogService]): List[URI] =services.map { service =>val address = service.getServiceAddressval serviceAddress =if (address.trim.isEmpty || address == "localhost") InetAddress.getLoopbackAddress.getHostAddresselse addressnew URI(s"${consulConfig.scheme}://$serviceAddress:${service.getServicePort}")}}
将自身服务注册到Consul服务中心,需要在配置文件中添加自身服务的名称等信息,需要在resource/application.conf文件中,添加服务配置信息如下:
#自身服务要注册的信息service {registry {service-id = "service-test"service-name = "service-test"service-port = 9000service-ip = "127.0.0.1"}}
Consul组件使用
需要将定义好的Consul组件特征混入到ServiceLoader中,如果以ShoppingCart为例,需要将上述定义的ConsulOpsComponents特质混入到ShoppingCartComponents中,并在ShoppingCartComponents内增加服务注册的方法,以便在服务启动时自动将自身服务注册到Consul服务中心。
trait ShoppingCartComponentsextends LagomServerComponentswith SlickPersistenceComponentswith HikariCPComponentswith AhcWSComponentswith ConsulOpsComponents{......//增加Consul服务注册registerServiceToConsullookRegisterServiceInConsul}
Consul测试用例
我们如果希望单独测试一下Consul服务注册的能力,可以单独写测试用例来测试服务注册、服务查看、服务注销的能力。
为了使用组件能力方便,我们新建一个Consul的操作类ConsulOps,用于方便的获取ConsulOpsComponents组件实例,代码如下:
private[consul] class ConsulOps extends ConsulOpsComponentsobject ConsulOps {final val instance = new ConsulOps()}
测试用例可以使用ScalaTest包,使用should断言语法,但本例为了方便,直接使用main函数来进行测试,Consul主要功能的测试用例如下所示:
object ConsulTest {def main(args: Array[String]): Unit = {val consulOps = ConsulOps.instanceval id = consulOps.appConfig.get[String]("service.registry.service-id")val name = consulOps.appConfig.get[String]("service.registry.service-name")val hostIp = consulOps.appConfig.get[String]("service.registry.service-ip")//将自身服务注册到Consul中心consulOps.registerServiceToConsul//使用组件方法查看上述的服务是否存在consulOps.lookRegisterServiceInConsul//测试内部方法获取指定服务名的uri信息val uri = consulOps.getURI(name)println(s"Test-Uri is {}",uri)//服务注销consulOps.deregisterService(id)println(hostIp)}}
测试用例运行日志信息如下:
2022-04-21 11:18:13,760 INFO ConsulOpsComponents - [service-test] service has registered,the uri is http://127.0.0.1:9000(Test-Uri is {},http://127.0.0.1:9000)127.0.0.1
访问Consul中心查看服务注册情况,如果服务注销方法一起运行,无法查看到对应的服务信息,如要查看需要暂时屏蔽掉服务注销方法的调用。
