kafka

This project code is referenced from https://github.com/weiboad/kafka-php

Installation

  1. composer required easyswoole/kafka

Register kafka service

  1. namespace EasySwoole\EasySwoole;
  2. use App\Producer\Process as ProducerProcess;
  3. use App\Consumer\Process as ConsumerProcess;
  4. use EasySwoole\EasySwoole\Swoole\EventRegister;
  5. use EasySwoole\EasySwoole\AbstractInterface\Event;
  6. use EasySwoole\Http\Request;
  7. use EasySwoole\Http\Response;
  8. class EasySwooleEvent implements Event
  9. {
  10. public static function initialize()
  11. {
  12. // TODO: Implement initialize() method.
  13. date_default_timezone_set('Asia/Shanghai');
  14. }
  15. public static function mainServerCreate(EventRegister $register)
  16. {
  17. // TODO: Implement mainServerCreate() method.
  18. // Producer
  19. ServerManager::getInstance()->getSwooleServer()->addProcess((new ProducerProcess())->getProcess());
  20. // consumer
  21. ServerManager::getInstance()->getSwooleServer()->addProcess((new ConsumerProcess())->getProcess());
  22. }
  23. ......
  24. }

Producer

  1. namespace App\Producer;
  2. use EasySwoole\Component\Process\AbstractProcess;
  3. use EasySwoole\Kafka\Config\ProducerConfig;
  4. use EasySwoole\Kafka\kafka;
  5. class Process extends AbstractProcess
  6. {
  7. protected function run($arg)
  8. {
  9. go(function () {
  10. $config = new ProducerConfig();
  11. $config->setMetadataBrokerList('127.0.0.1:9092,127.0.0.1:9093');
  12. $config->setBrokerVersion('0.9.0');
  13. $config->setRequiredAck(1);
  14. $kafka = new kafka($config);
  15. $result = $kafka->producer()->send([
  16. [
  17. 'topic' => 'test',
  18. 'value' => 'message--',
  19. 'key' => 'key--',
  20. ],
  21. ]);
  22. var_dump($result);
  23. var_dump('ok');
  24. });
  25. }
  26. }

consumer

  1. namespace App\Consumer;
  2. use EasySwoole\Component\Process\AbstractProcess;
  3. use EasySwoole\Kafka\Config\ConsumerConfig;
  4. use EasySwoole\Kafka\kafka;
  5. class Process extends AbstractProcess
  6. {
  7. protected function run($arg)
  8. {
  9. go(function () {
  10. $config = new ConsumerConfig();
  11. $config->setRefreshIntervalMs(1000);
  12. $config->setMetadataBrokerList('127.0.0.1:9092,127.0.0.1:9093');
  13. $config->setBrokerVersion('0.9.0');
  14. $config->setGroupId('test');
  15. $config->setTopics(['test']);
  16. $config->setOffsetReset('earliest');
  17. $kafka = new kafka($config);
  18. // Set consumption callback
  19. $func = function ($topic, $partition, $message) {
  20. var_dump($topic);
  21. var_dump($partition);
  22. var_dump($message);
  23. };
  24. $kafka->consumer()->subscribe($func);
  25. });
  26. }
  27. }

Bonus

  1. Kafka cluster deployment docker-compose.yml one, use as follows
    1. Ensure that the ports 2181, 9092, 9093, and 9000 are not occupied (you can modify the port number in the compose file after occupying)
    2. Under the root directory, docker-compose up -d
    3. Visit localhost:9000 to view the kafka cluster status.

any Question

Kafka use questions and bugs, welcome to questions or feedback in the kaka group of Easyswoole QQ group number: 827432930