Queue驱动

我们可以自定义驱动,以封装 RabbitMQ 等消息队列软件。

定义一个类并实现 EasySwoole\Queue\QueueDriverInterface 接口,实现其中的 push、pop、size 三个方法即可。

QueueDriverInterface

  namespace EasySwoole\Queue;

  /**
   * Contract that every queue storage driver has to implement.
   *
   * A driver only needs to provide the three operations declared below.
   */
  interface QueueDriverInterface
  {
      /**
       * Put a job into the queue.
       *
       * @param Job $job job to enqueue
       *
       * @return bool presumably true when the job was stored — confirm against the concrete driver
       */
      public function push(Job $job): bool;

      /**
       * Take one job out of the queue.
       *
       * @param float $timeout defaults to 3.0; presumably the maximum wait in seconds,
       *                       but its meaning is left to the implementation
       *
       * @return Job|null a job, or null when the driver has nothing to hand out
       */
      public function pop(float $timeout = 3.0): ?Job;

      /**
       * Report how many jobs are waiting.
       *
       * @return int|null the job count; null is allowed by the signature
       *                  (presumably for "unknown" — confirm against the concrete driver)
       */
      public function size(): ?int;
  }

自带的 Redis 驱动

  namespace EasySwoole\Queue\Driver;

  use EasySwoole\Queue\Job;
  use EasySwoole\Queue\QueueDriverInterface;
  use EasySwoole\Redis\Redis as Connection;
  use EasySwoole\RedisPool\RedisPool;

  /**
   * Queue driver backed by a single Redis list.
   *
   * Jobs are written with lPush and read with rPop, which on a Redis list
   * yields first-in-first-out ordering. Every command runs on a connection
   * borrowed from the injected pool via invoke().
   */
  class Redis implements QueueDriverInterface
  {
      /** @var RedisPool pool every Redis command is executed through */
      protected $pool;

      /** @var string key of the Redis list that holds the jobs */
      protected $queueName;

      /**
       * @param RedisPool $pool      pool used to obtain Redis connections
       * @param string    $queueName key of the list used as the queue
       */
      public function __construct(RedisPool $pool, string $queueName = 'EasySwoole')
      {
          $this->pool = $pool;
          $this->queueName = $queueName;
      }

      /**
       * Store the string form of the job at the head of the list.
       *
       * @param Job $job job to enqueue; it is stored as $job->__toString()
       *
       * @return bool true when lPush reported a truthy result, false otherwise
       */
      public function push(Job $job): bool
      {
          $data = $job->__toString();

          $result = $this->pool->invoke(function (Connection $connection) use ($data) {
              return $connection->lPush($this->queueName, $data);
          });

          // Cast explicitly: relying on implicit int-to-bool return coercion
          // raises a TypeError as soon as the file declares strict_types=1.
          return (bool) $result;
      }

      /**
       * Remove one job from the tail of the list and rebuild it.
       *
       * @param float $timeout kept for interface compatibility; this
       *                       implementation never reads it
       *
       * @return Job|null the decoded job, or null when nothing usable was popped
       */
      public function pop(float $timeout = 3.0): ?Job
      {
          return $this->pool->invoke(function (Connection $connection) {
              $raw = $connection->rPop($this->queueName);

              // Anything that is not a string cannot be a stored job payload;
              // bail out before json_decode, which rejects non-string
              // arguments (deprecation on PHP 8.1+, TypeError in strict mode).
              if (!is_string($raw)) {
                  return null;
              }

              $data = json_decode($raw, true);

              return is_array($data) ? new Job($data) : null;
          });
      }

      /**
       * Number of jobs currently stored in the list.
       *
       * @return int|null the length reported by lLen, or null when the call
       *                  did not produce a numeric result
       */
      public function size(): ?int
      {
          $length = $this->pool->invoke(function (Connection $connection) {
              return $connection->lLen($this->queueName);
          });

          // A non-numeric result (e.g. false) is reported as "unknown" rather
          // than being silently coerced to 0 or raising a TypeError.
          return is_numeric($length) ? (int) $length : null;
      }
  }