workerman/redis-queue
基于Redis的消息队列,支持消息延迟处理。
项目地址:
https://github.com/walkor/redis-queue
安装:
composer config -g repo.packagist composer https://mirrors.aliyun.com/composer/composer require workerman/redis-queue
示例
<?phpuse Workerman\Worker;use Workerman\Timer;use Workerman\RedisQueue\Client;require_once __DIR__ . '/vendor/autoload.php';$worker = new Worker();$worker->onWorkerStart = function () {$client = new Client('redis://127.0.0.1:6379');// 订阅$client->subscribe('user-1', function($data){echo "user-1\n";var_export($data);});// 订阅$client->subscribe('user-2', function($data){echo "user-2\n";var_export($data);});// 定时向队列发送消息Timer::add(1, function()use($client){$client->send('user-1', ['some', 'data']);});};Worker::runAll();
API
__construct (string $address, [array $options])
创建实例
$address类似redis://ip:6379,必须以redis开头.$options包括以下选项:auth: 鉴权信息,默认 ‘’db: db,默认 0max_attempts: 消费失败后重试次数,默认5retry_seconds: 重试时间间隔,单位秒。默认5
消费失败是指业务抛出异常
Exception或者Error。消费失败后消息会放到延迟队列等待重试,重试次数由max_attempts控制,重试间隔由retry_seconds和max_attempts共同控制。比如max_attempts为5,retry_seconds为10,第1次重试间隔为1*10秒,第2次重试时间间隔为2*10秒,第3次重试时间间隔为3*10秒,以此类推直到重试5次。如果超过了max_attempts设置测重试次数,则消息放入key为{redis-queue}-failed(1.0.5版本之前为redis-queue-failed)的失败队列
send(String $queue, Mixed $data, [int $dely=0])
向队列发送一条消息
$queue队列名,String类型$data发布的具体消息,可以是数组或者字符串,Mixed类型$dely延迟消费时间,单位秒,默认0,Int类型
subscribe(mixed $queue, callable $callback)
订阅一个队列或者多个队列
$queue队列名,可以是字符串或者包含多个队列名的数组$callback回调函数,格式为function (Mixed $data),其中$data就是send($queue, $data)中的$data.
unsubscribe(mixed $queue)
取消订阅
$queue队列名或者包含多个队列名的数组
在非workerman环境向队列发送消息
有时候一些项目运行在apache或者php-fpm环境,无法使用workerman/redis-queue项目,可以参考如下函数实现发送
function redis_queue_send($redis, $queue, $data, $delay = 0) {$queue_waiting = '{redis-queue}-waiting'; //1.0.5版本之前为redis-queue-waiting$queue_delay = '{redis-queue}-delayed';//1.0.5版本之前为redis-queue-delayed$now = time();$package_str = json_encode(['id' => rand(),'time' => $now,'delay' => 0,'attempts' => 0,'queue' => $queue,'data' => $data]);if ($delay) {return $redis->zAdd($queue_delay, $now + $delay, $package_str);}return $redis->lPush($queue_waiting.$queue, $package_str);}
其中,参数$redis为redis实例。例如redis扩展用法类似如下:
$redis = new Redis;$redis->connect('127.0.0.1', 6379);$queue = 'user-1';$data= ['some', 'data'];redis_queue_send($redis, $queue, $data);`
