一、说明
PHP-RabbitMQ消息队列,支持单线程或多线程共同消费相同队列。
二、配置文件
在config目录下增加以amqp.php的配置文件,内容如下:
<?phpreturn ['uri' => ['host' => '127.0.0.1','port' => 5672,'user' => 'guest','password' => 'guest'],'pools' => 50,'enable' => true,'threads' => true,'consumers' => [\Demo\App\Amqp\TestAmqp::class => ['test01' => true,],\Demo\App\Amqp\TestAmqp2::class => ['test01' => true,],],];
参数说明
| 参数KEY名 | 描述说明 | 样例值 |
|---|---|---|
| uri | 声明队列的地址,包含地址、端口、用户名、密码 | |
| pools | 连接池最大存放数量 | 默认50 |
| enable | 是否启用AMQP | |
| threads | 是否使用一个消费任务一个用户进程进行消费(多线程) | |
| consumers | 定义启用的消费任务是否启用状态,不设置或设置为false表示不启用 |
三、客户端
向RabbitMQ服务器发送任务
第一步:声明队列客户端入口(注释的方式)
<?php/*** @Inject("send_sms")* @var Amqp*/private $amqp;
注:1)Inject内的文本【send_sms】表示连接的队列KEY,队列KEY是必须。
2)多个队列使用多个Inject实例
第二步:发送消息内容
<?php//发送内容$this->amqp->basePublish('你好!这是短信验证码88888');
完成Class代码
<?phpnamespace Demo\App;use Swork\Bean\BeanCollector;use Swork\Client\Amqp;use Swork\Bean\Annotation\Inject;class AmqpTestSender extends BeanCollector{/*** @Inject("send_sms")* @var Amqp*/private $amqp;/*** 操作发送内容*/public function send(){$this->amqp->basePublish('你好!这是短信验证码88888');}}
四、消费者端
通过关键词【QueueTask】支持在任务方法函数上实现消费者。
<?php/*** 消费者测试01* @param AmqpArgument $argument* @QueueTask("send_sms")* @QueueTask("consumer_tag", "")* @QueueTask("no_local", false)* @QueueTask("no_ack", false)* @QueueTask("exclusive", false)* @QueueTask("nowait", false)* @return bool* @throws*/function test01(AmqpArgument $argument){$channel = $argument->getChannel();$message = $argument->getMessage();$body = $argument->getBody();//发送成功//内部处理逻辑//返回return true;}
注:1)当QueueTask的参数个数是:
a)1个参数时,是声明消费哪个队列的消息
b)2个参数时,是声明队列消费的参考(具体参数说明,请参考官方,默认为false或空)
2)方法的传入参数对象为AmqpArugment,$argument->getBody() 为消息原始内容
2)当no_ack为false时,消费者方法需要返回 true 或 false,进行确认消费
(服务器没有收到true认为没有确认)
| 参数KEY名 | 描述说明 | 样例值 |
|---|---|---|
| no_ack | 声明是否需要ack确认才确认消费掉 false:需要ack确认 true:不需要ack确认 |
默认false |
| consumer_tag | - | 默认50 |
| no_local | - | |
| exclusive | - | |
| nowait | - |
可简化版(把默认参数都去掉):
<?php/*** 消费者测试01* @param AmqpArgument $argument* @QueueTask("send_sms")* @return bool* @throws*/function test01(AmqpArgument $argument){$channel = $argument->getChannel();$message = $argument->getMessage();$body = $argument->getBody();//发送成功//内部处理逻辑//返回return true;}
