总
Direct exchange(直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列的
Fanout exchange(扇型交换机)将消息路由给绑定到它身上的所有队列
Topic exchange(主题交换机)队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路 由给一个或多个绑定队列
Headers exchange(头交换机)类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。 通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
1、扇形交换机
思路解读(重点理解):
(1)一个生产者,多个消费者
(2)每一个消费者都有自己的一个队列
(3)生产者没有直接发消息到队列中,而是发送到交换机
(4)每个消费者的队列都绑定到交换机上
(5)消息通过交换机到达每个消费者的队列
该模式就是FanoutExchange(扇型交换机)将消息路由给绑定到它身上的所有队列
以用户发邮件案例讲解
注意:交换机没有存储消息功能,如果消息发送到没有绑定消费队列的交换机,消息则丢失。
/ 生产者 交换机类型 producerfanout类型
public class FanoutProducer {
// 交换机名称
private static final String DESTINATION_NAME = "my_fanout_estination";
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 建立mq连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.生产者绑定交换机 参数1 交换机名称 参数2 交换机类型
channel.exchangeDeclare(DESTINATION_NAME, "fanout",true);
// 4.创建消息
String msg = "my_fanout_destination_msg";
System.out.println("生产者投递消息:" + msg);
// 5.发送消息 my_fanout_estination routingKey
channel.basicPublish(DESTINATION_NAME, "", null, msg.getBytes());
// 6.关闭通道 和连接
channel.close();
connection.close();
}
}
// 邮件消费者
public class ConsumerEmailFanout {
private static final String EMAIL_QUEUE = "email_queue_fanout";
// 交换机名称
private static final String DESTINATION_NAME = "my_fanout_estination";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("邮件消费启动");
// 1. 建立mq连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.消费声明队列
channel.queueDeclare(EMAIL_QUEUE, false, false, false, null);
// 4.消费者队列绑定交换机
channel.queueBind(EMAIL_QUEUE, DESTINATION_NAME, "");
// 5.消费监听消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("邮件消费者获取生产消息:" + msg);
}
};
channel.basicConsume(EMAIL_QUEUE, true, defaultConsumer);
}
}
// 邮件消费者
public class ConsumerSmsFanout {
private static final String SMS_QUEUE = "sms_queue_fanout";
// 交换机名称
private static final String DESTINATION_NAME = "my_fanout_estination";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("短信消费者启动");
// 1. 建立mq连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.消费声明队列
channel.queueDeclare(SMS_QUEUE, false, false, false, null);
// 4.消费者队列绑定交换机
channel.queueBind(SMS_QUEUE, DESTINATION_NAME, "");
// 5.消费监听消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("短信消费者获取生产消息:" + msg);
}
};
channel.basicConsume(SMS_QUEUE, true, defaultConsumer);
}
}
2、直接交换机direct (routingKey)
交换机根据routingKey进行消息与消费者队列的匹配
// 生产者 交换机类型 producerfanout类型
public class RoutingProducer {
// 交换机名称
private static final String DESTINATION_NAME = "my_routing_estination";
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 建立mq连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.生产者绑定交换机 参数1 交换机名称 参数2 交换机类型
channel.exchangeDeclare(DESTINATION_NAME, "direct",true);
String routingKey = "yushengjun";
// 4.创建消息
String msg = "my_routing_destination_msg" + routingKey;
System.out.println("生产者投递消息:" + msg);
// 5.发送消息 my_fanout_estination routingKey
channel.basicPublish(DESTINATION_NAME, routingKey, null, msg.getBytes());
// 6.关闭通道 和连接
channel.close();
connection.close();
}
}
// 邮件消费者
public class ConsumerSmsRouting {
private static final String SMS_QUEUE = "sms_queue_routing";
// 交换机名称
private static final String DESTINATION_NAME = "my_routing_estination";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("短信消费者启动");
// 1. 建立mq连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.消费声明队列
channel.queueDeclare(SMS_QUEUE, false, false, false, null);
// 4.消费者队列绑定交换机 绑定路由件 路由键 绑定邮件和短信
channel.queueBind(SMS_QUEUE, DESTINATION_NAME, "email");
channel.queueBind(SMS_QUEUE, DESTINATION_NAME, "sms");
// 5.消费监听消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("短信消费者获取生产消息:" + msg);
}
};
channel.basicConsume(SMS_QUEUE, true, defaultConsumer);
}
}
// 邮件消费者
public class ConsumerEmailRouting {
private static final String SMS_QUEUE = "eamil_queue_routing";
// 交换机名称
private static final String DESTINATION_NAME = "my_routing_estination";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("邮件消费者启动");
// 1. 建立mq连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.消费声明队列
channel.queueDeclare(SMS_QUEUE, false, false, false, null);
// 4.消费者队列绑定交换机 绑定路由件
channel.queueBind(SMS_QUEUE, DESTINATION_NAME, "email");
// 5.消费监听消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("短信消费者获取生产消息:" + msg);
}
};
channel.basicConsume(SMS_QUEUE, true, defaultConsumer);
}
}
3、主题交换机
只是将生产者交换机有direct替换成topic,消费者叫routingKey替换成带或带#
符号#:匹配一个或者多个词lazy.#可以匹配lazy.irs或者lazy.irs.cor
**符号:只能匹配一个词lazy.可以匹配lazy.irs或者lazy.cor*