现状
- 没有原生PHP pulsar extension
- pulsar上开启了 KOP(Kafka On Pulsar)或 WebSocket 协议
- 客户端使用 rdkafka的扩展或 websocket 链接
实际使用情况
- saas-project 使用 kop。实际是 rdkafka 扩展。生产者发送消息时 kop 协议会对消息体做一定的处理。(base64 encode)
- his-api 使用 websocket方式。生产者发送消息时 ws 协议会对消息体做一定的处理。(base64 encode)。使用 textalk/websocket 作为管理 websocket 生命周期的依赖。新增了一个 Utils 类在项目中。
<?php
namespace App\Utils;
use Illuminate\Support\Carbon;
use Illuminate\Support\Facades\Log;
use WebSocket\Client;
/**
* pulsar web socket
* Class PulsarUtils
* @package App\Utils
* @resource https://pulsar.apache.org/docs/en/client-libraries-websocket/
* @example
* PulsarUtils::getInstance()->setConf([
* "url" => "111.111.0.60:8080",
* "tenant"=>"socket",
* "namespace"=>"socket_namespace",
* "topic"=>"socket_topic",
* "subname"=>"ctz_test",
* ])->consumerMsg(function ($msgReceive){
* echo base64_decode($msgReceive['payload']).PHP_EOL;
* return true;
* });
*
*/
final class PulsarUtils
{
protected $pulsarConf = [];
private static $_instance = NULL;
public static function getInstance()
{
if (is_null(self::$_instance)) {
self::$_instance = new self();
}
return self::$_instance;
}
public function checkConfig($configArray = [])
{
if (!array_key_exists("url", $configArray)) {
return false;
}
if (!array_key_exists("tenant", $configArray)) {
return false;
}
if (!array_key_exists("namespace", $configArray)) {
return false;
}
if (!array_key_exists("topic", $configArray)) {
return false;
}
return true;
}
/**
* @param $pulsarConfig pulsar配置
* @return $this
*/
public function setConf($pulsarConfig)
{
if ($this->checkConfig($pulsarConfig)) {
$this->pulsarConf = $pulsarConfig;
}
return $this;
}
/**
* 获取配置
* @return array
*/
public function getConf()
{
return $this->pulsarConf;
}
/**
* 生产者 pulsar ws链接
* @param $config
* @return string
*/
protected function getProducerWsUrl()
{
if ($this->checkConfig($this->pulsarConf)) {
return sprintf(
"ws://%s/ws/v2/producer/persistent/%s/%s/%s",
$this->pulsarConf['url'], $this->pulsarConf['tenant'], $this->pulsarConf['namespace'], $this->pulsarConf['topic']
);
}
return "";
}
/**
* 消费者 pulsar ws链接
* @return string
*/
protected function getConsumerWSUrl()
{
if ($this->checkConfig($this->pulsarConf)) {
return sprintf(
"ws://%s/ws/v2/consumer/persistent/%s/%s/%s/%s?subscriptionType=Shared",
$this->pulsarConf['url'], $this->pulsarConf['tenant'], $this->pulsarConf['namespace'], $this->pulsarConf['topic'], $this->pulsarConf['subname'] ?? ""
);
}
return "";
}
/**
* 生产者端
* @param $config
* @return null|Client
*/
protected function producerClient()
{
$wsUrl = $this->getProducerWsUrl($this->pulsarConf);
static $pulsarProducerClients = [];
if ($wsUrl) {
$wsUrlHash = md5($wsUrl);
if (isset($pulsarProducerClients[$wsUrlHash])) {
return $pulsarProducerClients[$wsUrlHash];
} else {
$pulsarProducerClients[$wsUrlHash] = new Client($wsUrl);
return $pulsarProducerClients[$wsUrlHash];
}
}
return null;
}
/**
* @param $config
* @return Client
* @throws \Exception
*/
protected function consumerClient()
{
$wsUrl = $this->getConsumerWSUrl($this->pulsarConf);
static $pulsarConsumerClients = [];
if ($wsUrl) {
$wsUrlHash = md5($wsUrl);
if (isset($pulsarConsumerClients[$wsUrlHash])) {
return $pulsarConsumerClients[$wsUrlHash];
} else {
$pulsarConsumerClients[$wsUrlHash] = new Client($wsUrl,["timeout"=>10]);
return $pulsarConsumerClients[$wsUrlHash];
}
}
return null;
}
/**
* 生产者
* @param $message
* @return bool
* @throws \Exception
*/
public function producerMsg($message)
{
$producerClient = $this->producerClient();
if ($producerClient == null) {
throw new \Exception("pulsar_producerMsg_exception:" . json_encode($this->getConf()));
}
//判断是否是数组类型
if (is_array($message)) {
$message = json_encode($message, JSON_UNESCAPED_UNICODE);
}
//发送前需要base64
$base64Message = base64_encode($message);
$prepareSendMessage = [
"payload" => $base64Message,
"properties" => new \stdClass(),
"context" => "",
"key" => "",
];
$sendMessage = json_encode($prepareSendMessage);
$producerClient->text($sendMessage);
$sendResult = $producerClient->receive();
try {
$result = json_decode($sendResult, true);
if (array_key_exists("result", $result) &&
$result['result'] == "ok") {
return true;
} else {
throw new \Exception("pulsar_websocket_send:" .
$sendResult);
}
} catch (\Exception $e) {
throw new \Exception("pulsar_send_exception:" . $e->getMessage());
}
}
/**
* 消费者
* @param callable $func
* @throws \Exception
*/
public function consumerMsg(callable $func)
{
$consumerClient = $this->consumerClient();
if ($consumerClient == null) {
throw new \Exception("pulsar_consumerMsg_exception:" . json_encode($this->getConf()));
}
while (true) {
try {
$result = $consumerClient->receive();
if($result==null){
continue;
}
Log::channel('pulsar-his')->info("pulsar_receive",["receive_time"=>Carbon::now()->toDateTimeString(),"receive_data"=>$result]);
$resultArray = json_decode($result, true);
$functionResult = call_user_func($func, $resultArray);
if ($functionResult) {
$ackMsg = ['messageId' => $resultArray['messageId']];
$consumerClient->text(json_encode($ackMsg));
}
} catch (\Throwable $e) {
//连接超时 默认5S连接超时
if($e->getCode()==1024){
//echo $e->getMessage().PHP_EOL;
continue;
}
Log::channel('pulsar-his')->error("解析消费异常:",[$e->getMessage(),$e->getCode(),$e->getLine()]);
exit;
}
}
}
}
存在问题
- 没有统一的 composer php 包依赖
- 封装比较粗糙
- 没有使用文档及 examples
- 错误处理较弱
- 稳定和可靠性待实际验证
最佳实践 RoadMap
- 开发自有 pulsar 的 websocket 通讯方式的 PHP composer依赖包。
- composer包要实现主要的 pulsar API 。包括但不限于客户端链接、断开、生产者发送消息、消费者读取消费消息、消费者回执等。保证客户端的稳定,不能阻塞后续程序执行,不能有内存泄漏,不能导致程序非正常退出,要正确处理pulsar server端的错误。
- 依赖包最好适配框架 Laravel.
- Topic 命名要能区分
- 业务方向或业务分组
- 以小写字母加数字以下划线_和.命名
- 消息体大小尽量不要超过 1MB
- 确认消费完成后,消费者需要回执 Ack
- 对于重复消费行为,消费者需要针对业务做幂等或业务逻辑要求的处理
- 生产和消费可溯源,可追踪,有日志留存
- 针对消费失败和其他故障的可重放
- 管理平台或面板展示
监控 && 告警指标
- 各Topic producer 和 consumer 的连接情况包括
- 正在活跃
- 历史连接
- 消息积压
- 消费速度
- producer 和 consumer 无存活告警、队列消息积压条数条件告警
- pulsar 本身告警(pulsar提供)
- cpu
- 内存
- 心跳
- 存储