1 php拓展所在位置

可以到php.ini中查看ext的存放位置

php/ext文件中存放的后缀为dll(动态链接库)的文件为php的拓展组件。

2 查看phpinfo

  1. php -i // 输出php信息,等同于phpinfo
  2. php -r 'phpinfo();' // 命令行执行code

使用phpinfo()函数查看PHP的版本信息,这会决定扩展文件版本

php安装拓展 - 图1

其中,第一个是VC的版本为11,第二个是64位的,第三个参数api版本号,安装的组件是TS(即线程安全)的,VC版本为11

3 安装redis

3.1 找对应的redis.dll和igbinary.dll

igbinary是安装redis的前提即依赖

在mac上也是需要igbinary的,linux这类的操作系统可以通过brew安装php,自带pecl,使用pecl安装拓展,它是php第三方拓展库

dll即动态链接库,是C语言编写的,已经编译好之后的库。而在linux上,多采用二进制源码编译的方式,得到so后缀的文件

http://windows.php.net/downloads/pecl/releases/redis/2.2.7/

php安装拓展 - 图2

http://windows.php.net/downloads/pecl/releases/igbinary/2.0.1/

php安装拓展 - 图3

3.2 修改php.ini

  1. [Redis]
  2. ;php_redis
  3. extension=php_igbinary.dll
  4. extension=php_redis.dll

注意:extension=php_igbinary.dll要放在extension=php_redis.dll前,否则不会生效。

在mac中,还需要重启php-fpm,有时候它会自动平滑重启,有时候需要自己重启。

  1. ps aux|grep php-fpm // 查看进程号
  2. kill -USR2 php-fpm.pid的绝对路径或者PID
  3. php --ri redis // 查看拓展配置信息以便确定重启成功
  4. php -m // 查看所有的拓展

3.3 纠错

出现

  1. Unable to load dynamic library 'f:\php\ext\php_redis.dll' - 找不到指定的模块。

或者

  1. Gracefully shutting down php-fpm . done
  2. Starting php-fpm [16-Jan-2014 12:46:26] NOTICE: PHP message: PHP Warning: PHP Startup: memcache: Unable to initialize module
  3. Module compiled with module API=20090626
  4. PHP compiled with module API=20100525
  5. These options need to match
  6. in Unknown on line 0
  7. done

即可能是下载的时候疏忽了,没用下载到对应php版本的拓展,即版本不匹配,重新下载吧。

4 安装kafka拓展

