同步通讯和异步通讯特点
- 同步通讯:就像舔狗聊天,需要实时响应。
- 异步通讯:就像海王聊天,不需要马上回复。
同步通讯
特点
总结
- 同步调用的优点:
- 时效性较强,可以立即得到结果
同步调用的问题:
吞吐量提升:无需等待订阅者处理完成,响应更快速
- 故障隔离:服务没有直接调用,不存在级联失败问题
- 调用间没有阻塞,不会造成无效的资源占用
- 耦合度极低,每个服务都可以灵活插拔,可替换
流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
补充
在大并发(高并发)的环境下,如何优化服务
架构复杂了,业务没有明显的流程线,不好管理
-
MQ的技术选型
技术对比
MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
比较常见的MQ实现: ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
常见MQ的对比
| | RabbitMQ | ActiveMQ | RocketMQ | Kafka | | —- | —- | —- | —- | —- | | 公司/社区 | Rabbit | Apache | 阿里 | Apache | | 开发语言 | Erlang | Java | Java | Scala&Java | | 协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 | | 可用性 | 高 | 一般 | 高 | 高 | | 单机吞吐量 | 一般 | 差 | 高 | 非常高 | | 消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 | | 消息可靠性 | 高 | 一般 | 高 | 一般 |
各自的优点
- 追求可用性:Kafka、 RocketMQ 、RabbitMQ
- 追求可靠性:RabbitMQ、RocketMQ
- 追求吞吐能力:RocketMQ、Kafka
- 追求消息低延迟:RabbitMQ、Kafka
快速入门RabbitMQ
MQ的基本结构
RabbitMQ中的一些角色:
- publisher:生产者
- consumer:消费者
- exchange个:交换机,负责消息路由
- queue:队列,存储消息
virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
安装RabbitMQ
在线拉取
docker pull rabbitmq:3.8-management
创建MQ容器
docker run \ -e RABBITMQ_DEFAULT_USER=itcast \ -e RABBITMQ_DEFAULT_PASS=itcast142 \ -v mq-plugin:/plugins \ --name mq \ --hostname mq \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3.8-management
RabbitMQ服务结构体系
入门案例
导入Demo工程
包括三部分:
publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
-
publisher实现
思路:
- 建立连接
- 创建Channel
- 声明队列
- 发送消息
- 关闭连接和channel
- 代码实现: ```java package cn.itcast.mq.helloworld;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Test;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost(“192.168.94.128”); factory.setPort(5672); factory.setVirtualHost(“/“); factory.setUsername(“itcast”); factory.setPassword(“itcast142”); // 1.2.建立连接 Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
<a name="rW6JZ"></a>
#### consumer实现
- **代码思路:**
- **建立连接**
- **创建Channel**
- **声明队列**
- **订阅消息**
- **代码实现:**
```java
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.94.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("itcast142");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
总结
- 基本消息队列的消息发送流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 利用channel向队列发送消息
基本消息队列的消息接收流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定
AMQP和SpringAMQP说明和关系
AMQP
SpringAMQP
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp关系
AMQP实际上就是一个协议,制定了一个规则
- RabbitMQ对AMQP进行了实现
SpringAMQP实际上就是对AMQP进行了二次包装,简化开发过程,使得更好的进行使用
RabbitMQ队列模式
SpringAMQP如何实现各个队列模式
Basic Queue 简单队列模型
在父工程mq-demo中引入依赖
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
消息发送
首先配置MQ地址,在publisher服务的application.yml中添加配置:
spring: rabbitmq: host: 192.168.94.128 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: itcast142 # 密码
然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送: ```java package cn.itcast.mq.helloworld;
import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;
/**
- @author: Mr.W
- @date: 2022/6/1
@description: 测试 */ @SpringBootTest public class SpringAmqpTest {
/ Rabbit设计模式(模板) / @Autowired private RabbitTemplate rabbitTemplate;
@Test public void testBasicQueue() {
/** * 参数: * 1.队列名称 * 2.消息 */ rabbitTemplate.convertAndSend("simple.queue","hello, rabbitmq!");
} }

<a name="E7vxx"></a>
#### 消息接收
- **首先配置MQ地址,在consumer服务的application.yml中添加配置:**
```java
spring:
rabbitmq:
host: 192.168.94.128 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast # 用户名
password: itcast142 # 密码
- 然后在consumer服务的cn.itcast.mq.listener包中新建一个类SpringRabbitListener,代码如下: ```java package cn.itcast.mq.listener;
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
/**
- @author: Mr.W
- @date: 2022/6/1
- @description:
*/
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = {“simple.queue”})
public void testBasicQueue(String msg) {
} }System.out.println("spring 消费者接收到消息:【" + msg + "】");

<a name="Nt0rB"></a>
### WorkQueue
**Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。**<br /><br />**处理消息的方式:并不是同时接收,而是一人一次**
<a name="i7YIi"></a>
#### 测试案例说明
**在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue在consumer服务中定义两个消息监听者,都监听simple.queue队列消费者1每秒处理50条消息,消费者2每秒处理10条消息**
<a name="nEWX5"></a>
#### 消息发送
- **这次我们循环发送,模拟大量消息堆积现象。在publisher服务中的SpringAmqpTest类中添加一个测试方法:**
```java
package cn.itcast.mq.helloworld;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @author: Mr.W
* @date: 2022/6/1
* @description: 测试
*/
@SpringBootTest
public class SpringAmqpTest {
/* Rabbit设计模式(模板) */
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testBasicQueue() {
/**
* 参数:
* 1.队列名称
* 2.消息
*/
rabbitTemplate.convertAndSend("simple.queue","hello, rabbitmq!");
}
/**
* WorkQueue的测试
*/
@Test
public void testBasicQueue1() throws InterruptedException {
for (int i = 1; i <= 50; i++) {
String msg = "工作队列模式,消息发送_";
/**
* 参数:
* 1.队列名称
* 2.消息
*/
rabbitTemplate.convertAndSend("simple.queue",msg+i);
//保证每秒50条
Thread.sleep(20);
}
}
}
消息接收
- 要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法: ```java package cn.itcast.mq.listener;
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
/**
- @author: Mr.W
- @date: 2022/6/1
@description: */ @Slf4j @Component public class SpringRabbitListener { @RabbitListener(queues = {“simple.queue”}) public void testBasicQueue(String msg) {
System.out.println("amqp消费者-1--接收到消息:【" + msg + "】");
}
@RabbitListener(queues = {“simple.queue”}) public void testBasicQueue1(String msg) {
System.err.println("amqp消费者-2--接收到消息:【" + msg + "】");
} }
<a name="Nig3e"></a>
#### 测试结果
```java
amqp消费者-1--接收到消息:【工作队列模式,消息发送_1】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_2】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_3】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_5】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_4】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_7】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_9】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_11】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_6】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_13】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_15】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_8】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_17】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_19】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_10】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_21】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_23】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_25】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_12】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_27】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_29】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_14】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_31】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_33】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_16】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_35】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_37】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_39】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_18】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_41】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_43】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_20】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_45】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_47】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_22】
amqp消费者-1--接收到消息:【工作队列模式,消息发送_49】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_24】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_26】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_28】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_30】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_32】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_34】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_36】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_38】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_40】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_42】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_44】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_46】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_48】
amqp消费者-2--接收到消息:【工作队列模式,消息发送_50】
- 可以看到消费者1很快完成了自己的25条消息。消费者2却在缓慢的处理自己的25条消息。
也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。
能者多劳
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
总结
Work模型的使用:
多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
-
发布/订阅模式
Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- Consumer:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息。
注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力
Fanout
在广播模式下,消息发送流程是这样的:
- 可以有多个队列
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 交换机把消息发送给绑定过的所有队列
-
实现步骤
创建一个交换机 itcast.fanout,类型是Fanout
- 创建两个队列fanout.queue1和fanout.queue2,绑定到交换机itcast.fanout
具体实现
声明队列和交换机
- Spring提供了一个接口Exchange,来表示所有不同类型的交换机:
- 在consumer中创建一个类,声明队列和交换机: ```java package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
/**
- @author: Mr.W
- @date: 2022/6/1
@description: */ @Configuration public class FanoutConfig {
/ 第一个队列 / @Bean public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
/ Fanout交换机 / @Bean public FanoutExchange fanoutExchange() {
return new FanoutExchange("itcast.fanout");
}
/**
- 当前的bean都没有写名称,这时候就会以方法名作为该对象名称,所以这里自动找对应名称的对象
- @param fanoutQueue1
- @param fanoutExchange
@return */ @Bean public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); }
/ 第二个队列 / @Bean public Queue fanoutQueue2() { return new Queue(“fanout.queue1”); }
/**
* 当前的bean都没有写名称,这时候就会以方法名作为该对象名称,所以这里自动找对应名称的对象
* @param fanoutQueue2
* @param fanoutExchange
* @return
*/
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
<a name="sGSjd"></a>
#### 消息发送
- **在publisher服务的SpringAmqpTest类中添加测试方法:**
```java
package cn.itcast.mq.helloworld;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @author: Mr.W
* @date: 2022/6/1
* @description: 测试
*/
@SpringBootTest
public class SpringAmqpTest {
/**
* Fanout的测试
*/
@Test
public void testFanoutQueue() throws InterruptedException {
/**
* 参数:
* 1.队列名称
* 2.路由key:由于是fanout模式。所以要给""
* 3.消息
*/
rabbitTemplate.convertAndSend("itcast.fanout","","fanout 模式消息发送!");
}
}
消息接收
- 在consumer服务的SpringRabbitListener中添加两个方法,作为消费者: ```java package cn.itcast.mq.listener;
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
/**
- @author: Mr.W
- @date: 2022/6/1
- @description:
*/
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = {“fanout.queue1”})
public void testFanoutQueue1(String msg) throws InterruptedException {
} @RabbitListener(queues = {“fanout.queue2”}) public void testFanoutQueue2(String msg) throws InterruptedException {System.err.println("Fanout消费者-1--接收到消息:【" + msg + "】");
} }System.err.println("Fanout消费者-2--接收到消息:【" + msg + "】");

<a name="mCdK3"></a>
#### 总结
- **交换机的作用是什么?**
- **接收publisher发送的消息**
- **将消息按照规则路由到与之绑定的队列**
- **不能缓存消息,路由失败,消息丢失**
- **FanoutExchange的会将消息路由到每个绑定的队列**
- **声明队列、交换机、绑定关系的Bean是什么?**
- **Queue**
- **FanoutExchange**
- **Binding**
<a name="WcSkS"></a>
### Direct
**Direct可以对不同的消息被不同的队列消费,不同于Fanout一条消息给所有地方队列都消费**<br />
- **队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)**
- **消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。**
- **Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息**
<a name="ZsC49"></a>
#### 实践案例需求
- **利用@RabbitListener声明Exchange、Queue、RoutingKey**
- **在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2**
- **在publisher中编写测试方法,向itcast. direct发送消息**

<a name="J3pKC"></a>
#### 交换机和队列声明的模式
<a name="SkZUD"></a>
##### 配置类声明
```java
package cn.itcast.mq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: Mr.W
* @date: 2022/6/1
* @description:
*/
@Configuration
public class DirectConfig {
/* 第一个队列 */
@Bean
public Queue directQueue1() {
return new Queue("direct.queue1");
}
/* Fanout交换机 */
@Bean
public DirectExchange directExchange() {
return new DirectExchange("itcast.direct");
}
/**
* 当前的bean都没有写名称,这时候就会以方法名作为该对象名称,所以这里自动找对应名称的对象
* @param directQueue1
* @param directExchange
* @return
*/
@Bean
public Binding directBinding1(Queue directQueue1,DirectExchange directExchange) {
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
}
注解声明
package cn.itcast.mq.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author: Mr.W
* @date: 2022/6/1
* @description:
*/
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(
bindings = {@QueueBinding(
value = @Queue("direct.queue1"),
exchange = @Exchange(value = "itcast.direct",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
)}
)
public void testDirectQueue1(String msg) throws InterruptedException {
System.err.println("Fanout消费者-1--接收到消息:【" + msg + "】");
}
@RabbitListener(
bindings = {@QueueBinding(
value = @Queue("direct.queue2"),
exchange = @Exchange(value = "itcast.direct",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
)}
)
public void testDirectQueue2(String msg) throws InterruptedException {
System.err.println("Fanout消费者-2--接收到消息:【" + msg + "】");
}
}
消息发送
- 在publisher服务的SpringAmqpTest类中添加测试方法 ```java package cn.itcast.mq.helloworld;
import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;
/**
- @author: Mr.W
- @date: 2022/6/1
- @description: 测试
/
@SpringBootTest
public class SpringAmqpTest {
/*
- Direct的测试
/
@Test
public void testDirectQueue() throws InterruptedException {
/*
- 参数:
- 1.队列名称
- 2.路由key:由于是fanout模式。所以要给””
- 3.消息 */ rabbitTemplate.convertAndSend(“itcast.direct”,”blue”,”direct 模式消息发送!”); } }
- Direct的测试
/
@Test
public void testDirectQueue() throws InterruptedException {
/*
<a name="TYy8u"></a>
#### 总结
- **描述下Direct交换机与Fanout交换机的差异?**
- **Fanout交换机将消息路由给每一个与之绑定的队列**
- **Direct交换机根据RoutingKey判断路由给哪个队列**
- **如果多个队列具有相同的RoutingKey,则与Fanout功能类似**
- **基于@RabbitListener注解声明队列和交换机有哪些常见注解?**
- **@Queue**
- **@Exchange**
<a name="HPUG3"></a>
### Topic
<a name="mFxMj"></a>
#### 说明
**Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!**<br />**Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: <br />**<br />**#:匹配一个或多个词**<br />***:匹配不多不少恰好1个词**<br />**举例:**<br />**item.#:能够匹配item.spu.insert 或者 item.spu**<br />**item.*:只能匹配item.spu**<br />
- **Queue1:绑定的是china.# ,因此凡是以 china.开头的routing key 都会被匹配到。包括china.news和china.weather**
- **Queue2:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配。包括china.news和japan.news**
<a name="rfg23"></a>
#### 案例需求与思路

- **并利用@RabbitListener声明Exchange、Queue、RoutingKey**
- **在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2**
- **在publisher中编写测试方法,向itcast. topic发送消息**
<a name="jbM1c"></a>
#### 案例实现
配置
```java
package cn.itcast.mq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: Mr.W
* @date: 2022/6/1
* @description:
*/
@Configuration
public class TopicConfig {
/* 第一个队列 */
@Bean
public Queue topicQueue1() {
return new Queue("topic.queue1");
}
/* Fanout交换机 */
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("itcast.topic");
}
/**
* 当前的bean都没有写名称,这时候就会以方法名作为该对象名称,所以这里自动找对应名称的对象
* @param topicQueue1
* @param topicExchange
* @return
*/
@Bean
public Binding topicBinding1(Queue topicQueue1,TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue1).to(topicExchange).with("china.#");
}
/* 第二个队列 */
@Bean
public Queue topicQueue2() {
return new Queue("topic.queue2");
}
/**
* 当前的bean都没有写名称,这时候就会以方法名作为该对象名称,所以这里自动找对应名称的对象
* @param topicQueue2
* @param topicExchange
* @return
*/
@Bean
public Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue2).to(topicExchange).with("#.news");
}
}
消息发送
- 在publisher服务的SpringAmqpTest类中添加测试方法 ```java package cn.itcast.mq.helloworld;
import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;
/**
- @author: Mr.W
- @date: 2022/6/1
- @description: 测试
/
@SpringBootTest
public class SpringAmqpTest {
/*
- topic的测试
/
@Test
public void testTopicQueue() throws InterruptedException {
/*
- 参数:
- 1.队列名称
- 2.路由key
- 3.消息 */ rabbitTemplate.convertAndSend(“itcast.direct”,”china.news”,”direct 模式消息发送!”); } }
- topic的测试
/
@Test
public void testTopicQueue() throws InterruptedException {
/*
<a name="YwD2f"></a>
##### 消息接收
- **在consumer服务的SpringRabbitListener中添加方法**
```java
package cn.itcast.mq.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author: Mr.W
* @date: 2022/6/1
* @description:
*/
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = {"topic.queue1"})
public void testTopicQueue1(String msg) throws InterruptedException {
System.err.println("Topic消费者-1--接收到消息:【" + msg + "】");
}
@RabbitListener(queues = {"topic.queue2"})
public void testTopicQueue2(String msg) throws InterruptedException {
System.err.println("Topic消费者-2--接收到消息:【" + msg + "】");
}
}
总结
描述下Direct交换机与Topic交换机的差异?
- Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
- Topic交换机与队列绑定时的bindingKey可以指定通配符
- #:代表0个或多个词
-
消息转换器
SpringAMQP默认对Java转换
我们修改消息发送的代码,发送一个Map对象:
@Test public void testSendMap() throws InterruptedException { // 准备消息 Map<String,Object> msg = new HashMap<>(); msg.put("name", "Jack"); msg.put("age", 21); // 发送消息 rabbitTemplate.convertAndSend("simple.queue","", msg); }
如何改变消息转换器——配置JSON转换器
导入依赖
在publisher和consumer两个服务中都引入依赖:
<dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
配置消息转换器
在启动类中添加一个Bean即可:
@Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }