本文基于tp5.1实现,实际项目中使用的话需配合定时任务开启队列
一、确认是否composer安装了队列插件
1、首先我们看一下自己的TP5的框架中的 TP5\vendor\topthink ,这个文件中有没有think-queue这个文件夹,如果没有请进行composer安装:
composer require topthink/think-queue
2、确认是否成功安装
php think queue:work -h
二、配置queue
在application/config/queue.php中进行队列的配置:
return ['connector' => 'Redis', // Redis 驱动'expire' => 60, //任务的过期时间,默认为60秒; 若要禁用,则设置为 null'default' => 'default', // 默认的队列名称'host' => '127.0.0.1', // redis 主机ip'port' => 6379, // redis 端口'password' => '', // redis 密码'select' => 0, // 使用哪一个 db,默认为 db0'timeout' => 0, // redis连接的超时时间'persistent' => false, // 是否是长连接];
三、代码实现
1、在application下的模块内新建job文件夹,文件夹下建立队列名.php文件
如下面的hello队列任务
<?phpnamespace app\index\job;use app\index\model\User;use think\queue\Job;class Hello{/*** * fire方法是消息队列默认调用的方法* @param Job $job 当前的任务对象* @param array|mixed $data 发布任务时自定义的数据* @throws \Exception*/public function fire(Job $job, $data){$isJobDone = $this->doHelloJob($data);if ($isJobDone) {// 如果任务执行成功,记得删除任务$job->delete();print("<info>Hello Job has been done and deleted" . "</info>\n");} else {if ($job->attempts() > 3) {//通过这个方法可以检查这个任务已经重试了几次了print("<warn>Hello Job has been retried more than 3 times!" . "</warn>\n");$job->delete();// 也可以重新发布这个任务//print("<info>Hello Job will be availabe again after 2s."."</info>\n");//$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行}}}/*** 任务实现的主要功能:我的例子是(更新通知时间)* @param $data* @return bool* @throws \Exception*/private function doHelloJob($data){$user = new User();$user->saveAll($data);return true;}/*** 该方法用于接收任务执行失败的通知,你可以发送邮件给相应的负责人员* @param $jobData string|array|... //发布任务时传递的 jobData 数据*/public function failed($jobData){print("Warning: Job failed after max retries. job data is :".var_export($jobData,true))."\n";}
2、在业务控制器中调用推送任务hello
<?phpnamespace app\index\controller;use app\index\model\User;use think\Queue;class Job{/*** 一个使用了队列的 action*/public function helloJob(){// 1.当前任务将由哪个类来负责处理。// 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法$jobHandlerClassName = 'app\index\job\Hello';// 2.当前任务归属的队列名称,如果为新队列,会自动创建$jobQueueName = "helloJobQueue";// 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串// ( jobData 为对象时,存储其public属性的键值对 )$userData = User::select();$jobData=[];foreach ($userData as $v) {$jobData[] = ['id' => $v['id'],'notify_time' => time()];}// 4.将该任务推送到消息队列,等待对应的消费者去执行$isPushed = Queue::push($jobHandlerClassName, $jobData, $jobQueueName);// database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|falseif ($isPushed !== false) {echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ" . "<br>";} else {echo 'Oops, something went wrong.';}}
四、补充:对队列任务失败的回调处理
在application/tags.php文件中的return数组中添加
// 任务失败统一回调,有四种定义方式'queue_failed'=> [// 数组形式,[ 'ClassName' , 'methodName']['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues']// 字符串(静态方法),'StaicClassName::methodName'// 'MyQueueFailedLogger::logAllFailedQueues'// 字符串(对象方法),'ClassName',此时需在对应的ClassName类中添加一个名为 queueFailed 的方法// 'application\\behavior\\MyQueueFailedLogger'// 闭包形式/*function( &$jobObject , $extra){// var_dump($jobObject);return true;}*/]
如我们采用方式1:就在application下新建behavior文件夹,创建MyQueueFailedLogger.php控制器
<?phpnamespace app\behavior;class MyQueueFailedLogger{const should_run_hook_callback = true;/*** @param $jobObject \think\queue\Job //任务对象,保存了该任务的执行情况和业务数据* @return bool true //是否需要删除任务并触发其failed() 方法*/public function logAllFailedQueues(&$jobObject){$failedJobLog = ['jobHandlerClassName' => $jobObject->getName(), // 'application\index\job\Hello''queueName' => $jobObject->getQueue(), //'helloJobQueue''jobData' => $jobObject->getRawBody()['data'], // '{'a': 1 }''attempts' => $jobObject->attempts(),// 3];var_export(json_encode($failedJobLog,true));// $jobObject->release(); //重发任务//$jobObject->delete(); //删除任务//$jobObject->failed(); //通知消费者类任务执行失败return self::should_run_hook_callback;}