安装kafka集群:https://www.yuque.com/u21438008/nd8ycn/urfi5m
php手册介绍:php需在7以上。https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.requirements.html
函数:https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/index.html
安装方式:
https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.installation.windows.html
image.png

  1. <?php
  2. class KafkaHelper {
  3. private $topicName = '';
  4. private $ipAddress = '';
  5. private $groupName = '';
  6. // 生产者配置
  7. private $conf = null;
  8. // 生产者topic配置(感觉抽离出来是多topic)
  9. private $topicConf = null;
  10. // 生产者
  11. private $producer = null;
  12. // 发送操作对象(感觉抽离出来是多topic)
  13. private $producerTopic = null;
  14. //
  15. // 消费者配置
  16. private $consumerConf = null;
  17. // 消费者topic 配置
  18. private $consumerTopicConf = null;
  19. // 消费者
  20. private $consumer = null;
  21. // 消费操作对象
  22. private $consumerTopic = null;
  23. // 初始话设置相关配置信息
  24. public function __construct($topicName = '', $ipAddress = '', $groupName = '')
  25. {
  26. $this->topicName = $topicName;
  27. echo "topic_name:" . $topicName . "\n";
  28. $this->ipAddress = $ipAddress;
  29. echo "ipaddress:" . $ipAddress . "\n";
  30. $this->groupName = $groupName;
  31. echo "groupName:" . $groupName . "\n";
  32. }
  33. // https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  34. public function setConf($conf = null)
  35. {
  36. if (!$conf) {
  37. $conf = new \RdKafka\Conf();
  38. $conf->set('log_level', (string) LOG_DEBUG);
  39. $conf->set('metadata.broker.list', $this->ipAddress);
  40. $conf->setDrMsgCb(function ($kafka, $msg) {
  41. if ($msg->err) {
  42. // message permanently failed to be delivered
  43. echo "produce failed:" . json_encode($msg) . "\n";
  44. } else {
  45. // message successfully delivered
  46. echo "produce succeed:" . json_encode($msg) . "\n";
  47. }
  48. });
  49. $this->conf = $conf;
  50. } else {
  51. if (!($conf instanceof \RdKafka\Conf)) {
  52. throw new \Exception('传入的配置类应为: \Rdkafka\conf');
  53. }
  54. $this->conf = $conf;
  55. }
  56. return $this;
  57. }
  58. public function setTopicConf($topicConf = null)
  59. {
  60. if (!$topicConf) {
  61. $topicConf = new \Rdkafka\TopicConf();
  62. $topicConf->set('request.required.acks', 0);
  63. $this->topicConf = $topicConf;
  64. } else {
  65. if (!($topicConf instanceof \Rdkafka\TopicConf)) {
  66. throw new \Exception('传入的配置类应为: \Rdkafka\TopicConf');
  67. }
  68. $this->topicConf = $topicConf;
  69. }
  70. return $this;
  71. }
  72. public function createProducer()
  73. {
  74. $this->producer = new \RdKafka\Producer($this->conf);
  75. // $this->producer->addBrokers($this->ipAddress);
  76. $this->producerTopic = $this->producer->newTopic($this->topicName, $this->topicConf);
  77. return $this;
  78. }
  79. // 二维数组
  80. public function send($data = [])
  81. {
  82. if (empty($data)) {
  83. return '数据不能为空';
  84. }
  85. echo "sending:". json_encode($data) ."\n";
  86. try {
  87. foreach ($data as $key => $val) {
  88. echo "$key : ". json_encode($val) ."\n";
  89. $this->producerTopic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($val, JSON_UNESCAPED_UNICODE));
  90. // LogHelper::log('发送kafka数据:'.json_encode($val, JSON_UNESCAPED_UNICODE), 'kafka_producer');
  91. }
  92. $times = 0;
  93. while ($this->producer->getOutQLen() > 0) {
  94. // LogHelper::log($this->rk->getOutQLen(), 'kafka_producer');
  95. if ($times++ > 10) {
  96. // LogHelper::log('投递kafka超时:','kafka_producer');
  97. throw new \Exception('消息超时');
  98. }
  99. $this->producer->poll(300);
  100. }
  101. echo "end\n";
  102. return true;
  103. } catch (\Exception $e) {
  104. echo 'errormsg:' . $e->getMessage() . "\n";
  105. // LogHelper::log('投递kafka异常:'.$e->getMessage(),'kafka_producer');
  106. return false;
  107. }
  108. }
  109. // create consumerConf. You should call ConsumerTopicConf() first.
  110. public function setConsumerConf($consumerConf = null)
  111. {
  112. echo "set consumer conf\n";
  113. if (!$consumerConf) {
  114. $consumerConf = new \RdKafka\Conf();
  115. // 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。
  116. $consumerConf->set('group.id', $this->groupName);
  117. //添加 kafka集群服务器地址
  118. $consumerConf->set('metadata.broker.list', $this->ipAddress);
  119. $consumerConf->setDefaultTopicConf($this->consumerTopicConf);
  120. // 消费成功的回调
  121. $consumerConf->setConsumeCb(function ($msg) {
  122. echo "consume succeed:". json_encode($msg) ."\n";
  123. });
  124. $this->consumerConf = $consumerConf;
  125. } else {
  126. if (!($consumerConf instanceof \RdKafka\Conf)) {
  127. throw new \Exception('传入的配置类应为: \Rdkafka\Conf');
  128. }
  129. $this->consumerConf = $consumerConf;
  130. }
  131. return $this;
  132. }
  133. // 设置消ConsumerTopicConf,无需其他配合
  134. public function setConsumerTopicConf($consumerTopicConf = null)
  135. {
  136. echo "set consumer topic conf\n";
  137. if (!$consumerTopicConf) {
  138. $consumerTopicConf = new \RdKafka\TopicConf();
  139. ////当没有初始偏移量时,从哪里开始读取 'smallest': start from the beginning 'latest' 读取最新的数据
  140. $consumerTopicConf->set('auto.offset.reset', 'smallest');
  141. //设置自动记录上次读取到的位置
  142. //$topicConf->set('auto.commit.enable', true);
  143. // Set the configuration to use for subscribed/assigned topics
  144. $this->consumerTopicConf = $consumerTopicConf;
  145. } else {
  146. if (!($consumerTopicConf instanceof \Rdkafka\TopicConf)) {
  147. throw new \Exception('传入的配置类应为: \Rdkafka\TopicConf');
  148. }
  149. $this->consumerTopicConf = $consumerTopicConf;
  150. }
  151. return $this;
  152. }
  153. public function createConsumer()
  154. {
  155. echo "create consumer \n";
  156. // 使用RdkafkaConsumer()貌似就能自动消费所有Partition;Consumer()就需要指定哪个Partition,然后其他的Partition消费不到
  157. $consumer = new \RdKafka\KafkaConsumer($this->consumerConf);
  158. // $consumer = new \RdKafka\Consumer($this->consumerConf);
  159. $this->consumer = $consumer;
  160. // 让消费者订阅log 主题
  161. $consumer->subscribe([$this->topicName]);
  162. // $this->consumTopic = $this->consumer->newTopic($this->topicName, $this->consumerTopicConf);
  163. return $this;
  164. }
  165. public function startConsume()
  166. {
  167. echo "start consumer---------" . "\n";
  168. // $consumerTopic = $this->consumTopic;
  169. $consumer = $this->consumer;
  170. try {
  171. //解决 kafka在遇到PHP fatalerrorr不退出的问题。
  172. $handleShutdown = function () use ($consumer) {
  173. $consumer->unsubscribe();
  174. };
  175. register_shutdown_function($handleShutdown);
  176. } catch (\Exception $e) {
  177. $consumer && $consumer->unsubscribe();
  178. }
  179. // 消费
  180. // $consumerTopic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
  181. while (true) {
  182. try {
  183. // 原参数1000
  184. // $message = $consumerTopic->consume(0, 120*10000);
  185. $message = $consumer->consume(1000);
  186. // 返回消费结果
  187. switch ($message->err) {
  188. // 正常消费
  189. case RD_KAFKA_RESP_ERR_NO_ERROR:
  190. // LogHelper::log('message:kafka信息:' . json_encode($message), $this->logFile);
  191. // self::stdOutLineF('message:kafka信息:' . json_encode($message));
  192. if (!is_null($message)) {
  193. $data = json_decode($message->payload,true);
  194. $this->handleData($data); // 处理数据
  195. }
  196. break;
  197. case RD_KAFKA_RESP_ERR__PARTITION_EOF:
  198. // LogHelper::log($message->err . " | No more messages; will wait for more", $this->logFile);
  199. // self::stdOutLineF($message->err . " | No more messages; will wait for more");
  200. break;
  201. case RD_KAFKA_RESP_ERR__TIMED_OUT:
  202. break;
  203. default:
  204. throw new \Exception($message->errstr(), $message->err);
  205. break;
  206. }
  207. } catch (\Exception $e) {
  208. // self::stdOutLineF("Kafka Stop Error:".$e->getMessage());
  209. // LogHelper::log('Kafka Stop Error:' . $e->getMessage(), $this->logFile);
  210. $consumer->unsubscribe();
  211. // $consumerTopic->consumeStop(0, RD_KAFKA_OFFSET_STORED);
  212. echo "出错了\n";
  213. }
  214. }
  215. }
  216. // json_decode后的数据
  217. public function handleData($data)
  218. {
  219. echo json_encode($data) . "\n";
  220. }
  221. }
  222. $ipAddress = '192.168.184.129:9093';
  223. $topicName = 'TestTopic';
  224. $group = 'test';
  225. $kafka = new KafKaHelper($topicName, $ipAddress, $group);
  226. // $data = [['22' => 'test']];
  227. // $kafka->setConf()->setTopicConf()->createProducer()->send($data);
  228. //
  229. $kafka->setConsumerTopicConf()->setConsumerConf()->createConsumer()->startConsume();

image.png