一、说明
PHP-RabbitMQ消息队列,支持单线程或多线程共同消费相同队列。
二、配置文件
在config目录下增加以amqp.php的配置文件,内容如下:
<?php
return [
'uri' => [
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'guest',
'password' => 'guest'
],
'pools' => 50,
'enable' => true,
'threads' => true,
'consumers' => [
\Demo\App\Amqp\TestAmqp::class => [
'test01' => true,
],
\Demo\App\Amqp\TestAmqp2::class => [
'test01' => true,
],
],
];
参数说明
参数KEY名 | 描述说明 | 样例值 |
---|---|---|
uri | 声明队列的地址,包含地址、端口、用户名、密码 | |
pools | 连接池最大存放数量 | 默认50 |
enable | 是否启用AMQP | |
threads | 是否使用一个消费任务一个用户进程进行消费(多线程) | |
consumers | 定义启用的消费任务是否启用状态,不设置或设置为false表示不启用 |
三、客户端
向RabbitMQ服务器发送任务
第一步:声明队列客户端入口(注释的方式)
<?php
/**
* @Inject("send_sms")
* @var Amqp
*/
private $amqp;
注:1)Inject内的文本【send_sms】表示连接的队列KEY,队列KEY是必须。
2)多个队列使用多个Inject实例
第二步:发送消息内容
<?php
//发送内容
$this->amqp->basePublish('你好!这是短信验证码88888');
完成Class代码
<?php
namespace Demo\App;
use Swork\Bean\BeanCollector;
use Swork\Client\Amqp;
use Swork\Bean\Annotation\Inject;
class AmqpTestSender extends BeanCollector
{
/**
* @Inject("send_sms")
* @var Amqp
*/
private $amqp;
/**
* 操作发送内容
*/
public function send()
{
$this->amqp->basePublish('你好!这是短信验证码88888');
}
}
四、消费者端
通过关键词【QueueTask】支持在任务方法函数上实现消费者。
<?php
/**
* 消费者测试01
* @param AmqpArgument $argument
* @QueueTask("send_sms")
* @QueueTask("consumer_tag", "")
* @QueueTask("no_local", false)
* @QueueTask("no_ack", false)
* @QueueTask("exclusive", false)
* @QueueTask("nowait", false)
* @return bool
* @throws
*/
function test01(AmqpArgument $argument)
{
$channel = $argument->getChannel();
$message = $argument->getMessage();
$body = $argument->getBody();
//发送成功
//内部处理逻辑
//返回
return true;
}
注:1)当QueueTask的参数个数是:
a)1个参数时,是声明消费哪个队列的消息
b)2个参数时,是声明队列消费的参考(具体参数说明,请参考官方,默认为false或空)
2)方法的传入参数对象为AmqpArugment,$argument->getBody() 为消息原始内容
2)当no_ack为false时,消费者方法需要返回 true 或 false,进行确认消费
(服务器没有收到true认为没有确认)
参数KEY名 | 描述说明 | 样例值 |
---|---|---|
no_ack | 声明是否需要ack确认才确认消费掉 false:需要ack确认 true:不需要ack确认 |
默认false |
consumer_tag | - | 默认50 |
no_local | - | |
exclusive | - | |
nowait | - |
可简化版(把默认参数都去掉):
<?php
/**
* 消费者测试01
* @param AmqpArgument $argument
* @QueueTask("send_sms")
* @return bool
* @throws
*/
function test01(AmqpArgument $argument)
{
$channel = $argument->getChannel();
$message = $argument->getMessage();
$body = $argument->getBody();
//发送成功
//内部处理逻辑
//返回
return true;
}