kafka

本项目代码参考自 https://github.com/weiboad/kafka-php

安装

  1. composer require easyswoole/kafka

注册kafka服务

  1. namespace EasySwoole\EasySwoole;
  2. use App\Producer\Process as ProducerProcess;
  3. use App\Consumer\Process as ConsumerProcess;
  4. use EasySwoole\EasySwoole\Swoole\EventRegister;
  5. use EasySwoole\EasySwoole\AbstractInterface\Event;
  6. use EasySwoole\Http\Request;
  7. use EasySwoole\Http\Response;
  8. class EasySwooleEvent implements Event
  9. {
  10. public static function initialize()
  11. {
  12. // TODO: Implement initialize() method.
  13. date_default_timezone_set('Asia/Shanghai');
  14. }
  15. public static function mainServerCreate(EventRegister $register)
  16. {
  17. // TODO: Implement mainServerCreate() method.
  18. // 生产者
  19. ServerManager::getInstance()->getSwooleServer()->addProcess((new ProducerProcess())->getProcess());
  20. // 消费者
  21. ServerManager::getInstance()->getSwooleServer()->addProcess((new ConsumerProcess())->getProcess());
  22. }
  23. ......
  24. }

生产者

  1. namespace App\Producer;
  2. use EasySwoole\Component\Process\AbstractProcess;
  3. use EasySwoole\Kafka\Config\ProducerConfig;
  4. use EasySwoole\Kafka\kafka;
  5. class Process extends AbstractProcess
  6. {
  7. protected function run($arg)
  8. {
  9. go(function () {
  10. $config = new ProducerConfig();
  11. $config->setMetadataBrokerList('127.0.0.1:9092,127.0.0.1:9093');
  12. $config->setBrokerVersion('0.9.0');
  13. $config->setRequiredAck(1);
  14. $kafka = new kafka($config);
  15. $result = $kafka->producer()->send([
  16. [
  17. 'topic' => 'test',
  18. 'value' => 'message--',
  19. 'key' => 'key--',
  20. ],
  21. ]);
  22. var_dump($result);
  23. var_dump('ok');
  24. });
  25. }
  26. }

消费者

  1. namespace App\Consumer;
  2. use EasySwoole\Component\Process\AbstractProcess;
  3. use EasySwoole\Kafka\Config\ConsumerConfig;
  4. use EasySwoole\Kafka\kafka;
  5. class Process extends AbstractProcess
  6. {
  7. protected function run($arg)
  8. {
  9. go(function () {
  10. $config = new ConsumerConfig();
  11. $config->setRefreshIntervalMs(1000);
  12. $config->setMetadataBrokerList('127.0.0.1:9092,127.0.0.1:9093');
  13. $config->setBrokerVersion('0.9.0');
  14. $config->setGroupId('test');
  15. $config->setTopics(['test']);
  16. $config->setOffsetReset('earliest');
  17. $kafka = new kafka($config);
  18. // 设置消费回调
  19. $func = function ($topic, $partition, $message) {
  20. var_dump($topic);
  21. var_dump($partition);
  22. var_dump($message);
  23. };
  24. $kafka->consumer()->subscribe($func);
  25. });
  26. }
  27. }

附赠

  1. Kafka 集群部署 docker-compose.yml 一份,使用方式如下
    1. 保证2181,9092,9093,9000端口未被占用(占用后可以修改compose文件中的端口号)
    2. 根目录下,docker-compose up -d
    3. 访问localhost:9000,可以查看kafka集群状态。

Any Question

kafka使用问题及bug,欢迎到Easyswoole的kaka群中提问或反馈 QQ群号:827432930