EasySwoole RPC 自定义注册中心

EasySwoole 默认为通过UDP广播+自定义进程定时刷新自身节点信息的方式来实现无主化/注册中心的服务发现。在服务正常关闭的时候,自定义定时进程的onShutdown方法会执行deleteServiceNode 方法来实现节点下线。在非正常关闭的时候,心跳超时也会被节点管理器踢出, 有些情况,不方便用UDP广播的情况下,那么EasySwoole支持你自定义一个节点管理器,来变更服务发现方式。

例如用Redis来实现

  1. namespace EasySwoole\Rpc\NodeManager;
  2. use EasySwoole\RedisPool\RedisPool;
  3. use EasySwoole\Rpc\ServiceNode;
  4. use EasySwoole\Utility\Random;
  5. class RedisManager implements NodeManagerInterface
  6. {
  7. protected $redisKey;
  8. protected $pool;
  9. function __construct(RedisPool $pool, string $hashKey = 'rpc')
  10. {
  11. $this->redisKey = $hashKey;
  12. $this->pool = $pool;
  13. }
  14. function getServiceNodes(string $serviceName, ?string $version = null): array
  15. {
  16. $redis = $this->pool->getObj(15);
  17. try {
  18. $nodes = $redis->hGetAll("{$this->redisKey}_{$serviceName}");
  19. $nodes = $nodes ?: [];
  20. $ret = [];
  21. foreach ($nodes as $nodeId => $node) {
  22. $node = new ServiceNode(json_decode($node,true));
  23. /**
  24. * @var $nodeId
  25. * @var ServiceNode $node
  26. */
  27. if (time() - $node->getLastHeartBeat() > 30) {
  28. $this->deleteServiceNode($node);
  29. }
  30. if ($version && $version != $node->getServiceVersion()) {
  31. continue;
  32. }
  33. $ret[$nodeId] = $node;
  34. }
  35. return $ret;
  36. } catch (\Throwable $throwable) {
  37. //如果该redis断线则销毁
  38. $this->pool->unsetObj($redis);
  39. } finally {
  40. $this->pool->recycleObj($redis);
  41. }
  42. return [];
  43. }
  44. function getServiceNode(string $serviceName, ?string $version = null): ?ServiceNode
  45. {
  46. $list = $this->getServiceNodes($serviceName, $version);
  47. if (empty($list)) {
  48. return null;
  49. }
  50. return Random::arrayRandOne($list);
  51. }
  52. function deleteServiceNode(ServiceNode $serviceNode): bool
  53. {
  54. $redis = $this->pool->getObj(15);
  55. try {
  56. $redis->hDel($this->generateNodeKey($serviceNode), $serviceNode->getNodeId());
  57. return true;
  58. } catch (\Throwable $throwable) {
  59. $this->pool->unsetObj($redis);
  60. } finally {
  61. $this->pool->recycleObj($redis);
  62. }
  63. return false;
  64. }
  65. function serviceNodeHeartBeat(ServiceNode $serviceNode): bool
  66. {
  67. if (empty($serviceNode->getLastHeartBeat())) {
  68. $serviceNode->setLastHeartBeat(time());
  69. }
  70. $redis = $this->pool->getObj(15);
  71. try {
  72. $redis->hSet($this->generateNodeKey($serviceNode), $serviceNode->getNodeId(), $serviceNode->__toString());
  73. return true;
  74. } catch (\Throwable $throwable) {
  75. $this->pool->unsetObj($redis);
  76. } finally {
  77. //这边需要测试一个对象被unset后是否还能被回收
  78. $this->pool->recycleObj($redis);
  79. }
  80. return false;
  81. }
  82. protected function generateNodeKey(ServiceNode $node)
  83. {
  84. return "{$this->redisKey}_{$node->getServiceName()}";
  85. }
  86. }

::: warning 即使关闭了UDP定时广,EasySwoole Rpc的tick进程依旧会每3秒执行一次serviceNodeHeartBeat用于更新自身的节点心跳信息。 :::