交换机
当消息提供者向RabbitMQ Server发布消息时,消息先过经过交换机,再根据不同的交换机以不同的规则,
按照RoutingKey规则绑定到队列,将消息放入队列中。
NoName Exchange (无名交换机):
如果我们使用""标识交换机,默认交换机。在无名交换机中RountingKey是可以指定队列名称的。
Direct Exchange(直连交换机):
将一个名为Q的消息队列与某个名为D的直连交换机通过值为R的路由键绑定在一起,当一个消息和路由键R发送
到直连交换机D上时,直连交换机D会把消息根据路由键R分发到Q队列,这种模式类似于一对一。
Fanout Exchange(扇型交换机):
当一个消息发送到扇形交换机F上时,则扇形交换机F会将消息分别发送给所有绑定到F上的消息队列。
扇形交换机将消息路由给绑定到自身的所有消息队列,也就是说路由键在扇形交换机里没有作用,
故消息队列绑定扇形交换机时,路由键可为空。这个模式类似于广播。
Topic Exchange(主题交换机):
主题交换机是一种发布/订阅的模式,结合了直连交换机与扇形交换机的特点,消息队列与主题交换机的绑定
也是通过路由键的。当一个Msg和路由键规则发送到一个主题交换机T时,T会根据路由键规则来筛选出符合
规则的绑定到自身消息队列的路由键(可能是1个,也可能是N个,也可能是0个),根据符合的路由键,
将消息发送到其对应的消息队列里。这个模式类似于多播,当消息的路由规则只匹配到一个路由键时,
此时主题交换机可以看作是直连交换机,当路由规则匹配了主题交换机上所有绑定的队列的路由键时,
此时主题交换机可以看作是扇形交换机。
消息模型
1.简单消息模型
2.工作队列模型
3.发布订阅模型
4.路由模型
5.主题模型
绑定
交换机和队列之间的联系,是通过RoutingKey路由规则绑定。
直接交换机
直接交换机一般使用路由模式绑定队列,指定路由规则选择性分发。
Consumer
//路由名称
private static final String EXCHANGE = "direct_routing";
public static void main(String[] args) {
//获取通道
Channel channel = RabbitMqUtils.getChannel();
try {
//声明交换机
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
//声明临时队列
String queue = channel.queueDeclare().getQueue();
//队列,交换机,路由Key
channel.queueBind(queue, EXCHANGE, "google");
//消费消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费异常");
};
System.out.println("ConsumerA准备就绪");
channel.basicConsume(queue, true, deliverCallback, cancelCallback);
} catch (IOException e) {
e.printStackTrace();
}
}
Provider
//路由名称
private static final String EXCHANGE = "direct_routing";
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
try {
for (int i = 0; i < 10; i++) {
//发送消息
channel.basicPublish(EXCHANGE, "google", null, ("msg" + i).getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} finally {
RabbitMqUtils.close(connection, channel);
}
}
扇出交换机
Provider
//交换机名称
public static final String EXCHANGE = "my.fanout";
public static void main(String[] args) {
Channel channel = RabbitMqUtils.getChannel();
try {
for (int i = 0; i < 100; i++) {
channel.basicPublish(EXCHANGE, "", null, ("message" + i).getBytes());
}
} catch (IOException e) {
e.printStackTrace();
}
}
ConsumerA
//交换机名称
public static final String EXCHANGE = "my.fanout";
public static void main(String[] args) {
//获取队列
Channel channel = RabbitMqUtils.getChannel();
try {
//声明交换机(交换机名称,交换机类型)
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
//声明一个临时队列(断开TCP连接即删除队列)
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queue, EXCHANGE, "");
System.out.println("ConsumerA准备就绪...");
//消费消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费错误...");
};
channel.basicConsume(queue, true, deliverCallback, cancelCallback);
} catch (IOException e) {
e.printStackTrace();
}
}
ConsumerB
//交换机名称
public static final String EXCHANGE = "my.fanout";
public static void main(String[] args) {
//获取队列
Channel channel = RabbitMqUtils.getChannel();
try {
//声明交换机(交换机名称,交换机类型)
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
//声明一个临时队列(断开TCP连接即删除队列)
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queue, EXCHANGE, "");
System.out.println("ConsumerB准备就绪...");
//消费消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费错误...");
};
channel.basicConsume(queue, true, deliverCallback, cancelCallback);
} catch (IOException e) {
e.printStackTrace();
}
}
主题交换机
Topic交换机的路由需要满足一定的条件,必须是一个单词列表。#表示匹配0个或多个单词,*表示一个单词。
*.user.* 中间带user的三个单词组合
user.# 多个单词,第一个是user
Provider
//交换机名称
private static final String EXCHANGE = "my.topic";
public static void main(String[] args) {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = null;
try {
channel = connection.createChannel();
for (int i = 0; i < 20; i++) {
channel.basicPublish(EXCHANGE, "v.e.rabbit", null, ("msg" + i).getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} finally {
RabbitMqUtils.close(connection, channel);
}
}
ConsumerA
//交换机名称
private static final String EXCHANGE = "my.topic";
public static void main(String[] args) throws IOException {
//获取通道
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
//创建队列
channel.queueDeclare("Q1", false, false, false, null);
//交换机绑定队列
channel.queueBind("Q1", EXCHANGE, "*.orange.*");
//消费消息
DeliverCallback deliverCallback = (consumerTag, message) ->
System.out.println("NodeA:" + new String(message.getBody()));
CancelCallback cancelCallback = consumerTag -> System.out.println("消息消费异常");
channel.basicConsume("Q1", true, deliverCallback, cancelCallback);
}
ConsumerB
//交换机名称
private static final String EXCHANGE = "my.topic";
public static void main(String[] args) throws IOException {
//获取通道
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
//创建队列
channel.queueDeclare("Q2", false, false, false, null);
//交换机绑定队列
channel.queueBind("Q2", EXCHANGE, "*.*.rabbit");
channel.queueBind("Q2", EXCHANGE, "lazy.#");
//消费消息
DeliverCallback deliverCallback = (consumerTag, message)
-> System.out.println("NodeB:" + new String(message.getBody()));
CancelCallback cancelCallback = consumerTag -> System.out.println("消息消费异常");
channel.basicConsume("Q2", true, deliverCallback, cancelCallback);
}