命令模式

  1. queue:work 命令 work 命令: 该命令将启动一个 work 进程来处理消息队列。php think queue:work —queue helloJobQueue

    1. php think queue:work --queue JobQueue
  2. queue:listen 命令listen 命令: 该命令将会创建一个 listen 父进程 ,然后由父进程通过 proc_open(‘php think queue:work’) 的方式来创建一个work 子 进程来处理消息队列,且限制该work进程的执行时间。php think queue:listen —queue helloJobQueue

    php think queue:listen --queue JobQueue
    

命令行参数

Work 模式

php think queue:work \
--daemon            //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
--queue  helloJobQueue  //要处理的队列的名称
--delay  0 \        //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
--force  \          //系统处于维护状态时是否仍然处理任务,并未找到相关说明
--memory 128 \      //该进程允许使用的内存上限,以 M 为单位
--sleep  3 \        //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
--tries  2          //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0

Listen 模式

php think queue:listen \
--queue  helloJobQueue \   //监听的队列的名称
--delay  0 \         //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
--memory 128 \       //该进程允许使用的内存上限,以 M 为单位
--sleep  3 \         //如果队列中无任务,则多长时间后重新检查,daemon模式下有效
--tries  0 \         //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
--timeout 60         //创建的work子进程的允许执行的最长时间,以秒为单位

work 模式和 listen 模式的区别

两者都可以用于处理消息队列中的任务
区别在于:

执行原理不同

  1. work 命令是单进程的处理模式。按照是否设置了 —daemon 参数,work命令又可分为单次执行和循环执行两种模式。

    1. 单次执行:不添加 —daemon参数,该模式下,work进程在处理完下一个消息后直接结束当前进程。当不存在新消息时,会sleep一段时间然后退出。

    2. 循环执行:添加了 —daemon参数,该模式下,work进程会循环地处理队列中的消息,直到内存超出参数配置才结束进程。当不存在新消息时,会在每次循环中sleep一段时间。

  2. listen 命令是 父进程 + 子进程 的处理模式。listen命令所在的父进程会创建一个单次执行模式的work子进程,并通过该work子进程来处理队列中的下一个消息,当这个work子进程退出之后,listen命令所在的父进程会监听到该子进程的退出信号,并重新创建一个新的单次执行的work子进程

退出时机不同

  1. work 命令的退出时机在上面的执行原理部分已叙述,此处不再重复
  2. listen 命令中,listen所在的父进程正常情况会一直运行,除非遇到下面两种情况:

    1. 创建的某个work子进程的执行时间超过了 listen命令行中的—timeout 参数配置,此时work子进程会被强制结束,listen所在的父进程也会抛出一个 ProcessTimeoutException 异常并退出。开发者可以选择捕获该异常,让父进程继续执行,也可以选择通过 supervisor 等监控软件重启一个新的listen命令。

    2. listen 命令所在的父进程因某种原因存在内存泄露,则当父进程本身占用的内存超过了命令行中的 —memory 参数配置时,父子进程均会退出。正常情况下,listen进程本身占用的内存是稳定不变的。

性能不同

  1. work 命令是在脚本内部做循环,框架脚本在命令执行的初期就已加载完毕;

  2. 而listen模式则是处理完一个任务之后新开一个work进程,此时会重新加载框架脚本。因此: work 模式的性能会比listen模式高。注意:当代码有更新时,work 模式下需要手动去执行 php think queue:restart 命令重启队列来使改动生效;而listen 模式会自动生效,无需其他操作。

超时控制能力不同

work 模式本质上既不能控制进程自身的运行时间,也无法限制执行中的任务的执行时间。举例来说,假如你在某次上线之后,在上文中的 \application\index\job\Hello.php 消费者的fire方法中添加了一段死循环 :

public function fire(){
   while(true){ //死循环
       $consoleOutPut->writeln("<info>I am looping forever inside a job.</info> \n");
       sleep(1);
   }
}

那么这个循环将永远不能停止,直到任务所在的进程超过内存限制或者由管理员手动结束。这个过程不会有任何的告警。更严重的是,如果你配置了expire ,那么这个死循环的任务可能会污染到同样处理 helloJobQueue 队列的其他work进程,最后好几个work进程将被卡死在这段死循环中。详情后文会说明。

work 模式下的超时控制能力,实际上应该理解为 多个work 进程配合下的过期任务重发能力。

而 listen命令可以限制其创建的work子进程的超时时间。listen 命令可通过 —timeout 参数限制work子进程允许运行的最长时间,超过该时间限制仍未结束的子进程会被强制结束;

这里有必要补充一下 expire 和 timeout 之间的区别:

expire 在配置文件中设置,timeout 在 listen命令 的命令行参数中设置,而且,expire 和 timeout 是两个不同层次上的概念:

expire 是指任务的过期时间。这个时间是全局的,影响到所有的work进程。(不管是独立的work命令还是 listen 模式下创建的的work子进程) 。expire 针对的对象是 任务。

timeout 是指work子进程的超时时间。这个时间只对当前执行的listen 命令有效。timeout 针对的对象是 work子进程。

