fanout模型
fanout,也称为广播
在广播模式下,消息发送流程如下:
- 可以有多个消费者
- 每个消费者都有自己的队列(quene)
- 每个队列都要绑定到交换机(exchange)
- 生产者发送的消息,只能发送到交换机,由交换机决定发给哪个队列,生产者无法决定
- 交换机把消息发送给绑定过的所有队列
- 队列的消费者都能拿到信息,实现一条消息被多个消费者消费
直接上代码测试:
//生产者
package com.alex.blog.rabbitmq.fanout;
import com.alex.blog.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
public class Provider {
public static void main(String[] args) throws IOException {
//获取连接
Connection connection = RabbitMQUtils.getConnection();
//创建通道
Channel channel = connection.createChannel();
//将通道声明指定的交换机,若不存在该交换机则会自己生成 参数1:交换机的名称 参数2:交换机的类型
channel.exchangeDeclare("logs","fanout");
//发送消息
channel.basicPublish("logs","",null,"hello,fanout".getBytes());
//关闭资源
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
然后我们启动一下生产者,到管理页面查看是否生成了名为logs的交换机:
可以看到此处已经生产了一个名为logs、类型为fanout的交换机
下面编写消费者,此处创立三个消费者,代码内容一样,贴一个就好:
//消费者
package com.alex.blog.rabbitmq.fanout;
import com.alex.blog.util.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Customer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs","fanout");
//交换机绑定队列(临时队列)
String queueName1 = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName1,"logs","");
//消费消息
channel.basicConsume(queueName1,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
}
}
然后我们先运行消费者1、2、3,再运行生产者(需注意,我们要先启动声明消费者,再启动声明生产者。否则先启动生产者的话,exchange接到消息后发现没有队列对它感兴趣,就任性的把消息给丢掉了,很容易理解,广播模式就好比你平时听收音机,只有你听的时候,才能收到收音机里的信息,你不听的时候,收不到信息),当然我我们创建的队列也都是临时队列,当消费者消费完以后,队列就自己删除了:
到后台查看:
消费者1:
消费者2:
消费者3:
由此我们可以看到,广播模式的重点在于,所有的消费者可以消费同一条消息