:::info 开源示例参考:https://github.com/jboner/lagom-service-locator-consul :::

添加consul依赖

build.sbt文件中,添加consul的相关依赖,GitHub示例中的时间较早,版本API已经废弃,本例中使用较新的版本,并调整了API的新用法。

  1. val consulApi = "com.ecwid.consul" % "consul-api" % "1.4.5"

Consul组件

ConsulConfig

用于读取配置文件中,配置的consul中心地址,希望将自身服务注册到哪个consul服务,需要配置必要的信息,包括Consul服务的IP地址和服务端口,uri方式和路由策略。代码如下:

  1. trait ConsulConfig {
  2. def agentHostname: String
  3. def agentPort: Int
  4. def scheme: String
  5. def routingPolicy: RoutingPolicy
  6. }
  7. object ConsulConfig {
  8. class ConsulConfigImpl (config: Configuration) extends ConsulConfig {
  9. override val agentHostname = config.get[String]("server.discovery.consul.agent-hostname")
  10. override val agentPort = config.get[Int]("server.discovery.consul.agent-port")
  11. override val scheme = config.get[String]("server.discovery.consul.uri-scheme")
  12. override val routingPolicy = RoutingPolicy(config.get[String]("server.discovery.consul.routing-policy"))
  13. }
  14. }

对应的需要在resource/application.conf文件中,添加对应的服务信息,配置信息如下:

  1. ##服务发现使用consul机制,用于表示consul集群地址信息
  2. lagom {
  3. discovery {
  4. consul {
  5. agent-hostname = "127.0.0.1" # hostname or IP-address for the Consul agent
  6. agent-port = 8500 # port for the Consul agent
  7. uri-scheme = "http" # for example: http or https
  8. routing-policy = "round-robin" # valid routing policies: first, random, round-robin
  9. }
  10. }
  11. }

路由策略

一般在consul服务中心注册的业务服务一般由多个实例构成的一个集群服务,对外统一暴露一个服务名称,但可能存在多个不同的服务实例节点,来响应服务请求。对于一个服务的不同实例的选取过程称为路由,路由策略有多种,不同的路由策略来不同程度的来保障不同实例之间的负载均衡,这里简单实现以下三种:

  1. object RoutingPolicy {
  2. def apply(policy: String): RoutingPolicy = policy match {
  3. case "first" => First
  4. case "random" => Random
  5. case "round-robin" => RoundRobin
  6. case unknown => throw new IllegalArgumentException(s"discovery.consul.routing-policy, [${unknown}] is not a valid routing algorithm")
  7. }
  8. }
  9. sealed trait RoutingPolicy
  10. case object First extends RoutingPolicy
  11. case object Random extends RoutingPolicy
  12. case object RoundRobin extends RoutingPolicy

Consul组件定义

主要能力定义:

  • 将当前的服务注册到Consul服务中心
  • 将当前服务从Consul服务中心注销
  • 验证查看服务名在Consul是否已经注册

Consul组件代码如下所示:

  1. trait ConsulOpsComponents {
  2. private final val log: Logger = LoggerFactory.getLogger("ConsulOpsComponents")
  3. private val roundRobinIndexFor: Map[String, Int] = TrieMap.empty[String, Int]
  4. private val config = Configuration.load(Environment.simple())
  5. val appConfig = config
  6. val consulClient = new ConsulClient(consulAgentHost, consulAgentPort)
  7. final lazy val consulConfig = new ConsulConfig.ConsulConfigImpl(appConfig)
  8. /**
  9. * 根据配置文件参数获取Consul主机地址
  10. *
  11. * @return
  12. */
  13. def consulAgentHost = consulConfig.agentHostname
  14. /**
  15. * 根据配置文件参数获取Consul服务端口
  16. *
  17. * @return
  18. */
  19. def consulAgentPort = consulConfig.agentPort
  20. /**
  21. * 向Consul中心注册服务
  22. *
  23. * @param service 想要注册的服务信息
  24. * @return
  25. */
  26. def registerService(service: NewService) = {
  27. consulClient.agentServiceRegister(service)
  28. }
  29. def registerServiceToConsul = registerService(getSelfServiceInfo)
  30. /**
  31. * 向Consul中心取消注册的服务
  32. *
  33. * @param serviceId 注册时使用的服务标识
  34. * @return
  35. */
  36. def deregisterService(serviceId: String) = consulClient.agentServiceDeregister(serviceId)
  37. /**
  38. * 根据服务名获取Consul注册中心对应的URI
  39. *
  40. * @param serviceName 服务名称
  41. * @return serviceURI
  42. */
  43. def getURI(serviceName: String) = {
  44. val service = consulClient.getCatalogService(serviceName, CatalogServiceRequest.newBuilder().setQueryParams(QueryParams.DEFAULT).build()).getValue.asScala.toList
  45. val serviceURI = pickRoundRobinInstance(serviceName, service)
  46. serviceURI
  47. }
  48. /**
  49. * 验证查看服务名在Consul是否已经注册,并打印服务URI
  50. */
  51. def lookRegisterServiceInConsul = {
  52. val name = config.get[String]("service.registry.service-name")
  53. val uri = getURI(name).toString
  54. log.info(s"[${name}] service has registered,the uri is {}", uri)
  55. }
  56. /**
  57. * 构造服务信息
  58. *
  59. * @return
  60. */
  61. def getSelfServiceInfo: NewService = {
  62. val id = config.get[String]("service.registry.service-id")
  63. val name = config.get[String]("service.registry.service-name")
  64. val port = config.get[Int]("service.registry.service-port")
  65. val hostIp = config.get[String]("service.registry.service-ip")
  66. val service = new NewService
  67. service.setName(name)
  68. service.setPort(port)
  69. service.setId(id)
  70. service.setAddress(hostIp)
  71. service
  72. }
  73. private[consul] def pickRoundRobinInstance(name: String, services: List[CatalogService]): URI = {
  74. if (services.isEmpty) throw new IllegalStateException("List of services should not be empty")
  75. roundRobinIndexFor.putIfAbsent(name, 0)
  76. val sortedServices = toURIs(services).sorted
  77. val currentIndex = roundRobinIndexFor(name)
  78. val nextIndex =
  79. if (sortedServices.size > currentIndex + 1) currentIndex + 1
  80. else 0
  81. roundRobinIndexFor.replace(name, nextIndex)
  82. sortedServices.apply(currentIndex)
  83. }
  84. private[consul] def pickFirstInstance(services: List[CatalogService]): URI = {
  85. if (services.isEmpty) throw new IllegalStateException("List of services should not be empty")
  86. toURIs(services).sorted.head
  87. }
  88. private[consul] def pickRandomInstance(services: List[CatalogService]): URI = {
  89. if (services.isEmpty) throw new IllegalStateException("List of services should not be empty")
  90. toURIs(services).sorted.apply(ThreadLocalRandom.current.nextInt(services.size - 1))
  91. }
  92. private[consul] def toURIs(services: List[CatalogService]): List[URI] =
  93. services.map { service =>
  94. val address = service.getServiceAddress
  95. val serviceAddress =
  96. if (address.trim.isEmpty || address == "localhost") InetAddress.getLoopbackAddress.getHostAddress
  97. else address
  98. new URI(s"${consulConfig.scheme}://$serviceAddress:${service.getServicePort}")
  99. }
  100. }

