现状

  1. 没有原生PHP pulsar extension
  2. pulsar上开启了 KOP(Kafka On Pulsar)或 WebSocket 协议
  3. 客户端使用 rdkafka的扩展或 websocket 链接

实际使用情况

  • saas-project 使用 kop。实际是 rdkafka 扩展。生产者发送消息时 kop 协议会对消息体做一定的处理。(base64 encode)
  • his-api 使用 websocket方式。生产者发送消息时 ws 协议会对消息体做一定的处理。(base64 encode)。使用 textalk/websocket 作为管理 websocket 生命周期的依赖。新增了一个 Utils 类在项目中。
  1. <?php
  2. namespace App\Utils;
  3. use Illuminate\Support\Carbon;
  4. use Illuminate\Support\Facades\Log;
  5. use WebSocket\Client;
  6. /**
  7. * pulsar web socket
  8. * Class PulsarUtils
  9. * @package App\Utils
  10. * @resource https://pulsar.apache.org/docs/en/client-libraries-websocket/
  11. * @example
  12. * PulsarUtils::getInstance()->setConf([
  13. * "url" => "111.111.0.60:8080",
  14. * "tenant"=>"socket",
  15. * "namespace"=>"socket_namespace",
  16. * "topic"=>"socket_topic",
  17. * "subname"=>"ctz_test",
  18. * ])->consumerMsg(function ($msgReceive){
  19. * echo base64_decode($msgReceive['payload']).PHP_EOL;
  20. * return true;
  21. * });
  22. *
  23. */
  24. final class PulsarUtils
  25. {
  26. protected $pulsarConf = [];
  27. private static $_instance = NULL;
  28. public static function getInstance()
  29. {
  30. if (is_null(self::$_instance)) {
  31. self::$_instance = new self();
  32. }
  33. return self::$_instance;
  34. }
  35. public function checkConfig($configArray = [])
  36. {
  37. if (!array_key_exists("url", $configArray)) {
  38. return false;
  39. }
  40. if (!array_key_exists("tenant", $configArray)) {
  41. return false;
  42. }
  43. if (!array_key_exists("namespace", $configArray)) {
  44. return false;
  45. }
  46. if (!array_key_exists("topic", $configArray)) {
  47. return false;
  48. }
  49. return true;
  50. }
  51. /**
  52. * @param $pulsarConfig pulsar配置
  53. * @return $this
  54. */
  55. public function setConf($pulsarConfig)
  56. {
  57. if ($this->checkConfig($pulsarConfig)) {
  58. $this->pulsarConf = $pulsarConfig;
  59. }
  60. return $this;
  61. }
  62. /**
  63. * 获取配置
  64. * @return array
  65. */
  66. public function getConf()
  67. {
  68. return $this->pulsarConf;
  69. }
  70. /**
  71. * 生产者 pulsar ws链接
  72. * @param $config
  73. * @return string
  74. */
  75. protected function getProducerWsUrl()
  76. {
  77. if ($this->checkConfig($this->pulsarConf)) {
  78. return sprintf(
  79. "ws://%s/ws/v2/producer/persistent/%s/%s/%s",
  80. $this->pulsarConf['url'], $this->pulsarConf['tenant'], $this->pulsarConf['namespace'], $this->pulsarConf['topic']
  81. );
  82. }
  83. return "";
  84. }
  85. /**
  86. * 消费者 pulsar ws链接
  87. * @return string
  88. */
  89. protected function getConsumerWSUrl()
  90. {
  91. if ($this->checkConfig($this->pulsarConf)) {
  92. return sprintf(
  93. "ws://%s/ws/v2/consumer/persistent/%s/%s/%s/%s?subscriptionType=Shared",
  94. $this->pulsarConf['url'], $this->pulsarConf['tenant'], $this->pulsarConf['namespace'], $this->pulsarConf['topic'], $this->pulsarConf['subname'] ?? ""
  95. );
  96. }
  97. return "";
  98. }
  99. /**
  100. * 生产者端
  101. * @param $config
  102. * @return null|Client
  103. */
  104. protected function producerClient()
  105. {
  106. $wsUrl = $this->getProducerWsUrl($this->pulsarConf);
  107. static $pulsarProducerClients = [];
  108. if ($wsUrl) {
  109. $wsUrlHash = md5($wsUrl);
  110. if (isset($pulsarProducerClients[$wsUrlHash])) {
  111. return $pulsarProducerClients[$wsUrlHash];
  112. } else {
  113. $pulsarProducerClients[$wsUrlHash] = new Client($wsUrl);
  114. return $pulsarProducerClients[$wsUrlHash];
  115. }
  116. }
  117. return null;
  118. }
  119. /**
  120. * @param $config
  121. * @return Client
  122. * @throws \Exception
  123. */
  124. protected function consumerClient()
  125. {
  126. $wsUrl = $this->getConsumerWSUrl($this->pulsarConf);
  127. static $pulsarConsumerClients = [];
  128. if ($wsUrl) {
  129. $wsUrlHash = md5($wsUrl);
  130. if (isset($pulsarConsumerClients[$wsUrlHash])) {
  131. return $pulsarConsumerClients[$wsUrlHash];
  132. } else {
  133. $pulsarConsumerClients[$wsUrlHash] = new Client($wsUrl,["timeout"=>10]);
  134. return $pulsarConsumerClients[$wsUrlHash];
  135. }
  136. }
  137. return null;
  138. }
  139. /**
  140. * 生产者
  141. * @param $message
  142. * @return bool
  143. * @throws \Exception
  144. */
  145. public function producerMsg($message)
  146. {
  147. $producerClient = $this->producerClient();
  148. if ($producerClient == null) {
  149. throw new \Exception("pulsar_producerMsg_exception:" . json_encode($this->getConf()));
  150. }
  151. //判断是否是数组类型
  152. if (is_array($message)) {
  153. $message = json_encode($message, JSON_UNESCAPED_UNICODE);
  154. }
  155. //发送前需要base64
  156. $base64Message = base64_encode($message);
  157. $prepareSendMessage = [
  158. "payload" => $base64Message,
  159. "properties" => new \stdClass(),
  160. "context" => "",
  161. "key" => "",
  162. ];
  163. $sendMessage = json_encode($prepareSendMessage);
  164. $producerClient->text($sendMessage);
  165. $sendResult = $producerClient->receive();
  166. try {
  167. $result = json_decode($sendResult, true);
  168. if (array_key_exists("result", $result) &&
  169. $result['result'] == "ok") {
  170. return true;
  171. } else {
  172. throw new \Exception("pulsar_websocket_send:" .
  173. $sendResult);
  174. }
  175. } catch (\Exception $e) {
  176. throw new \Exception("pulsar_send_exception:" . $e->getMessage());
  177. }
  178. }
  179. /**
  180. * 消费者
  181. * @param callable $func
  182. * @throws \Exception
  183. */
  184. public function consumerMsg(callable $func)
  185. {
  186. $consumerClient = $this->consumerClient();
  187. if ($consumerClient == null) {
  188. throw new \Exception("pulsar_consumerMsg_exception:" . json_encode($this->getConf()));
  189. }
  190. while (true) {
  191. try {
  192. $result = $consumerClient->receive();
  193. if($result==null){
  194. continue;
  195. }
  196. Log::channel('pulsar-his')->info("pulsar_receive",["receive_time"=>Carbon::now()->toDateTimeString(),"receive_data"=>$result]);
  197. $resultArray = json_decode($result, true);
  198. $functionResult = call_user_func($func, $resultArray);
  199. if ($functionResult) {
  200. $ackMsg = ['messageId' => $resultArray['messageId']];
  201. $consumerClient->text(json_encode($ackMsg));
  202. }
  203. } catch (\Throwable $e) {
  204. //连接超时 默认5S连接超时
  205. if($e->getCode()==1024){
  206. //echo $e->getMessage().PHP_EOL;
  207. continue;
  208. }
  209. Log::channel('pulsar-his')->error("解析消费异常:",[$e->getMessage(),$e->getCode(),$e->getLine()]);
  210. exit;
  211. }
  212. }
  213. }
  214. }