使用场景不同

根据上面的介绍,可以看到,

  1. work 命令的适用场景是:

    1. 任务数量较多
    2. 性能要求较高
    3. 任务的执行时间较短
    4. 消费者类中不存在死循环,sleep() ,exit() ,die() 等容易导致bug的逻辑
  2. listen命令的适用场景是:

    1. 任务数量较少
    2. 任务的执行时间较长(如生成大型的excel报表等),
    3. 任务的执行时间需要有严格限制

消息队列的开始,停止与重启

  1. 开始一个消息队列:

    php think queue:work
    
  2. 停止所有的消息队列:

    php think queue:restart
    
  3. 重启所有的消息队列:

    php think queue:restart 
    php think queue:work
    

多模块,多任务的处理

多模块

单模块项目推荐使用 app\job 作为任务类的命名空间
多模块项目可用使用 app\module\job 作为任务类的命名空间 也可以放在任意可以自动加载到的地方

多任务

如果一个任务类里有多个小任务的话,在发布任务时,需要用 任务的类名@方法名 如 app\lib\job\Job2@task1、app\lib\job\Job2@task2
注意:命令行中的 —queue 参数不支持@解析

多任务例子:

  1. 在 \application\index\controller\JobTest.php 控制器中,添加 actionWithMultiTask()方法:

    public function actionWithMultiTask(){
    
    $taskType = $_GET['taskType'];
     switch ($whichTask) {
        case 'taskA':
            $jobHandlerClassName  = 'application\index\job\MultiTask@taskA';
            $jobDataArr = ['a'    => '1'];
            $jobQueueName = "multiTaskJobQueue";    
            break;
        case 'taskB':
            $jobHandlerClassName  = 'application\index\job\MultiTask@taskB';
            $jobDataArr = ['b'    => '2'];
            $jobQueueName = "multiTaskJobQueue";        
            break;
         default:
            break;
    }
    
    $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);
    if ($isPushed !== false) {
     echo("the $taskType of MultiTask Job has been Pushed to ".$jobQueueName ."<br>");
    }else{
     throw new Exception("push a new $taskType of MultiTask Job Failed!");
    }
    }
    
  2. 新增 \application\index\job\MultiTask.php 消费者类,并编写其 taskA() 和 taskB()方法 ```php <?php /**

    • 文件路径: \application\index\job\MultiTask.php
    • 这是一个消费者类,用于处理 multiTaskJobQueue 队列中的任务 */ namespace application\index\job;

use think\queue\Job;

class MultiTask {

public function taskA(Job $job,$data){

    $isJobDone = $this->_doTaskA($data);

    if ($isJobDone) {
        $job->delete();
        print("Info: TaskA of Job MultiTask has been done and deleted"."\n");
    }else{
        if ($job->attempts() > 3) {
            $job->delete();     
        }
    }
}

public function taskB(Job $job,$data){

    $isJobDone = $this->_doTaskA($data);

    if ($isJobDone) {
        $job->delete();
        print("Info: TaskB of Job MultiTask has been done and deleted"."\n");
    }else{
        if ($job->attempts() > 2) {
            $job->release();     
        }
    }
}

private function _doTaskA($data) {
    print("Info: doing TaskA of Job MultiTask "."\n");
    return true;
}

private function _doTaskB($data) {
    print("Info: doing TaskB of Job MultiTask "."\n");
    return true;
}

}

<a name="AADj6"></a>
# 消息的延迟执行与定时执行

延迟执行,相对于即时执行,是用来限制某个任务的最早可执行时刻。在到达该时刻之前,该任务会被跳过。<br />可以利用该功能实现定时任务。

1. 在生产者业务代码中:
```php
// 即时执行
$isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);
// 延迟 2 秒执行
$isPushed = Queue::later( 2, $jobHandlerClassName, $jobDataArr, $jobQueueName);
// 延迟到 2017-02-18 01:01:01 时刻执行
$time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now');    
$isPushed = Queue::later($time2wait,$jobHandlerClassName, $jobDataArr, $jobQueueName);
  1. 在消费者类中:

    // 重发,即时执行
    $job->release();
    // 重发,延迟 2 秒执行
    $job->release(2);
    // 延迟到 2017-02-18 01:01:01 时刻执行
    $time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now');
    $job->release($time2wait);
    
  2. 在命令行中: ```php //如果消费者类的fire()方法抛出了异常且任务未被删除时,将自动重发该任务, //重发时,会设置其下次执行前延迟多少秒,默认为0

php think queue:work —delay 3

<a name="V5aBy"></a>
# 消息的重发
thinkphp-queue 中,消息的重发时机有3种:

<a name="bbreO"></a>
## 在消费者类中手动重发
```php
if( $isJobDone === false){
    $job->release();
}

work进程自动重发

需同时满足以下两个条件

  1. 消费者类的 fire() 方法抛出了异常
  2. 任务未被删除

当配置了 expire 不为 null 时,work 进程内部每次查询可用任务之前,会先自动重发已过期的任务