将自身服务注册到Consul服务中心,需要在配置文件中添加自身服务的名称等信息,需要在resource/application.conf文件中,添加服务配置信息如下:

  1. #自身服务要注册的信息
  2. service {
  3. registry {
  4. service-id = "service-test"
  5. service-name = "service-test"
  6. service-port = 9000
  7. service-ip = "127.0.0.1"
  8. }
  9. }

Consul组件使用

需要将定义好的Consul组件特征混入到ServiceLoader中,如果以ShoppingCart为例,需要将上述定义的ConsulOpsComponents特质混入到ShoppingCartComponents中,并在ShoppingCartComponents内增加服务注册的方法,以便在服务启动时自动将自身服务注册到Consul服务中心。

  1. trait ShoppingCartComponents
  2. extends LagomServerComponents
  3. with SlickPersistenceComponents
  4. with HikariCPComponents
  5. with AhcWSComponents
  6. with ConsulOpsComponents{
  7. ......
  8. //增加Consul服务注册
  9. registerServiceToConsul
  10. lookRegisterServiceInConsul
  11. }

Consul测试用例

我们如果希望单独测试一下Consul服务注册的能力,可以单独写测试用例来测试服务注册、服务查看、服务注销的能力。
为了使用组件能力方便,我们新建一个Consul的操作类ConsulOps,用于方便的获取ConsulOpsComponents组件实例,代码如下:

  1. private[consul] class ConsulOps extends ConsulOpsComponents
  2. object ConsulOps {
  3. final val instance = new ConsulOps()
  4. }

测试用例可以使用ScalaTest包,使用should断言语法,但本例为了方便,直接使用main函数来进行测试,Consul主要功能的测试用例如下所示:

  1. object ConsulTest {
  2. def main(args: Array[String]): Unit = {
  3. val consulOps = ConsulOps.instance
  4. val id = consulOps.appConfig.get[String]("service.registry.service-id")
  5. val name = consulOps.appConfig.get[String]("service.registry.service-name")
  6. val hostIp = consulOps.appConfig.get[String]("service.registry.service-ip")
  7. //将自身服务注册到Consul中心
  8. consulOps.registerServiceToConsul
  9. //使用组件方法查看上述的服务是否存在
  10. consulOps.lookRegisterServiceInConsul
  11. //测试内部方法获取指定服务名的uri信息
  12. val uri = consulOps.getURI(name)
  13. println(s"Test-Uri is {}",uri)
  14. //服务注销
  15. consulOps.deregisterService(id)
  16. println(hostIp)
  17. }
  18. }

测试用例运行日志信息如下:

  1. 2022-04-21 11:18:13,760 INFO ConsulOpsComponents - [service-test] service has registered,the uri is http://127.0.0.1:9000
  2. (Test-Uri is {},http://127.0.0.1:9000)
  3. 127.0.0.1

访问Consul中心查看服务注册情况,如果服务注销方法一起运行,无法查看到对应的服务信息,如要查看需要暂时屏蔽掉服务注销方法的调用。
image.png