存在问题

  1. 没有统一的 composer php 包依赖
  2. 封装比较粗糙
  3. 没有使用文档及 examples
  4. 错误处理较弱
  5. 稳定和可靠性待实际验证

最佳实践 RoadMap

  1. 开发自有 pulsar 的 websocket 通讯方式的 PHP composer依赖包。
  2. composer包要实现主要的 pulsar API 。包括但不限于客户端链接、断开、生产者发送消息、消费者读取消费消息、消费者回执等。保证客户端的稳定,不能阻塞后续程序执行,不能有内存泄漏,不能导致程序非正常退出,要正确处理pulsar server端的错误。
  3. 依赖包最好适配框架 Laravel.
  4. Topic 命名要能区分
    1. 业务方向或业务分组
    2. 以小写字母加数字以下划线_和.命名
  5. 消息体大小尽量不要超过 1MB
  6. 确认消费完成后,消费者需要回执 Ack
  7. 对于重复消费行为,消费者需要针对业务做幂等或业务逻辑要求的处理
  8. 生产和消费可溯源,可追踪,有日志留存
  9. 针对消费失败和其他故障的可重放
  10. 管理平台或面板展示

监控 && 告警指标

  1. 各Topic producer 和 consumer 的连接情况包括
    1. 正在活跃
    2. 历史连接
    3. 消息积压
    4. 消费速度
  2. producer 和 consumer 无存活告警、队列消息积压条数条件告警
  3. pulsar 本身告警(pulsar提供)
    1. cpu
    2. 内存
    3. 心跳
    4. 存储