一、说明

PHP-RabbitMQ消息队列,支持单线程或多线程共同消费相同队列。

二、配置文件

在config目录下增加以amqp.php的配置文件,内容如下:

  1. <?php
  2. return [
  3. 'uri' => [
  4. 'host' => '127.0.0.1',
  5. 'port' => 5672,
  6. 'user' => 'guest',
  7. 'password' => 'guest'
  8. ],
  9. 'pools' => 50,
  10. 'enable' => true,
  11. 'threads' => true,
  12. 'consumers' => [
  13. \Demo\App\Amqp\TestAmqp::class => [
  14. 'test01' => true,
  15. ],
  16. \Demo\App\Amqp\TestAmqp2::class => [
  17. 'test01' => true,
  18. ],
  19. ],
  20. ];

参数说明

参数KEY名 描述说明 样例值
uri 声明队列的地址,包含地址、端口、用户名、密码
pools 连接池最大存放数量 默认50
enable 是否启用AMQP
threads 是否使用一个消费任务一个用户进程进行消费(多线程)
consumers 定义启用的消费任务是否启用状态,不设置或设置为false表示不启用

三、客户端

向RabbitMQ服务器发送任务

第一步:声明队列客户端入口(注释的方式)

  1. <?php
  2. /**
  3. * @Inject("send_sms")
  4. * @var Amqp
  5. */
  6. private $amqp;

注:1)Inject内的文本【send_sms】表示连接的队列KEY,队列KEY是必须。
2)多个队列使用多个Inject实例

第二步:发送消息内容

  1. <?php
  2. //发送内容
  3. $this->amqp->basePublish('你好!这是短信验证码88888');

完成Class代码

  1. <?php
  2. namespace Demo\App;
  3. use Swork\Bean\BeanCollector;
  4. use Swork\Client\Amqp;
  5. use Swork\Bean\Annotation\Inject;
  6. class AmqpTestSender extends BeanCollector
  7. {
  8. /**
  9. * @Inject("send_sms")
  10. * @var Amqp
  11. */
  12. private $amqp;
  13. /**
  14. * 操作发送内容
  15. */
  16. public function send()
  17. {
  18. $this->amqp->basePublish('你好!这是短信验证码88888');
  19. }
  20. }

四、消费者端

通过关键词【QueueTask】支持在任务方法函数上实现消费者。

  1. <?php
  2. /**
  3. * 消费者测试01
  4. * @param AmqpArgument $argument
  5. * @QueueTask("send_sms")
  6. * @QueueTask("consumer_tag", "")
  7. * @QueueTask("no_local", false)
  8. * @QueueTask("no_ack", false)
  9. * @QueueTask("exclusive", false)
  10. * @QueueTask("nowait", false)
  11. * @return bool
  12. * @throws
  13. */
  14. function test01(AmqpArgument $argument)
  15. {
  16. $channel = $argument->getChannel();
  17. $message = $argument->getMessage();
  18. $body = $argument->getBody();
  19. //发送成功
  20. //内部处理逻辑
  21. //返回
  22. return true;
  23. }

注:1)当QueueTask的参数个数是:
a)1个参数时,是声明消费哪个队列的消息
b)2个参数时,是声明队列消费的参考(具体参数说明,请参考官方,默认为false或空)
2)方法的传入参数对象为AmqpArugment,$argument->getBody() 为消息原始内容
2)当no_ack为false时,消费者方法需要返回 true 或 false,进行确认消费
(服务器没有收到true认为没有确认)

参数KEY名 描述说明 样例值
no_ack 声明是否需要ack确认才确认消费掉
false:需要ack确认
true:不需要ack确认
默认false
consumer_tag - 默认50
no_local -
exclusive -
nowait -

可简化版(把默认参数都去掉):

  1. <?php
  2. /**
  3. * 消费者测试01
  4. * @param AmqpArgument $argument
  5. * @QueueTask("send_sms")
  6. * @return bool
  7. * @throws
  8. */
  9. function test01(AmqpArgument $argument)
  10. {
  11. $channel = $argument->getChannel();
  12. $message = $argument->getMessage();
  13. $body = $argument->getBody();
  14. //发送成功
  15. //内部处理逻辑
  16. //返回
  17. return true;
  18. }