:::info 开源示例参考:https://github.com/jboner/lagom-service-locator-consul :::
添加consul依赖
在build.sbt
文件中,添加consul的相关依赖,GitHub示例中的时间较早,版本API已经废弃,本例中使用较新的版本,并调整了API的新用法。
val consulApi = "com.ecwid.consul" % "consul-api" % "1.4.5"
Consul组件
ConsulConfig
用于读取配置文件中,配置的consul中心地址,希望将自身服务注册到哪个consul服务,需要配置必要的信息,包括Consul服务的IP地址和服务端口,uri方式和路由策略。代码如下:
trait ConsulConfig {
def agentHostname: String
def agentPort: Int
def scheme: String
def routingPolicy: RoutingPolicy
}
object ConsulConfig {
class ConsulConfigImpl (config: Configuration) extends ConsulConfig {
override val agentHostname = config.get[String]("server.discovery.consul.agent-hostname")
override val agentPort = config.get[Int]("server.discovery.consul.agent-port")
override val scheme = config.get[String]("server.discovery.consul.uri-scheme")
override val routingPolicy = RoutingPolicy(config.get[String]("server.discovery.consul.routing-policy"))
}
}
对应的需要在resource/application.conf
文件中,添加对应的服务信息,配置信息如下:
##服务发现使用consul机制,用于表示consul集群地址信息
lagom {
discovery {
consul {
agent-hostname = "127.0.0.1" # hostname or IP-address for the Consul agent
agent-port = 8500 # port for the Consul agent
uri-scheme = "http" # for example: http or https
routing-policy = "round-robin" # valid routing policies: first, random, round-robin
}
}
}
路由策略
一般在consul服务中心注册的业务服务一般由多个实例构成的一个集群服务,对外统一暴露一个服务名称,但可能存在多个不同的服务实例节点,来响应服务请求。对于一个服务的不同实例的选取过程称为路由,路由策略有多种,不同的路由策略来不同程度的来保障不同实例之间的负载均衡,这里简单实现以下三种:
object RoutingPolicy {
def apply(policy: String): RoutingPolicy = policy match {
case "first" => First
case "random" => Random
case "round-robin" => RoundRobin
case unknown => throw new IllegalArgumentException(s"discovery.consul.routing-policy, [${unknown}] is not a valid routing algorithm")
}
}
sealed trait RoutingPolicy
case object First extends RoutingPolicy
case object Random extends RoutingPolicy
case object RoundRobin extends RoutingPolicy
Consul组件定义
主要能力定义:
- 将当前的服务注册到Consul服务中心
- 将当前服务从Consul服务中心注销
- 验证查看服务名在Consul是否已经注册
Consul组件代码如下所示:
trait ConsulOpsComponents {
private final val log: Logger = LoggerFactory.getLogger("ConsulOpsComponents")
private val roundRobinIndexFor: Map[String, Int] = TrieMap.empty[String, Int]
private val config = Configuration.load(Environment.simple())
val appConfig = config
val consulClient = new ConsulClient(consulAgentHost, consulAgentPort)
final lazy val consulConfig = new ConsulConfig.ConsulConfigImpl(appConfig)
/**
* 根据配置文件参数获取Consul主机地址
*
* @return
*/
def consulAgentHost = consulConfig.agentHostname
/**
* 根据配置文件参数获取Consul服务端口
*
* @return
*/
def consulAgentPort = consulConfig.agentPort
/**
* 向Consul中心注册服务
*
* @param service 想要注册的服务信息
* @return
*/
def registerService(service: NewService) = {
consulClient.agentServiceRegister(service)
}
def registerServiceToConsul = registerService(getSelfServiceInfo)
/**
* 向Consul中心取消注册的服务
*
* @param serviceId 注册时使用的服务标识
* @return
*/
def deregisterService(serviceId: String) = consulClient.agentServiceDeregister(serviceId)
/**
* 根据服务名获取Consul注册中心对应的URI
*
* @param serviceName 服务名称
* @return serviceURI
*/
def getURI(serviceName: String) = {
val service = consulClient.getCatalogService(serviceName, CatalogServiceRequest.newBuilder().setQueryParams(QueryParams.DEFAULT).build()).getValue.asScala.toList
val serviceURI = pickRoundRobinInstance(serviceName, service)
serviceURI
}
/**
* 验证查看服务名在Consul是否已经注册,并打印服务URI
*/
def lookRegisterServiceInConsul = {
val name = config.get[String]("service.registry.service-name")
val uri = getURI(name).toString
log.info(s"[${name}] service has registered,the uri is {}", uri)
}
/**
* 构造服务信息
*
* @return
*/
def getSelfServiceInfo: NewService = {
val id = config.get[String]("service.registry.service-id")
val name = config.get[String]("service.registry.service-name")
val port = config.get[Int]("service.registry.service-port")
val hostIp = config.get[String]("service.registry.service-ip")
val service = new NewService
service.setName(name)
service.setPort(port)
service.setId(id)
service.setAddress(hostIp)
service
}
private[consul] def pickRoundRobinInstance(name: String, services: List[CatalogService]): URI = {
if (services.isEmpty) throw new IllegalStateException("List of services should not be empty")
roundRobinIndexFor.putIfAbsent(name, 0)
val sortedServices = toURIs(services).sorted
val currentIndex = roundRobinIndexFor(name)
val nextIndex =
if (sortedServices.size > currentIndex + 1) currentIndex + 1
else 0
roundRobinIndexFor.replace(name, nextIndex)
sortedServices.apply(currentIndex)
}
private[consul] def pickFirstInstance(services: List[CatalogService]): URI = {
if (services.isEmpty) throw new IllegalStateException("List of services should not be empty")
toURIs(services).sorted.head
}
private[consul] def pickRandomInstance(services: List[CatalogService]): URI = {
if (services.isEmpty) throw new IllegalStateException("List of services should not be empty")
toURIs(services).sorted.apply(ThreadLocalRandom.current.nextInt(services.size - 1))
}
private[consul] def toURIs(services: List[CatalogService]): List[URI] =
services.map { service =>
val address = service.getServiceAddress
val serviceAddress =
if (address.trim.isEmpty || address == "localhost") InetAddress.getLoopbackAddress.getHostAddress
else address
new URI(s"${consulConfig.scheme}://$serviceAddress:${service.getServicePort}")
}
}
将自身服务注册到Consul服务中心,需要在配置文件中添加自身服务的名称等信息,需要在resource/application.conf
文件中,添加服务配置信息如下:
#自身服务要注册的信息
service {
registry {
service-id = "service-test"
service-name = "service-test"
service-port = 9000
service-ip = "127.0.0.1"
}
}
Consul组件使用
需要将定义好的Consul组件特征混入到ServiceLoader
中,如果以ShoppingCart为例,需要将上述定义的ConsulOpsComponents
特质混入到ShoppingCartComponents
中,并在ShoppingCartComponents
内增加服务注册的方法,以便在服务启动时自动将自身服务注册到Consul服务中心。
trait ShoppingCartComponents
extends LagomServerComponents
with SlickPersistenceComponents
with HikariCPComponents
with AhcWSComponents
with ConsulOpsComponents{
......
//增加Consul服务注册
registerServiceToConsul
lookRegisterServiceInConsul
}
Consul测试用例
我们如果希望单独测试一下Consul服务注册的能力,可以单独写测试用例来测试服务注册、服务查看、服务注销的能力。
为了使用组件能力方便,我们新建一个Consul的操作类ConsulOps
,用于方便的获取ConsulOpsComponents
组件实例,代码如下:
private[consul] class ConsulOps extends ConsulOpsComponents
object ConsulOps {
final val instance = new ConsulOps()
}
测试用例可以使用ScalaTest
包,使用should
断言语法,但本例为了方便,直接使用main
函数来进行测试,Consul主要功能的测试用例如下所示:
object ConsulTest {
def main(args: Array[String]): Unit = {
val consulOps = ConsulOps.instance
val id = consulOps.appConfig.get[String]("service.registry.service-id")
val name = consulOps.appConfig.get[String]("service.registry.service-name")
val hostIp = consulOps.appConfig.get[String]("service.registry.service-ip")
//将自身服务注册到Consul中心
consulOps.registerServiceToConsul
//使用组件方法查看上述的服务是否存在
consulOps.lookRegisterServiceInConsul
//测试内部方法获取指定服务名的uri信息
val uri = consulOps.getURI(name)
println(s"Test-Uri is {}",uri)
//服务注销
consulOps.deregisterService(id)
println(hostIp)
}
}
测试用例运行日志信息如下:
2022-04-21 11:18:13,760 INFO ConsulOpsComponents - [service-test] service has registered,the uri is http://127.0.0.1:9000
(Test-Uri is {},http://127.0.0.1:9000)
127.0.0.1
访问Consul中心查看服务注册情况,如果服务注销方法一起运行,无法查看到对应的服务信息,如要查看需要暂时屏蔽掉服务注销方法的调用。