同步和异步通讯:
同步的优点:时效性高,可以立即获得操作结果
缺点:
1、耦合性高:当需要增加或修改功能时需要重新更改代码
2、性能低:必须等待所有的任务执行完毕,等待的过程占用了额外资源
3、可能出现级联失败的问题:当一个模块出问题的时候,整个请求都失败了
4、额外的资源消耗:必须等待每个服务的返回结果在返回响应,不能及时释放请求占用的资源。
异步的好处:
耦合性低:只需要维护某一个服务,服务插拔式,较灵活
性能较高:请求及时响应,无需等待每个服务的执行结果
调用无阻塞,不会造成过多的资源占用
流量削峰:只需要将消息交给broker,由broker去处理,每个服务只需要按自己的能力处理相应的请求。
缺点:
架构相对复杂多了;
强依赖于broker的安全性、可靠性和性能
异步通讯:
1、MQ是实现异步通讯的典型技术
2、为什么使用异步通信?异步、解偶、提高吞吐量、削峰
MQ:
就是消息队列,也就是事件中的broker。
为什么用rabbitMQ?
1、rabbitMQ是用Erlang语言写的,天生就是为了处理高并发的
2、rabbitMQ的延迟很低,是微秒级别的
3、他的消息可靠性高,一般不会出现消息丢失。
为什么用rocketMQ?
1、rocketMQ是用java写的,可以通过源码去实现一些私人订制
2、rocketMQ扛过了双十一,单机吞吐量高,性能强
3、消息可靠性高,一般不会出现消息丢失。
RabbitMQ实现springAmqp协议:
AMQP:是应用程序之间传递消息的标准,该协议与语言和平台无关,更符合微服务的独立性要求。
springAMQP:底层是springrabbit的默认实现。基于AMQP定义的一套API规范,用于提供消息的发送和接收模板。
提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
-
简单队列模型:
```java //yml配置文件 spring: rabbitmq: host: 192.168.150.101 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码
//发送端 @Autowired private RabbitTemplate rabbitTemplate;
@Test public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
//接收端 @Component public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
//也可以多几个接收端
。。。
}
发布/订阅<br />就是多了一个exchange(交换机)角色
- Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- Consumer:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息。

<a name="wrBBK"></a>
### Fanout
```java
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
//发送
@Test
public void testFanoutExchange() {
// 队列名称
String exchangeName = "itcast.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
//接收端
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
Direct
基于注解
//发送
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "itcast.direct";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
//接收
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
Topic
基于注解
//发送
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "itcast.topic";
// 消息
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
//接收
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
三者对比:
fanout需要配置类
direct和topic都是基于注解,注解中类型默认direct,但是topic支持通配符,更加便捷。