消息队列介绍

使用场景

用户 小A 注册了商城的用户,需要温馨提示短信发送[比较耗时]给他注册的手机号上
用户 小A 在下单之后三十分钟内未付款,就需要自动取消此订单
用户 小A 付款后订单在收到货物后,未点击收货,就需要7天后自动确认收货
……

为什么使用

为了能够”异步化”比较耗时的操作,或者实现一些延期的操作。

短信发送比较耗时,如果前台注册的时候需要等待短信发送后才会继续操作的话,会影响到用户体验
而实现订单未付款自动取消,订单发货自动确认收货,是更好的体验

消息队列概念

yii的消息队列有生产者的概念和消费者的概念。

  • 生产者
    • 生成消息,比如要耗时了,将 [注册发送短信] 的 “消息” 发送到消息队列里
  • 消费者
    • 对消息进行操作,接收到 [注册发送短信] 的时候,根据 “消息” 主体内容,进行短信的发送
  • 队列
    • 存储消息的主体

与crontab区别

与 定时任务 (linux crontab) 的场景不同,crontab 常用于某个固定时间,比如每隔1分钟,每天中午,星期三的中午去执行某件事,而消息队列常用不定时的场景。

yii-queue使用

使用:https://github.com/yiisoft/yii2-queue/blob/master/docs/guide/README.md

队列原理

image.png

yii2-queue扩展

https://github.com/yiisoft/yii2-queue

yii-queue 是 yii 官方提供的扩展,提供了 redis、file、rabbitmq、mysql等多种存储方式。

  1. composer require --prefer-dist yiisoft/yii2-queue -vvv

其他扩展安装(任一)

基于redis的queue

https://github.com/yiisoft/yii2-redis

  1. composer require --prefer-dist yiisoft/yii2-redis:"~2.0.0" -vvv

基于rabbit的queue

https://github.com/php-enqueue/amqp-lib

  1. composer require enqueue/amqp-lib -vvv

queue配置(与上一个选项对应)

注:common/config/main.php (advance版),如果 basic 版,config/console.php 与 config/web.php 中都需要配置。

基于redis的配置

https://github.com/yiisoft/yii2-queue/blob/master/docs/guide/driver-redis.md

  1. <?php
  2. return [
  3. 'bootstrap' => [
  4. 'queue', // The component registers its own console commands
  5. ],
  6. 'components' => [
  7. 'redis' => [
  8. 'class' => \yii\redis\Connection::class,
  9. // ...
  10. // retry connecting after connection has timed out
  11. // yiisoft/yii2-redis >=2.0.7 is required for this.
  12. 'retries' => 1,
  13. ],
  14. 'queue' => [
  15. 'class' => \yii\queue\redis\Queue::class,
  16. 'redis' => 'redis', // Redis connection component or its config
  17. 'channel' => 'queue', // Queue channel key
  18. ],
  19. ];
  20. ];

基于rabbitmq的配置

注:扩展中有 RabbitMQ、AMQP Interop 其中 RabbitMQ 已过时 https://github.com/yiisoft/yii2-queue/blob/master/docs/guide/driver-amqp-interop.md


配置方式一**

  1. <?php
  2. return [
  3. 'bootstrap' => [
  4. 'queue', // The component registers its own console commands
  5. ],
  6. 'components' => [
  7. 'queue' => [
  8. 'class' => \yii\queue\amqp_interop\Queue::class,
  9. 'host' => '192.168.1.188',
  10. 'port' => 5672,
  11. 'user' => 'guest',
  12. 'password' => 'guest',
  13. 'queueName' => 'queue',
  14. 'driver' => \yii\queue\amqp_interop\Queue::ENQUEUE_AMQP_LIB
  15. ],
  16. ];
  17. ];

配置方式二

方式二:将 host、权限内容都组装到了 dsn 中

  1. <?php
  2. return [
  3. 'bootstrap' => [
  4. 'queue', // The component registers its own console commands
  5. ],
  6. 'components' => [
  7. 'queue' => [
  8. 'class' => \yii\queue\amqp_interop\Queue::class,
  9. 'queueName' => 'queue',
  10. 'driver' => \yii\queue\amqp_interop\Queue::ENQUEUE_AMQP_LIB,
  11. 'dsn' => 'amqp://guest:guest@192.168.1.188:5672'
  12. ],
  13. ];
  14. ];

queue的使用

新建job[消息主体]

新建service 目录,并在其下建立 queue/jobs 。

新建 service/queue/jobs/RegisterSmsJob.php

  1. <?php
  2. namespace service\queue\jobs;
  3. use yii\base\BaseObject;
  4. use yii\queue\Job;
  5. use common\models\User;
  6. class RegisterSmsJob extends BaseObject implements Job
  7. {
  8. // 此处的 $userID是自定义数据,根据需要自行定义
  9. public $userID;
  10. public function execute ($queue)
  11. {
  12. // 查询会员名称
  13. $user = User::findOne($this->userID);
  14. if(empty($user))
  15. {
  16. // 这里直接返回或者抛出异常
  17. // return 则是不予处理
  18. // 抛出异常的话可能会重试
  19. // return or throw new Exception()
  20. }
  21. // 用户名
  22. $username = $user->username;
  23. // 手机号
  24. $mobile= $user->mobile;
  25. // 此处的Sms自己封装的短信服务类
  26. Sms::send($mobile, "{$username}你好!欢迎注册某商城系统!");
  27. }
  28. }

生产消息

  1. <?php
  2. namespace frontend\controllers;
  3. use common\models\User;
  4. use yii\web\Controller;
  5. use Yii;
  6. /**
  7. * 登录与注册
  8. */
  9. class PassportController extends Controller{
  10. // 注册
  11. public function actionRegister(){
  12. // 伪代码
  13. $user = new User();
  14. $user->username = '张三';
  15. $user->mobile = 1888888888;
  16. $user->age = 12;
  17. $res = $user->insert();
  18. if (false === $res){
  19. throw new UserException('注册失败啦');
  20. }
  21. // 发送信息到消息队列,消息队列立即消费
  22. Yii::$app->queue->push(new RegisterSmsJob([
  23. 'userID' => $user->id
  24. ]));
  25. // 如果要实现短信 10分钟后发送
  26. // 通过 ->delay()方法,其中参数是延迟秒数,延迟多少自己计算就可以达到
  27. Yii::$app->queue->delay(10 * 60)->push(new RegisterSmsJob([
  28. 'userID' => $user->id
  29. ]));
  30. }
  31. }

消费消息

消息主体已定义,也使用 push 推到队列中,那么到底信息还没有被真正的执行到,所以短信还是没发出去

在根目录下执行命令,即可达到效果

  1. yii queue/listen # 此命令需要php配置了环境变量

此命令会一直 “卡着”,相当于一个死循环,才能在消息到达的时候监听到,并进行处理。