QueueDrive

We can customize the driver to implement the packaging of consumer queue software such as RabbitMQ.

Define the class and inherit the EasySwoole\Queue\QueueDriverInterface interface to implement several methods.

QueueDriverInterface

  1. namespace EasySwoole\Queue;
  2. interface QueueDriverInterface
  3. {
  4. public function push(Job $job):bool ;
  5. public function pop(float $timeout = 3.0):?Job;
  6. public function size():?int ;
  7. }

Comes with redis drive

  1. namespace EasySwoole\Queue\Driver;
  2. use EasySwoole\Queue\Job;
  3. use EasySwoole\Queue\QueueDriverInterface;
  4. use EasySwoole\Redis\Redis as Connection;
  5. use EasySwoole\RedisPool\RedisPool;
  6. class Redis implements QueueDriverInterface
  7. {
  8. protected $pool;
  9. protected $queueName;
  10. public function __construct(RedisPool $pool,string $queueName = 'EasySwoole')
  11. {
  12. $this->pool = $pool;
  13. $this->queueName = $queueName;
  14. }
  15. public function push(Job $job): bool
  16. {
  17. $data = $job->__toString();
  18. return $this->pool->invoke(function (Connection $connection)use($data){
  19. return $connection->lPush($this->queueName,$data);
  20. });
  21. }
  22. public function pop(float $timeout = 3.0): ?Job
  23. {
  24. return $this->pool->invoke(function (Connection $connection){
  25. $data = json_decode($connection->rPop($this->queueName),true);
  26. if(is_array($data)){
  27. return new Job($data);
  28. }else{
  29. return null;
  30. }
  31. });
  32. }
  33. public function size(): ?int
  34. {
  35. return $this->pool->invoke(function (Connection $connection){
  36. return $connection->lLen($this->queueName);
  37. });
  38. }
  39. }