title: EasySwoole RPC Custom Registration Center meta:

  • name: description content: Implementing RPC Custom Registration Center in EasySwoole
  • name: keywords content: swoole|swoole extension|swoole framework|Easyswoole|Rpc Service Registration Center|swoole RPC|RPC

EasySwoole RPC Custom Registration Center

EasySwoole defaults to unaware service discovery through UDP broadcast. However, in some cases, it is not convenient to use UDP broadcast, then EasySwoole supports you to customize a node manager to change the service discovery mode.

For example, using Redis to achieve

  1. <?php
  2. namespace EasySwoole\Rpc\NodeManager;
  3. use EasySwoole\Component\Pool\PoolConf;
  4. use EasySwoole\Component\Pool\PoolManager;
  5. use EasySwoole\Rpc\ServiceNode;
  6. use EasySwoole\Utility\Random;
  7. use Swoole\Coroutine\Channel;
  8. use Swoole\Coroutine\Redis;
  9. class RedisManager implements NodeManagerInterface
  10. {
  11. protected $redisKey;
  12. /** @var Channel */
  13. protected $channel;
  14. function __construct(string $host, $port = 6379, $auth = null, string $hashKey = '__rpcNodes', int $maxRedisNum = 10)
  15. {
  16. $this->redisKey = $hashKey;
  17. //Register anonymous connection pool
  18. PoolManager::getInstance()->registerAnonymous('__rpcRedis', function (PoolConf $conf) use ($host, $port, $auth, $maxRedisNum) {
  19. $conf->setMaxObjectNum($maxRedisNum);
  20. $redis = new Redis();
  21. $redis->connect($host, $port);
  22. if ($auth) {
  23. $redis->auth($auth);
  24. }
  25. $redis->setOptions(['serialize' => true, 'compatibility_mode' => true]);
  26. return $redis;
  27. });
  28. }
  29. /**
  30. * Get all available nodes of a service
  31. */
  32. function getServiceNodes(string $serviceName, ?string $version = null): array
  33. {
  34. /** @var \Redis $redis */
  35. $redis = PoolManager::getInstance()->getPool('__rpcRedis')->getObj(15);//Connection pool to take redis object
  36. try {
  37. $nodes = $redis->hGetAll($this->redisKey . md5($serviceName));
  38. $nodes = $nodes ?: [];
  39. $ret = [];
  40. foreach ($nodes as $nodeId => $node) {
  41. /**
  42. * @var $nodeId
  43. * @var ServiceNode $node
  44. */
  45. if (time() - $node->getLastHeartBeat() > 30) {//Check the node's last heartbeat time
  46. $this->deleteServiceNode($node);
  47. }
  48. if ($version && $version != $node->getServiceVersion()) {
  49. continue;
  50. }
  51. $ret[$nodeId] = $node;
  52. }
  53. return $ret;
  54. } catch (\Throwable $throwable) {
  55. //If the redis is disconnected, it is destroyed.
  56. PoolManager::getInstance()->getPool('__rpcRedis')->unsetObj($redis);
  57. } finally {
  58. //Here you need to test whether an object can be recycled after it is unset.
  59. PoolManager::getInstance()->getPool('__rpcRedis')->recycleObj($redis);
  60. }
  61. return [];
  62. }
  63. /**
  64. * Get a random node available for a service
  65. */
  66. function getServiceNode(string $serviceName, ?string $version = null): ?ServiceNode
  67. {
  68. $list = $this->getServiceNodes($serviceName, $version);
  69. if (empty($list)) {
  70. return null;
  71. }
  72. return Random::arrayRandOne($list);
  73. }
  74. /**
  75. * Delete node
  76. */
  77. function deleteServiceNode(ServiceNode $serviceNode): bool
  78. {
  79. /** @var \Redis $redis */
  80. $redis = PoolManager::getInstance()->getPool('__rpcRedis')->getObj(15);
  81. try {
  82. $redis->hDel($this->redisKey . md5($serviceNode->getServiceName()), $serviceNode->getNodeId());
  83. return true;
  84. } catch (\Throwable $throwable) {
  85. PoolManager::getInstance()->getPool('__rpcRedis')->unsetObj($redis);
  86. } finally {
  87. PoolManager::getInstance()->getPool('__rpcRedis')->recycleObj($redis);
  88. }
  89. return false;
  90. }
  91. /**
  92. * Refresh node(
  93. * Ps: refreshed by the tick process process timer (this node) | listen for broadcast messages (other nodes) to refresh node information, redis node manager can)
  94. */
  95. function serviceNodeHeartBeat(ServiceNode $serviceNode): bool
  96. {
  97. if (empty($serviceNode->getLastHeartBeat())) {
  98. $serviceNode->setLastHeartBeat(time());
  99. }
  100. /** @var \Redis $redis */
  101. $redis = PoolManager::getInstance()->getPool('__rpcRedis')->getObj(15);
  102. try {
  103. $redis->hSet($this->redisKey . md5($serviceNode->getServiceName()), $serviceNode->getNodeId(), $serviceNode);
  104. return true;
  105. } catch (\Throwable $throwable) {
  106. //If the redis is disconnected, it is destroyed.
  107. PoolManager::getInstance()->getPool('__rpcRedis')->unsetObj($redis);
  108. } finally {
  109. //Here you need to test whether an object can be recycled after it is unset.
  110. PoolManager::getInstance()->getPool('__rpcRedis')->recycleObj($redis);
  111. }
  112. return false;
  113. }
  114. }

::: warning Note that once the custom node manager is set, it is no longer necessary to enable the UDP scheduled broadcast process. After creating an RPC instance, create a ServiceNode object and refresh it to the registry. The same is true for the node offline. When your service is closed, the node manager is taken offline. :::