Queue使用

定义一个队列

  1. namespace App\Utility;
  2. use EasySwoole\Component\Singleton;
  3. use EasySwoole\Queue\Queue;
  4. class MyQueue extends Queue
  5. {
  6. use Singleton;
  7. }

定义消费进程

  1. namespace App\Utility;
  2. use EasySwoole\Component\Process\AbstractProcess;
  3. use EasySwoole\Queue\Job;
  4. class QueueProcess extends AbstractProcess
  5. {
  6. protected function run($arg)
  7. {
  8. go(function (){
  9. MyQueue::getInstance()->consumer()->listen(function (Job $job){
  10. var_dump($job->toArray());
  11. });
  12. });
  13. }
  14. }

可以多进程,多协程消费

驱动注册

  1. namespace EasySwoole\EasySwoole;
  2. use App\Utility\MyQueue;
  3. use App\Utility\QueueProcess;
  4. use EasySwoole\Component\Timer;
  5. use EasySwoole\EasySwoole\Swoole\EventRegister;
  6. use EasySwoole\EasySwoole\AbstractInterface\Event;
  7. use EasySwoole\Http\Request;
  8. use EasySwoole\Http\Response;
  9. use EasySwoole\Queue\Driver\Redis;
  10. use EasySwoole\Queue\Job;
  11. use EasySwoole\Redis\Config\RedisConfig;
  12. use EasySwoole\RedisPool\RedisPool;
  13. use EasySwoole\Utility\Time;
  14. class EasySwooleEvent implements Event
  15. {
  16. public static function initialize()
  17. {
  18. // TODO: Implement initialize() method.
  19. date_default_timezone_set('Asia/Shanghai');
  20. }
  21. public static function mainServerCreate(EventRegister $register)
  22. {
  23. //redis pool使用请看redis 章节文档
  24. $config = new RedisConfig([
  25. 'host'=>'127.0.0.1'
  26. ]);
  27. $redis = new RedisPool($config);
  28. $driver = new Redis($redis);
  29. MyQueue::getInstance($driver);
  30. //注册一个消费进程
  31. ServerManager::getInstance()->addProcess(new QueueProcess());
  32. //模拟生产者,可以在任意位置投递
  33. $register->add($register::onWorkerStart,function ($ser,$id){
  34. if($id == 0){
  35. Timer::getInstance()->loop(3000,function (){
  36. $job = new Job();
  37. $job->setJobData(['time'=>\time()]);
  38. MyQueue::getInstance()->producer()->push($job);
  39. });
  40. }
  41. });
  42. }
  43. public static function onRequest(Request $request, Response $response): bool
  44. {
  45. // TODO: Implement onRequest() method.
  46. return true;
  47. }
  48. public static function afterRequest(Request $request, Response $response): void
  49. {
  50. // TODO: Implement afterAction() method.
  51. }
  52. }