本文基于tp5.1实现,实际项目中使用的话需配合定时任务开启队列

一、确认是否composer安装了队列插件

1、首先我们看一下自己的TP5的框架中的 TP5\vendor\topthink ,这个文件中有没有think-queue这个文件夹,如果没有请进行composer安装:

  1. composer require topthink/think-queue

2、确认是否成功安装

  1. php think queue:work -h

二、配置queue

application/config/queue.php中进行队列的配置:

  1. return [
  2. 'connector' => 'Redis', // Redis 驱动
  3. 'expire' => 60, //任务的过期时间,默认为60秒; 若要禁用,则设置为 null
  4. 'default' => 'default', // 默认的队列名称
  5. 'host' => '127.0.0.1', // redis 主机ip
  6. 'port' => 6379, // redis 端口
  7. 'password' => '', // redis 密码
  8. 'select' => 0, // 使用哪一个 db,默认为 db0
  9. 'timeout' => 0, // redis连接的超时时间
  10. 'persistent' => false, // 是否是长连接
  11. ];

三、代码实现

1、在application下的模块内新建job文件夹,文件夹下建立队列名.php文件

如下面的hello队列任务

  1. <?php
  2. namespace app\index\job;
  3. use app\index\model\User;
  4. use think\queue\Job;
  5. class Hello
  6. {
  7. /**
  8. * * fire方法是消息队列默认调用的方法
  9. * @param Job $job 当前的任务对象
  10. * @param array|mixed $data 发布任务时自定义的数据
  11. * @throws \Exception
  12. */
  13. public function fire(Job $job, $data)
  14. {
  15. $isJobDone = $this->doHelloJob($data);
  16. if ($isJobDone) {
  17. // 如果任务执行成功,记得删除任务
  18. $job->delete();
  19. print("<info>Hello Job has been done and deleted" . "</info>\n");
  20. } else {
  21. if ($job->attempts() > 3) {
  22. //通过这个方法可以检查这个任务已经重试了几次了
  23. print("<warn>Hello Job has been retried more than 3 times!" . "</warn>\n");
  24. $job->delete();
  25. // 也可以重新发布这个任务
  26. //print("<info>Hello Job will be availabe again after 2s."."</info>\n");
  27. //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
  28. }
  29. }
  30. }
  31. /**
  32. * 任务实现的主要功能:我的例子是(更新通知时间)
  33. * @param $data
  34. * @return bool
  35. * @throws \Exception
  36. */
  37. private function doHelloJob($data)
  38. {
  39. $user = new User();
  40. $user->saveAll($data);
  41. return true;
  42. }
  43. /**
  44. * 该方法用于接收任务执行失败的通知,你可以发送邮件给相应的负责人员
  45. * @param $jobData string|array|... //发布任务时传递的 jobData 数据
  46. */
  47. public function failed($jobData){
  48. print("Warning: Job failed after max retries. job data is :".var_export($jobData,true))."\n";
  49. }

2、在业务控制器中调用推送任务hello

  1. <?php
  2. namespace app\index\controller;
  3. use app\index\model\User;
  4. use think\Queue;
  5. class Job
  6. {
  7. /**
  8. * 一个使用了队列的 action
  9. */
  10. public function helloJob()
  11. {
  12. // 1.当前任务将由哪个类来负责处理。
  13. // 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
  14. $jobHandlerClassName = 'app\index\job\Hello';
  15. // 2.当前任务归属的队列名称,如果为新队列,会自动创建
  16. $jobQueueName = "helloJobQueue";
  17. // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
  18. // ( jobData 为对象时,存储其public属性的键值对 )
  19. $userData = User::select();
  20. $jobData=[];
  21. foreach ($userData as $v) {
  22. $jobData[] = ['id' => $v['id'],'notify_time' => time()];
  23. }
  24. // 4.将该任务推送到消息队列,等待对应的消费者去执行
  25. $isPushed = Queue::push($jobHandlerClassName, $jobData, $jobQueueName);
  26. // database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false
  27. if ($isPushed !== false) {
  28. echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ" . "<br>";
  29. } else {
  30. echo 'Oops, something went wrong.';
  31. }
  32. }

四、补充:对队列任务失败的回调处理

application/tags.php文件中的return数组中添加

  1. // 任务失败统一回调,有四种定义方式
  2. 'queue_failed'=> [
  3. // 数组形式,[ 'ClassName' , 'methodName']
  4. ['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues']
  5. // 字符串(静态方法),'StaicClassName::methodName'
  6. // 'MyQueueFailedLogger::logAllFailedQueues'
  7. // 字符串(对象方法),'ClassName',此时需在对应的ClassName类中添加一个名为 queueFailed 的方法
  8. // 'application\\behavior\\MyQueueFailedLogger'
  9. // 闭包形式
  10. /*
  11. function( &$jobObject , $extra){
  12. // var_dump($jobObject);
  13. return true;
  14. }
  15. */
  16. ]

如我们采用方式1:就在application下新建behavior文件夹,创建MyQueueFailedLogger.php控制器

  1. <?php
  2. namespace app\behavior;
  3. class MyQueueFailedLogger
  4. {
  5. const should_run_hook_callback = true;
  6. /**
  7. * @param $jobObject \think\queue\Job //任务对象,保存了该任务的执行情况和业务数据
  8. * @return bool true //是否需要删除任务并触发其failed() 方法
  9. */
  10. public function logAllFailedQueues(&$jobObject){
  11. $failedJobLog = [
  12. 'jobHandlerClassName' => $jobObject->getName(), // 'application\index\job\Hello'
  13. 'queueName' => $jobObject->getQueue(), //'helloJobQueue'
  14. 'jobData' => $jobObject->getRawBody()['data'], // '{'a': 1 }'
  15. 'attempts' => $jobObject->attempts(),// 3
  16. ];
  17. var_export(json_encode($failedJobLog,true));
  18. // $jobObject->release(); //重发任务
  19. //$jobObject->delete(); //删除任务
  20. //$jobObject->failed(); //通知消费者类任务执行失败
  21. return self::should_run_hook_callback;
  22. }