安装

  1. composer require topthink/think-queue

配置

驱动类型的设置

配置文件位于 config/queue.php

可选择 sync (默认) : 同步执行,database:数据库驱动 , redis:Redis驱动

推荐使用Redis驱动

// 队列配置
return [
    // 默认驱动
    'default'     => 'sync',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'queue_jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => '127.0.0.1',
            'port'       => 6379,
            'password'   => '',
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => true, // 数据持久化
        ],
    ],
    'failed'      => [
        'type'  => 'database',
        'table' => 'queue_failed_jobs',
    ],
];

队列执行设置

配置文件位于 app\queue\config\fire.php

参数 说明
retry_num 允许重试的次数
attempts_fail_mooe 执行失败后采取的操作 : close 直接关闭 release 重新进入队列
release_attempts_second 多少秒后重新进入队列
<?php
// +----------------------------------------------------------------------
// | Queue设置
// +----------------------------------------------------------------------
return [
    //重试的次数
    'retry_num' => 3,
    //执行失败后采取的操作
    'attempts_fail_mooe' => 'push',
    //多少秒后继续执行
    'release_attempts_second' => 60,
];

创建队列任务

参数

方法 : QueueService::joinQueue();
参数 说明
class 当前任务将由哪个类来负责处理(必须继承 QueueScript )
data 数组数据
execute_type 启动类型 push 立即执行 later 延迟执行
delay 延迟的秒数

案例

/**
  * 创建队列任务
  * @return Json
  */
function createDemo(): Json
{
  $execute_type = input('execute_type','push','trim');

  $QueueDemoModel = new QueueDemoModel();
  $QueueDemoModel->create_queue_demo_text = '我创建了任务拉';
  $QueueDemoModel->execute_type = $execute_type;
  $QueueDemoModel->create_time = time();
  $QueueDemoModel->update_time = time();
  $QueueDemoModel->save();

  //数组数据
  $data = [
    'queue_demo_id' => $QueueDemoModel->queue_demo_id,
  ];

  $res = QueueService::joinQueue(
    'app\queue\queuescript\DemoQueueScript',$data,
    $execute_type,60
  );
  return json($res);
}

队列执行回调

参数

run 任务执行的内容
checkExecute 检测该队列是否被使用过
参数 说明
job 队列信息
data 创建队列传的数组

案例

<?php
/**
 * Author: cycle_3
 */


declare(strict_types=1);

namespace app\queue\queuescript;

use app\queue\model\QueueDemoModel;
use think\queue\Job;
use think\Exception;

/**
 * demo执行回调
 */
class DemoQueueScript extends QueueScript
{

    /**
     * 任务执行的内容
     * @param Job $job
     * @param array $data
     * @return bool
     */
    public function run(Job $job, array $data): bool
    {
        try {
            $queue_demo_id = $data['queue_demo_id'] ?? 0;
            $QueueDemoModel = new QueueDemoModel();
            $QueueDemoDetails = $QueueDemoModel
                ->where('queue_demo_id', '=', $queue_demo_id)
                ->findOrEmpty();
            $QueueDemoDetails->execute_time = time();
            $QueueDemoDetails->execute_queue_demo_text = '我被执行成功了';
            $QueueDemoDetails->save();
            return true;
        } catch (Exception $e) {
            //异常
            return false;
        }
    }

    /**
     * 检测该队列是否被使用过
     * @param $data
     * @return bool
     */
    public function checkExecute(Job $job, array $data): bool
    {
        $queue_demo_id = $data['queue_demo_id'] ?? 0;
        $QueueDemoModel = new QueueDemoModel();
        $is_execute = $QueueDemoModel
            ->where('queue_demo_id', '=', $queue_demo_id)
            ->where('execute_time', '=', 0)
            ->value('queue_demo_id');
        if($is_execute) {
            return true;
        } else {
            return false;
        }
    }

}

启动


启动

php think queue:work --queue JobQueue

守护进程启动  

php think queue:work --queue JobQueue --daemon

Linux需要守护进程才可以,推荐使用Supervisor命令都是一样的

image.png

image.png

database 驱动说明

使用数据库驱动的情况会在数据库表 queue_jobs 中生成队列任务,安装的时候会默认创建表,没有默认创建的情况,可自行下载
image.png
ztb_queue_jobs.sql
ztb_queue_failed_jobs.sql

执行完后任务会自动被删除掉

redis 驱动说明

宝塔可直接安装
image.png

image.png