基础
https://www.rabbitmq.com/tutorials/tutorial-one-php.html MQ官网
如果交换机,或者默认交换机推送数据,没有找到符合条件的队列,数据就会丢失,返回也是成功,就算是用了消息确认通道confirm,也是丢失数据
交换机的类型
direct:直连交换机
规则比较简单,在发布消息前,需要把交换机和队列进行一个绑定,如果发送消息的时候,路由键和绑定的值一致,消息就会投递到改队列中,如果不存在对应的队列,消息就会丢弃,
fanout:扇形交换机(广播,多个)
只要交换机和队列做了绑定,发布的消息都会到队列中去,(忽略路由键)
topic:主题交换机(广播,多个)
复杂一些,和直连类型相比,不是全等,类似于模糊搜索 sql 的like关键字
header:头交换机(多个)基本没用了
头交换机通过消息的headers属性和于交换机绑定的路由键进行匹配,将消息路由到一个或者多个路由列,当“x-match”设置为“any”时,消息头的任何一个值被匹配就可以满足条件,当设置为“all”时,就需要消息头的所有值都匹配成功
队列六种工作模式
work queue(工作队列模式)
工作队列模式,不需要要定义交换机,采用默认的交换机,路由名称为队列名称,由多个终端消费同一个队列的时候,交换机采用轮询发送消息,通俗点说就是给第一个发一条,另外一个发下一条
pub/sub(发布订阅)
1.每个消费者监听自己的队列,
2.生产者将消息发送给broker,由交换机将消息转发到绑定此路由的每个队列,每个绑定交换机的队列都将收到消息
流程
生产者:声明fanout类型的交换机,声明两个队列并且绑定到此交换机,绑定时不需要指定路由键,发送消息不需要指定路由键
消费者:交换机会将消息发布给每个监听本交换机的队列,但是如果多个消费者监听了同一个队列,这个队列还是会按照轮询的方式把信息发给每个消费者,一人一条(不同信息)
routing(路由模式)
1.每个消费者监听自己的队列,并且设置路由键
2.生产者将消息发送给交换机,由交换机根据路由键来转发到指定的队列
流程
生产者:声明direct类型的交换机,声明两个队列并且绑定到此交换机,绑定时需要指定路由键,发送消息需要指定路由键
消费者:消费者绑定队列的时候可以指定路由键来只获取指定的路由键的i消息
路由模式要求队列在绑定交换机时要指定路由键,消息会转发到符合路由键的队列中
topics(通配符模式主题模式)
同路由模式相似,但是路由键的匹配是通过通配符决定的,路由模式是相等才匹配,设置交换机类型为topics(主题模式)即可
通配符使用:
1.【#】,匹配一个或者多个词,比如uncle.#,可以匹配uncle.asna,uncle.emeer
2.【】,匹配一个词,比如uncle.,可以匹配uncle.asna,uncle.emeer
header(头交换机,基本不用)
与路由模式不同的地方在于,header模式取消了路由键,使用header中的key/val匹配队列
RPC(远程调用)
客户端远程调用服务端的方法,使用MQ可以实现EPC的异步调用,基于Direct交换机实现
流程:
1.客户端就是生产者就是消费者,向RPC请求队列发送RPC调用信息,同时监听相应队列,
2.服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果,
3.服务端将RPC方法的结果发送到RPC相应队列,
4,客户端(RPC调用方法)监听相应队列,接收RPC调用结果
你掉我,我掉你
交换机跟队列都设置好,路由键也绑定好了(路由键可以在消费者绑定也可以在生产者绑定),就剩下发送消息了
使用publish 方法,虽然可能因为使用的扩展不一样,不过当该参数还是那几个
msg: 消息对象
exchange :消息对象
routing_key:路由键
mandatory:消息至少有一个队列能够接受,如果交换机无法把消息发送到具体的队列中,是否要把消息发送到失败投递记录中,而不是让其消失
方法详解
申请交换机
/**
* 申请交换机,一旦申请,不能修改
*/
public function exchange_declare(
$exchange,
$type,
$passive = false,
$durable = false,
$auto_delete = true,
$internal = false,
$nowait = false,
$arguments = array(),
$ticket = null
)
参数:
exchange:交换机名称
type:交换机类型,fanout、direct、topic、headers,就是那些订阅了,直传,主体,模糊查询那些,一旦修改不能更换
passive:只判断不创建
durable:持久化
auto_delete:设置自动删除,前提是:至少有一个队列或者交换机与这个交换器绑定,之后所有与整个交换器绑定的队列或者交换器都与此解绑,
internal:内置的,true表示内置的交换器,客户端无法直接发送到这个交换器中,只能通过交换路由都交换器的方式
nowait:如果为true,表示不等服务器返回信息,直接返回null
arguments:其他一些结构化参数
ticket:不知道什么玩意
申请队列
/**
* 声明队列,如果不存在时,就会创建
*/
public function queue_declare(
$queue = '',
$passive = false,
$durable = false,
$exclusive = false,
$auto_delete = true,
$nowait = false,
$arguments = array(),
$ticket = null
)
参数:
queue :队列名称
passive:只判断,不创建(一般用于判断队列是否存在),当希望查询队列是否存在,又不想创建这个队列,设置为true,如果不存在,会抛出异常,存在返回null
durable:是否持久化,true,设置持久化,持久化可以保存在磁盘中,重启不会丢失
exclusive:设置是否排他,设置true时,表示为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除,注意三点:排他队列时基于连接可见的,同一个连接的不同通道是可以同时访问同一连接创建的排他队列;首次是指如果一个连接已经声明了一个排他队列其他连接不允许建立同名的排他队列;这个与普通队列不同,即使持久化也会在断开连接后,自动删除。这种队列适合一个客户端同时发送和读取消息的场景
auto_delete:是否自动删除,设置true表示队列自动删除,自动删除的前提:至少一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除,不要错误的理解,生产者断开不会删除,
nowait:如果为true,表示不等待服务器回执信息,函数将返回null,可以提高效率
arguments:数组,其他一些结构化参数,$arguments = new AMQPTable([ ‘x-message-ttl’ => 10000, // 延迟时间 (毫秒)创建queue时设置该参数可指定消息在该queue中待多久,可根据x-dead-letter-routing-key和x-dead-letter-exchange生成可延迟的死信队列。
‘x-expires’ => 26000, // 队列存活时间 如果一个队列开始没有设置存活时间,后面又设置是无效的。
‘x-dead-letter-exchange’ => ‘exchange_direct_ttl3’, // 延迟结束后指向交换机(死信收容交换机)
‘x-dead-letter-queue’ => ‘queue_ttl3’, // 延迟结束后指向队列(死信收容队列)
‘x-dead-letter-routing-key’ => ‘queue_ttl3’, // 设置routing-key
‘x-max-priority’=>‘10’ //声明优先级队列.表示队列应该支持的最大优先级。建议使用1到10之间.该参数会造成额外的CPU消耗。]);
ticket:null。不知道有什么用
队列与交换器绑定
/**
* 队列与交换器绑定
*/
public function queue_bind(
$queue,
$exchange,
$routing_key = '',
$nowait = false,
$arguments = array(),
$ticket = null
)
参数:
queue:队列名称
exchange:交换器名称
routing_key:用来绑定队列与交换器的路由键
nowait:是否等待回执信息
arguments:其他参数
ticket:null。未知
交换器与交换器绑定
/**
* 交换器与交换器绑定
*/
public function exchange_bind(
$destination,
$source,
$routing_key = '',
$nowait = false,
$arguments = array(),
$ticket = null
)
参数:
destination:目标交换器(某种程度上,可以看作一个队列)
source:源交换机(消息从source交换机到destination交换机)
routing_key:用来绑定队列和交换机的路由键
nowait:用来绑定队列和交换器的路由键
arguments:其他参数
ticket:null。未知
发送消息
/**
* 发送消息
*/
public function basic_publish(
$msg,
$exchange = '',
$routing_key = '',
$mandatory = false,
$immediate = false,
$ticket = null
)
参数:
mag:生产的消息(需要new成对象)
exchange:交换机的名称,指明消息需要发送到哪个交换机中,如果设置为空值,消息会被发送到默认的交换机中
routing_key:路由键,交换机根据路由键将消息存储到相应的队列中,简单队列时没有路由键,就是队列名
mandatory:设置为true时,交换机无法根据自身的类型和路由键找到符合条件的队列,那么RabbitMQ会调用Basic Return命令将消息返回给生产者,当设置为false时,上述情况下,消息会被丢弃,
immediate:3.0以上丢弃
ticket :null ,不知道什么玩意
消费消息
/**
* 消费消息
*/
public function basic_consume(
$queue = '',
$consumer_tag = '',
$no_local = false,
$no_ack = false,
$exclusive = false,
$nowait = false,
$callback = null,
$ticket = null,
$arguments = array()
)
参数:
queue:队列名称
consumer_tag:消费者标签,用来区分多个消费者
no_local:AMQP的标准,单RabbitMQ并没有做实现
no_ack:收到消息后,是否不需要回复确认即被认为被消费,设置为true表示自动应答,false,手动应答
exclusive:设置排他,排他消费者,即这个队列只能由一个消费者消费,适用于任务不允许进行并发处理的情况,
nowait:如果为true,表示不等待服务器回执信息,函数将返回null。但若排他开启的话,则必须需要等待结果的,如果两个一起开,就会报错
callback:callback函数,执行函数
ticket:null ,不知道是啥
arguments:一些额外的配置,数组
消息确认机制
分为两种,Confirm消息确认和ACK消息确认
Confirm消息确认
是生产者的消息确认,只要消息发送保存到队列中,队列就会有反馈,是成功,或者失败,
原理是,将通道设置为Confirm模式,之后所有在这个通通道上发送的消息,都会被指派上一个id从一开始,消息成功保存到匹配的队列后(如果持久化是保存到磁盘后),broker(队列)就会发送一个确认消息来给生产者,达到消息确认的目的
Confirm模式是支持异步的,意担发布一条消息,可以不用等待返回确认消息,直接继续执行下一条信息,当最终得到确认后,会通过回调的方式处理,有成功和失败两个方法,可以处理,
ACK(消息确认机制)
ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给MQ,MQ收到反馈后才会将此消息从队列中删除
如果在消费者处理消息的时候出现异常,这条消息可能并没有完成处理,为了保证数据不会丢失,产生了ACK消息确认,如果没有发送ACK的话,消息不会删除,会重新放入队列中,等待重新消费
默认形况下是开启了自动确认ACK的,获取数据后,自动确认,删除队列数据,
我们也可以改为手动确认,在处理完业务代码后,手动发送ACK确认,删除队列数据
如果开启手动,但没有确认的话,有可能会出现死循环,会造成内存泄漏,此条消息会一直重新发送
为了防止出现死循环可以在程序中进行异常捕捉,或者使用ACK确认机制,开启重试,重试次数,默认三次,如果重试过后还没有确认,就会删除此消息
MQ有prefetch_count的概念,一个消费者同时最多可以处理几个消息,有的版本,默认是3,如果一个消费者一直不确认消息,MQ会继续推送,如果达到上线3个,就不会再给此消费者推送消息
可以设置一个消费者只处理一个消息
$channel->basic_qos( null , 1 , null );