现状
- 没有原生PHP pulsar extension
- pulsar上开启了 KOP(Kafka On Pulsar)或 WebSocket 协议
- 客户端使用 rdkafka的扩展或 websocket 链接
实际使用情况
- saas-project 使用 kop。实际是 rdkafka 扩展。生产者发送消息时 kop 协议会对消息体做一定的处理。(base64 encode)
- his-api 使用 websocket方式。生产者发送消息时 ws 协议会对消息体做一定的处理。(base64 encode)。使用 textalk/websocket 作为管理 websocket 生命周期的依赖。新增了一个 Utils 类在项目中。
<?phpnamespace App\Utils;use Illuminate\Support\Carbon;use Illuminate\Support\Facades\Log;use WebSocket\Client;/** * pulsar web socket * Class PulsarUtils * @package App\Utils * @resource https://pulsar.apache.org/docs/en/client-libraries-websocket/ * @example * PulsarUtils::getInstance()->setConf([ * "url" => "111.111.0.60:8080", * "tenant"=>"socket", * "namespace"=>"socket_namespace", * "topic"=>"socket_topic", * "subname"=>"ctz_test", * ])->consumerMsg(function ($msgReceive){ * echo base64_decode($msgReceive['payload']).PHP_EOL; * return true; * }); * */final class PulsarUtils{ protected $pulsarConf = []; private static $_instance = NULL; public static function getInstance() { if (is_null(self::$_instance)) { self::$_instance = new self(); } return self::$_instance; } public function checkConfig($configArray = []) { if (!array_key_exists("url", $configArray)) { return false; } if (!array_key_exists("tenant", $configArray)) { return false; } if (!array_key_exists("namespace", $configArray)) { return false; } if (!array_key_exists("topic", $configArray)) { return false; } return true; } /** * @param $pulsarConfig pulsar配置 * @return $this */ public function setConf($pulsarConfig) { if ($this->checkConfig($pulsarConfig)) { $this->pulsarConf = $pulsarConfig; } return $this; } /** * 获取配置 * @return array */ public function getConf() { return $this->pulsarConf; } /** * 生产者 pulsar ws链接 * @param $config * @return string */ protected function getProducerWsUrl() { if ($this->checkConfig($this->pulsarConf)) { return sprintf( "ws://%s/ws/v2/producer/persistent/%s/%s/%s", $this->pulsarConf['url'], $this->pulsarConf['tenant'], $this->pulsarConf['namespace'], $this->pulsarConf['topic'] ); } return ""; } /** * 消费者 pulsar ws链接 * @return string */ protected function getConsumerWSUrl() { if ($this->checkConfig($this->pulsarConf)) { return sprintf( "ws://%s/ws/v2/consumer/persistent/%s/%s/%s/%s?subscriptionType=Shared", $this->pulsarConf['url'], $this->pulsarConf['tenant'], $this->pulsarConf['namespace'], $this->pulsarConf['topic'], $this->pulsarConf['subname'] ?? "" ); } return ""; } /** * 生产者端 * @param $config * @return null|Client */ protected function producerClient() { $wsUrl = $this->getProducerWsUrl($this->pulsarConf); static $pulsarProducerClients = []; if ($wsUrl) { $wsUrlHash = md5($wsUrl); if (isset($pulsarProducerClients[$wsUrlHash])) { return $pulsarProducerClients[$wsUrlHash]; } else { $pulsarProducerClients[$wsUrlHash] = new Client($wsUrl); return $pulsarProducerClients[$wsUrlHash]; } } return null; } /** * @param $config * @return Client * @throws \Exception */ protected function consumerClient() { $wsUrl = $this->getConsumerWSUrl($this->pulsarConf); static $pulsarConsumerClients = []; if ($wsUrl) { $wsUrlHash = md5($wsUrl); if (isset($pulsarConsumerClients[$wsUrlHash])) { return $pulsarConsumerClients[$wsUrlHash]; } else { $pulsarConsumerClients[$wsUrlHash] = new Client($wsUrl,["timeout"=>10]); return $pulsarConsumerClients[$wsUrlHash]; } } return null; } /** * 生产者 * @param $message * @return bool * @throws \Exception */ public function producerMsg($message) { $producerClient = $this->producerClient(); if ($producerClient == null) { throw new \Exception("pulsar_producerMsg_exception:" . json_encode($this->getConf())); } //判断是否是数组类型 if (is_array($message)) { $message = json_encode($message, JSON_UNESCAPED_UNICODE); } //发送前需要base64 $base64Message = base64_encode($message); $prepareSendMessage = [ "payload" => $base64Message, "properties" => new \stdClass(), "context" => "", "key" => "", ]; $sendMessage = json_encode($prepareSendMessage); $producerClient->text($sendMessage); $sendResult = $producerClient->receive(); try { $result = json_decode($sendResult, true); if (array_key_exists("result", $result) && $result['result'] == "ok") { return true; } else { throw new \Exception("pulsar_websocket_send:" . $sendResult); } } catch (\Exception $e) { throw new \Exception("pulsar_send_exception:" . $e->getMessage()); } } /** * 消费者 * @param callable $func * @throws \Exception */ public function consumerMsg(callable $func) { $consumerClient = $this->consumerClient(); if ($consumerClient == null) { throw new \Exception("pulsar_consumerMsg_exception:" . json_encode($this->getConf())); } while (true) { try { $result = $consumerClient->receive(); if($result==null){ continue; } Log::channel('pulsar-his')->info("pulsar_receive",["receive_time"=>Carbon::now()->toDateTimeString(),"receive_data"=>$result]); $resultArray = json_decode($result, true); $functionResult = call_user_func($func, $resultArray); if ($functionResult) { $ackMsg = ['messageId' => $resultArray['messageId']]; $consumerClient->text(json_encode($ackMsg)); } } catch (\Throwable $e) { //连接超时 默认5S连接超时 if($e->getCode()==1024){ //echo $e->getMessage().PHP_EOL; continue; } Log::channel('pulsar-his')->error("解析消费异常:",[$e->getMessage(),$e->getCode(),$e->getLine()]); exit; } } }}
存在问题
- 没有统一的 composer php 包依赖
- 封装比较粗糙
- 没有使用文档及 examples
- 错误处理较弱
- 稳定和可靠性待实际验证
最佳实践 RoadMap
- 开发自有 pulsar 的 websocket 通讯方式的 PHP composer依赖包。
- composer包要实现主要的 pulsar API 。包括但不限于客户端链接、断开、生产者发送消息、消费者读取消费消息、消费者回执等。保证客户端的稳定,不能阻塞后续程序执行,不能有内存泄漏,不能导致程序非正常退出,要正确处理pulsar server端的错误。
- 依赖包最好适配框架 Laravel.
- Topic 命名要能区分
- 业务方向或业务分组
- 以小写字母加数字以下划线_和.命名
- 消息体大小尽量不要超过 1MB
- 确认消费完成后,消费者需要回执 Ack
- 对于重复消费行为,消费者需要针对业务做幂等或业务逻辑要求的处理
- 生产和消费可溯源,可追踪,有日志留存
- 针对消费失败和其他故障的可重放
- 管理平台或面板展示
监控 && 告警指标
- 各Topic producer 和 consumer 的连接情况包括
- 正在活跃
- 历史连接
- 消息积压
- 消费速度
- producer 和 consumer 无存活告警、队列消息积压条数条件告警
- pulsar 本身告警(pulsar提供)
- cpu
- 内存
- 心跳
- 存储