workerman/redis-queue

基于Redis的消息队列,支持消息延迟处理。

项目地址:

https://github.com/walkor/redis-queue

安装:

  1. composer config -g repo.packagist composer https://mirrors.aliyun.com/composer/
  2. composer require workerman/redis-queue

示例

  1. <?php
  2. use Workerman\Worker;
  3. use Workerman\Timer;
  4. use Workerman\RedisQueue\Client;
  5. require_once __DIR__ . '/vendor/autoload.php';
  6. $worker = new Worker();
  7. $worker->onWorkerStart = function () {
  8. $client = new Client('redis://127.0.0.1:6379');
  9. // 订阅
  10. $client->subscribe('user-1', function($data){
  11. echo "user-1\n";
  12. var_export($data);
  13. });
  14. // 订阅
  15. $client->subscribe('user-2', function($data){
  16. echo "user-2\n";
  17. var_export($data);
  18. });
  19. // 定时向队列发送消息
  20. Timer::add(1, function()use($client){
  21. $client->send('user-1', ['some', 'data']);
  22. });
  23. };
  24. Worker::runAll();

API


__construct (string $address, [array $options])

创建实例

  • $address 类似 redis://ip:6379,必须以redis开头.

  • $options 包括以下选项:

    • auth: 鉴权信息,默认 ‘’
    • db: db,默认 0
    • max_attempts: 消费失败后重试次数,默认5
    • retry_seconds: 重试时间间隔,单位秒。默认5

消费失败是指业务抛出异常Exception或者Error。消费失败后消息会放到延迟队列等待重试,重试次数由 max_attempts控制,重试间隔由retry_secondsmax_attempts共同控制。比如max_attempts为5,retry_seconds为10,第1次重试间隔为1*10秒,第2次重试时间间隔为2*10秒,第3次重试时间间隔为3*10秒,以此类推直到重试5次。如果超过了max_attempts设置测重试次数,则消息放入key为{redis-queue}-failed(1.0.5版本之前为redis-queue-failed)的失败队列


send(String $queue, Mixed $data, [int $dely=0])

向队列发送一条消息

  • $queue 队列名, String 类型
  • $data 发布的具体消息,可以是数组或者字符串,Mixed 类型
  • $dely 延迟消费时间,单位秒,默认0, Int 类型

subscribe(mixed $queue, callable $callback)

订阅一个队列或者多个队列

  • $queue 队列名,可以是字符串或者包含多个队列名的数组
  • $callback 回调函数,格式为 function (Mixed $data),其中$data就是send($queue, $data)中的$data.

unsubscribe(mixed $queue)

取消订阅

  • $queue 队列名或者包含多个队列名的数组

在非workerman环境向队列发送消息

有时候一些项目运行在apache或者php-fpm环境,无法使用workerman/redis-queue项目,可以参考如下函数实现发送

  1. function redis_queue_send($redis, $queue, $data, $delay = 0) {
  2. $queue_waiting = '{redis-queue}-waiting'; //1.0.5版本之前为redis-queue-waiting
  3. $queue_delay = '{redis-queue}-delayed';//1.0.5版本之前为redis-queue-delayed
  4. $now = time();
  5. $package_str = json_encode([
  6. 'id' => rand(),
  7. 'time' => $now,
  8. 'delay' => 0,
  9. 'attempts' => 0,
  10. 'queue' => $queue,
  11. 'data' => $data
  12. ]);
  13. if ($delay) {
  14. return $redis->zAdd($queue_delay, $now + $delay, $package_str);
  15. }
  16. return $redis->lPush($queue_waiting.$queue, $package_str);
  17. }

其中,参数$redis为redis实例。例如redis扩展用法类似如下:

  1. $redis = new Redis;
  2. $redis->connect('127.0.0.1', 6379);
  3. $queue = 'user-1';
  4. $data= ['some', 'data'];
  5. redis_queue_send($redis, $queue, $data);
  6. `