在database 模式下,前两种的重发逻辑是先删除原来的任务,然后插入一个新的任务。
work进程自动重发中的重发时机是直接更新原任务。

而在redis 模式下,3种重发都是先删除再插入。
不管是哪种重发方式,重发之后,任务的已尝试次数会在原来的基础上 +1 。

此外,消费者类中需要注意,如果 fire() 方法中可能抛出异常,那么

  1. 如果不需要自动重发的话, 请在抛出异常之前将任务删除 $job->delete() ,以免产生bug。
  2. 如果需要自动重发的话,请直接抛出异常,不要在 fire() 方法中又手动使用 $job->release() , 这样会导致该任务被重发两次,产生两个一样的新任务。

任务的失败回调及告警

当同时满足以下条件时,将触发任务失败回调:

  • 命令行的 —tries 参数的值大于0
  • 任务的已尝试次数大于 命令行的 —tries 参数
  • 开发者添加了 queue_failed 事件标签及其对应的回调代码
  • 消费者类中定义了 failed() 方法,用于接收任务失败的通知

注意, queue_failed 标签需要在安装了 thinkphp-queue 之后 手动 去 \application\tags.php 文件中添加。

注意:该版本有bug,若想实现失败任务回调功能,需要先修改位于 think-queue\src\queue\Worker.php 中的 logFailedJob方法 , 修改方式如下:

/**
 * Log a failed job into storage.
 * @param  \Think\Queue\Job $job
 * @return array
 */
protected function logFailedJob(Job $job)
{
    // 将原来的 queue.failed' 修改为 'queue_failed' 才可以触发任务失败回调
    if (Hook::listen('queue.failed', $job, null, true)) {  
        $job->delete();
        $job->failed();
    }

    return ['job' => $job, 'failed' => true];
}

首先,我们添加 queue_failed 事件标签, 及其对应的回调方法

// 文件路径: \application\tags.php
// 应用行为扩展定义文件
return [
    // 应用初始化
    'app_init'     => [],
    // 应用开始
    'app_begin'    => [],
    // 模块初始化
    'module_init'  => [],
    // 操作开始执行
    'action_begin' => [],
    // 视图内容过滤
    'view_filter'  => [],
    // 日志写入
    'log_write'    => [],
    // 应用结束
    'app_end'      => [],

    // 任务失败统一回调,有四种定义方式
    'queue_failed'=> [

         // 数组形式,[ 'ClassName' , 'methodName']
        ['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues']

         // 字符串(静态方法),'StaicClassName::methodName'
         // 'MyQueueFailedLogger::logAllFailedQueues'   

         // 字符串(对象方法),'ClassName',此时需在对应的ClassName类中添加一个名为 queueFailed 的方法
         // 'application\\behavior\\MyQueueFailedLogger'

         // 闭包形式
         /*
         function( &$jobObject , $extra){
             // var_dump($jobObject);
             return true;
         }
         */
    ]
];

这里,我们选择数组形式的回调方式,新增 \application\behavior\MyQueueFailedLogger 类,添加一个 logAllFailedQueues() 方法

<?php
/**
 * 文件路径: \application\behavior\MyQueueFailedLogger.php
 * 这是一个行为类,用于处理所有的消息队列中的任务失败回调
 */

namespace application\behavior;


class MyQueueFailedLogger {

    const should_run_hook_callback = true;

    /**
     * @param $jobObject   \think\queue\Job   //任务对象,保存了该任务的执行情况和业务数据
     * @return bool     true                  //是否需要删除任务并触发其failed() 方法
     */
    public function logAllFailedQueues(&$jobObject){

        $failedJobLog = [
            'jobHandlerClassName'   => $jobObject->getName(), // 'application\index\job\Hello'
            'queueName' => $jobObject->getQueue(),               // 'helloJobQueue'     
            'jobData'   => $jobObject->getRawBody()['data'],  // '{'a': 1 }'
            'attempts'  => $jobObject->attempts(),            // 3
        ];
        var_export(json_encode($failedJobLog,true));

           // $jobObject->release();     //重发任务
          //$jobObject->delete();         //删除任务
          //$jobObject->failed();      //通知消费者类任务执行失败

        return self::should_run_hook_callback;         
    }
}

需要注意该回调方法的返回值:

  • 返回 true 时,系统会自动删除该任务,并且自动调用消费者类中的 failed() 方法
  • 返回 false 时,系统不会自动删除该任务,也不会自动调用消费者类中的 failed() 方法,需要开发者另行处理失败任务的删除和通知。

最后,在消费者类中,添加 failed() 方法

/**
 * 文件路径: \application\index\job\HelloJob.php
 */

/**
 * 该方法用于接收任务执行失败的通知,你可以发送邮件给相应的负责人员
 * @param $jobData  string|array|...      //发布任务时传递的 jobData 数据
 */
public function failed($jobData){
    send_mail_to_somebody() ; 

    print("Warning: Job failed after max retries. job data is :".var_export($data,true)."\n"; 
}

参考文档

thinkphp-queue自带的队列包使用分析