1、Routing之订阅模型-Direct
在广播模式(fanout)中,一条消息会被所有订阅的队列消费,但是某些场景我们希望不同的消息被不同的队列消费,这时候就需要Direct类型的Exchange。
在Direct模型下:
队列与交换机的绑定,不是任意绑定了,需要指定一个RoutingKey
消息的发送方向Exchange发送消息时,也必须指定消息的RoutingKey
Exchange不再把消息交给每一个绑定的队列,要根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,队列才会接受到消息
官网的流程图:
p:生产者
x:交换机(type为direct)
c1:消费者1,其所在队列指定了需要RoutingKey为error的消息
c2:消费者2,其所在队列指定了需要RoutingKey为info、error、warning的消息
上代码:
//生产者
package com.alex.blog.rabbitmq.direct;
import com.alex.blog.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
public class Provider {
public static void main(String[] args) throws IOException {
//获取连接
Connection connection = RabbitMQUtils.getConnection();
//创建通道
Channel channel = connection.createChannel();
//将通道声明指定的交换机,若不存在该交换机则会自己生成 参数1:交换机的名称 参数2:交换机的类型
channel.exchangeDeclare("logs_direct","direct");
//发送消息
channel.basicPublish("logs_direct","error",null,"hello,error".getBytes());
channel.basicPublish("logs_direct","info",null,"hello,info".getBytes());
channel.basicPublish("logs_direct","warning",null,"hello,warning".getBytes());
//关闭资源
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
消费者1:
package com.alex.blog.rabbitmq.direct;
import com.alex.blog.util.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Customer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs_direct","direct");
//交换机绑定队列(临时队列)
String queueName1 = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName1,"logs_direct","error");
//消费消息
channel.basicConsume(queueName1,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
}
}
消费者2:
package com.alex.blog.rabbitmq.direct;
import com.alex.blog.util.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Customer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs_direct","direct");
//交换机绑定队列(临时队列)
String queueName2 = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName2,"logs_direct","info");
//消费消息
channel.basicConsume(queueName2,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
});
}
}
消费者3:
package com.alex.blog.rabbitmq.direct;
import com.alex.blog.util.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Customer3 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs_direct","direct");
//交换机绑定队列(临时队列)
String queueName3 = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName3,"logs_direct","warning");
//消费消息
channel.basicConsume(queueName3,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者3:"+new String(body));
}
});
}
}
然后我们按顺序启动消费者1、2、3,再启动生产者,后台页面如下:
消费者1:
消费者2:
消费者3:
可以看到,消息的发送是根据交换机的Routingkey与消息的RoutingKey来对应发送的,当然一个消费者也可以绑定多个RoutingKey,此处我们修改消费者3来演示,让消费者3也可以接受到RoutingKey为info的消息,新增代码:
//绑定交换机和队列
channel.queueBind(queueName3,"logs_direct","info");
结果如下:
2、Routing之订阅模型-Topic
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列,只不过Topic类型的Exchange可以让队列在绑定RoutingKey的时候使用通配符!这种RoutingKey一般都是由一个或者多个单词组成,多个单词之间用“.”分割。
支持的通配符:
# 匹配一个或多个词,比如 goods.add,goods.add.sub都能匹配到。
* 匹配一个,比如goods.delete能匹配到,goods.delete.haha就匹配不到。
上代码:
//生产者
package com.alex.blog.rabbitmq.topic;
import com.alex.blog.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
public class Provider {
public static void main(String[] args) throws IOException {
//获取连接
Connection connection = RabbitMQUtils.getConnection();
//创建通道
Channel channel = connection.createChannel();
//将通道声明指定的交换机,若不存在该交换机则会自己生成 参数1:交换机的名称 参数2:交换机的类型
channel.exchangeDeclare("logs_topic","topic");
String routingKey = "user.save.info";
//发送消息
channel.basicPublish("logs_topic",routingKey,null,("动态路由发送消息,RoutingKey为:"+routingKey).getBytes());
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
这里,发送消息的RoutingKey为:user.save.info
消费者1、2、3、4其中的区别只有RoutingKey不同,所以只贴一份代码:
消费者:
package com.alex.blog.rabbitmq.topic;
import com.alex.blog.util.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Customer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs_topic","topic");
//交换机绑定队列(临时队列)
String queueName1 = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName1,"logs_topic","user.*.*");
//消费消息
channel.basicConsume(queueName1,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
}
}
消费者1的RoutingKey为:user.*.*
消费者2的RoutingKey为:*.save.*
消费者3的RoutingKey为:*.*.info
消费者4的RoutingKey为:#.info
结果为:
消费者1:
消费者2:
消费者3:
消费者4: