消息服务
- 所谓消息服务,就是两个进程之间,通过消息服务器传递消息(本质是TCP连接通信)
- 一个web服务即是一个进程,占用一个端口
- producer即生产消息的一方,consumer即处理消息的一方
- 使用消息服务,而不是直接调用对方的API,它的好处是:
- 双方各自无需知晓对方的存在,消息可以异步处理,因为消息服务器会在使用者离线的时候自动缓存消息; 即发消息这个跟我请求无关,我请求能更快返回,而不必等待消息发完再返回
- 如果Producer发送的消息频率高于Consumer的处理能力,消息可以积压在消息服务器,不至于压垮Consumer;
- 通过一个消息服务器,可以连接多个Producer和多个Consumer。
- 优点概况:异步、解耦、削峰
-
关系区别
JMS:消息服务接口
- ActiveMQ:jsm实现。一般讲
**Artemis**
时即指**ActiveMQ**
**Rocket**
基于jms
- ActiveMQ:jsm实现。一般讲
- AMQP:高级消息队列协议 接口比协议还要细分,一个接口可能兼容多种协议,但是协议都是1对1
- RabbitMQ:amqp实现
- Kafka:自己就是协议也是实现
- 还有一个是redis也有队列的功能,但是redis的队列如果没有被消费,会直接消失在内存中。而消息服务一般会一直存储于队列里面不会丢失,可能成为实行对象
|
| JMS | AMQP | | —- | —- | —- | | 定义 | Java API | 协议 | | 跨语言 | 否 | 是 | | 跨平台 | 否 | 是 | | 支持消息类型 | 提供两种消息模型: Peer-2-Peer和Pub/Sub | 提供了五种消息模型:direct exchange、fanout exchange、topic change、headers exchange、system exchange。本质来讲,后四种和JMS的pub/sub模型没有太大差别,仅是在路由机制上做了更详细的划分; | | 支持消息类型 | 支持多种消息类型:StreamMessage(Java原始值的数据流)、MapMessage(一套名称-值对)、TextMessage(一个字符串对象)、ObjectMessage(一个序列化的 Java对象)、BytesMessage(一个字节的数据流)、Message (只有消息头和属性) | byte[ ](二进制) |
吞吐量
activemq 万级 rabbitmq 万级 rocketmq
消息积压问题
如果消息一直没有被消费,那么他会一直积压在队列里面,直到队列装不下为止。
消息积压过多的处理:
AMQP是一种使用广泛的独立于语言的消息协议,它的全称是Advanced Message Queuing Protocol,即高级消息队列协议,它定义了一种二进制格式的消息流,任何编程语言都可以实现该协议。
- 实际上,Artemis也支持AMQP,但实际应用最广泛的AMQP服务器是使用Erlang编写的RabbitMQ。
- 跨平台是对比jms最大的优势,这样可以不同语言服务器之间通信
- RabbitMQ是AMQP的默认实现
RabbitMQ 采用类似 NIO(Non-block I/O)的做法,选择 TCP 连接复用,不仅可以减少了建立和销毁TCP连接的性能开销,同时也便于管理。
- NIO,也称非阻塞 I/O,包括三大核心部分:Channel(信道)、Buffer(缓存区)和 Selector(选择器)。NIO 基于 Channel 和 Buffer 进行操作,数据总是从信道读取数据到缓冲区,或者从缓冲区中写入信道中。Selector 用于监听多个信道的时间(比如链接打开,数据到达等)。因此,单线程可以监听多个数据的信道。
- 每个线程把持一个信道,所以信道复用了 Connection 的 TCP 连接。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的 Connection 可以有效地节省 TCP 连接资源。
消息模型
AMQP协议比JMS要复杂一点,它消息队列只有Queue,没有Topic,并且引入了Exchange的概念。
- 当Producer想要发送消息的时候,它将消息发送给Exchange,由Exchange将消息根据各种规则投递到一个或多个Queue
- 如果某个Exchange总是把消息发送到固定的Queue,那么这个消息通道就相当于JMS的Queue。如果某个Exchange把消息发送到多个Queue,那么这个消息通道就相当于JMS的Topic。
- 和JMS的Topic相比,Exchange的投递规则更灵活,比如一个“登录成功”的消息被投递到Queue-1和Queue-2,而“登录失败”的消息则被投递到Queue-3。这些路由规则称之为Binding(Binding即要投递的queue的名称),通常都在RabbitMQ的管理后台设置。
-
Exchange
Exchange即交换机,可以看成是一个路由器,根据Binding对消息进行转发给指定的队列
- Exchange默认就存在一个交换机
Default Exchange
,它是一个没有名字,类型为Direct
的交换机- 在 RabbitMQ 中,每个队列都要和一个交换机有绑定关系,每一个发给RabbitMq的消息不指名exchange时,默认都为发给了Direct
- Exchange的类型有4种,默认的类型为
Direct
Direct
:其类型的行为是 先匹配、再投送,即在绑定时设定一个routing_key
,设置了**routing_key**
的消息会走**routing_key**
一致的**exchange**
,才会被交换器投送到所绑定的队列(即Binding)中去。这种交换类型会采用轮询的方式进行投递消息Topic
:按规则转发消息(最灵活)。com.mq.rabbit.error
比如创建了一个路由键,对于cn.mq.rabbit.*``#.error``cn.mq.#
可以接收到消息,而cn.mq.*
无法接收* 用于匹配一个分段(用 . 分割)的内容,# 用于匹配 0 和多个字符。
Headers
设置 header attribute 参数类型的交换机。Fanout
转发消息到所有绑定队列。
fanout和topic 都是广播形式的,因此无法获取历史消息,而 direct 可以。
Topic
topic类型的交换机相当于是
direct
类型的升级版:它允许使用通配符Fanout
Fanout就是消息广播,交换器不考虑
**routing_key**
,而是将它收到的所有消息发送给所有绑定的消息队列Headers
是早期的一种交换器,工作机制与另外三种完全不一样,而且性能最低,现在基本不再使用虚拟主机
虚拟主机(Virtual Host,在 RabbitMQ 中称作 vhost)是 AMQP 规范中的一个基本概念,客户端在连接消息服务器时必须指定一个虚拟主机。
- 一个RabbitMq里可以有多个虚拟主机,rabbitmq中默认存在一个名字与值为
**/**
,用户名和密码都为**guest**
的虚拟主机 - 虚拟主机本质上就是一个缩小版的 RabbitMQ 服务器,其内部有自己的队列、交换器、绑定等。
- 不同虚拟主机之间不互通,如把a虚拟主机的exchange绑定到b虚拟主机的队列上不允许
- 如果不同服务之间消息希望互不干扰,我们可以给他们分别使用一个虚拟主机
- 一个RabbitMq里可以有多个虚拟主机,rabbitmq中默认存在一个名字与值为
- rabbitMq提供了虚拟主机的管理工具
rabbitmqclt
```shell创建虚拟主机 test
rabbitmqctl add_vhost test
删除虚拟主机 test
rabbitmqctl delete_vhost test
查询当前 RabbitMQ 中所有的虚拟主机
rabbitmqctl list_vhosts
<a name="HSHx1"></a>
## 实现AMQP
<a name="EcnD6"></a>
### 创建队列
- `RabbitMQ`不会自动创建不存在队列,我们一般在管理页面后台创建。
- 创建队列与创建Exchange除了在RabbitMq控制台里创建,也可以通过创建bean的方式创建。见[bean方式创建队列与交换机等.pdf](https://www.yuque.com/attachments/yuque/0/2022/pdf/2319994/1652505366249-2cc4179f-d6d7-4aae-82c8-2cb5401f8f7d.pdf?_lake_card=%7B%22src%22%3A%22https%3A%2F%2Fwww.yuque.com%2Fattachments%2Fyuque%2F0%2F2022%2Fpdf%2F2319994%2F1652505366249-2cc4179f-d6d7-4aae-82c8-2cb5401f8f7d.pdf%22%2C%22name%22%3A%22bean%E6%96%B9%E5%BC%8F%E5%88%9B%E5%BB%BA%E9%98%9F%E5%88%97%E4%B8%8E%E4%BA%A4%E6%8D%A2%E6%9C%BA%E7%AD%89.pdf%22%2C%22size%22%3A200179%2C%22type%22%3A%22application%2Fpdf%22%2C%22ext%22%3A%22pdf%22%2C%22source%22%3A%22%22%2C%22status%22%3A%22done%22%2C%22mode%22%3A%22title%22%2C%22download%22%3Atrue%2C%22taskId%22%3A%22u5c74e56b-a273-405d-8372-0b8a53c2315%22%2C%22taskType%22%3A%22upload%22%2C%22id%22%3A%22u46619dd9%22%2C%22card%22%3A%22file%22%7D)
- 在后台管理的`Queues`nav下,点击添加一个队列
- 类型采用默认的classic
- `**Durability**`**可配置为持久化(Durable)和非持久化(Transient),当Consumer不在线时,持久化的Queue会暂存消息,非持久化的Queue会丢弃消息。持久化的消息即便在服务重启后也会存在**
<a name="IzHX0"></a>
### 创建Exchange
- 这里只演示direct
- topic无非是bending可以使用通配符,发送消息时可以使用灵活的routing-key进行匹配
- <br />
- 在Exchanges中创建一个`Direct`类型的`Exchange`
- 然后点击我们添加的exchange的name,然后进入详情页添加`binding`,binding即添加exchange绑定的队列
- ![image.png](https://cdn.nlark.com/yuque/0/2022/png/2319994/1643255408961-304ee774-e371-4fba-a17c-9bb9e7272af1.png#clientId=uc9a4896b-0603-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=109&id=u6299fd36&margin=%5Bobject%20Object%5D&name=image.png&originHeight=268&originWidth=577&originalType=binary&ratio=1&rotation=0&showTitle=false&size=21501&status=done&style=none&taskId=u251f1319-3fc6-4044-9652-b7cd138a165&title=&width=234.66668701171875)![image.png](https://cdn.nlark.com/yuque/0/2022/png/2319994/1643255561744-aa287470-4470-4de6-9ec4-042ce2c289ac.png#clientId=uc9a4896b-0603-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=115&id=u3d722f62&margin=%5Bobject%20Object%5D&name=image.png&originHeight=351&originWidth=325&originalType=binary&ratio=1&rotation=0&showTitle=false&size=13261&status=done&style=none&taskId=u2547d9d0-1c9c-468f-a163-8cc24f9b9c9&title=&width=106.66667175292969)![image.png](https://cdn.nlark.com/yuque/0/2022/png/2319994/1643255836040-aa310e54-f971-48f3-82e5-ead89d532365.png#clientId=uc9a4896b-0603-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=102&id=u99633d8f&margin=%5Bobject%20Object%5D&name=image.png&originHeight=493&originWidth=487&originalType=binary&ratio=1&rotation=0&showTitle=false&size=15170&status=done&style=none&taskId=u8734ac28-2648-400a-b7b3-205f011056c&title=&width=100.66667175292969)
- 添加binding时还可以添加`Routing Key`,即消息发送规则,**不添加时为exchange会把消息发给每一个绑定的队列**,即类似`jms`的`topic` **添加后发消息时如果消息携带了**`**Routing Key**`**,则会走指定规则的exchange**
<a name="HuDsY"></a>
### springboot使用RabbitMQ
- **生产者和消费者不能使用**`@Server`等延伸注解,必须使用`@Component`!
<a name="IycKY"></a>
#### 依赖
- 引入`amqp`即可,rabbitmq是
```xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
貌似需要设置启用
rabbitmq
:@EnableRabbit
不过实际使用中不使用该注解也没啥影响spring: rabbitmq: host: localhost port: 5672 username: guest password: guest #下面为可选: listener: #用于设置手动消息确认 simple: acknowledge-mode: manual direct: acknowledge-mode: manual publisher-confirm-type: CORRELATED #确认消息是否到达exchange publisher-returns: true #确认消息是否到达queue # 日志不一定要配,因为spring默认的日志配置就够用了 logging: level: root: INFO xxx.yyy.zzz: DEBUG pattern: console: "${CONSOLE_LOG_PATTERN:\ %clr(${LOG_LEVEL_PATTERN:%5p}) \ %clr([%15.15t]){faint} \ %clr(%-40.40logger{39}){cyan} \ %clr(:){faint} %m%n\ ${LOG_EXCEPTION_CONVERSION_WORD:%wEx}}"
配置类
springboot中大部分都自动装配了默认配置,如
RabbitTemplate
- MessageConverter用于将Java对象转换为RabbitMQ的消息。默认情况下,Spring Boot使用SimpleMessageConverter,只能发送String和byte[]类型的消息,不太方便。使用Jackson2JsonMessageConverter,我们就可以发送JavaBean对象,由Spring Boot自动序列化为JSON并以文本消息传递。
- 貌似默认的就可以传递对象,不过对象需要实现
Serializable
```java import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- 貌似默认的就可以传递对象,不过对象需要实现
@Bean MessageConverter createMessageConverter() { return new Jackson2JsonMessageConverter(); }
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
/**无论消息是否推送成功,都强制调用回调函数 */
rabbitTemplate.setMandatory(true);
/** 当 Exchange 收到消息后,这里设置的回调方法会被触发执行 */
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
// 这里的 correlationData 来源于 convertAndSend 方法。
log.debug("消息唯一标识 : {}", correlationData);
if (ack) {
log.debug("消息已发送至 RabbitMQ(的Exchange),修改 id 为 {} 的状态。", correlationData.getId());
/** 这里是修改消息的消息状态,correlationDate可以看成消息对象 messageMapper.changeStatus(Long.parseLong(correlationData.getId())); } else { log.debug(“消息未能发送到 Exchange。失败原因 {}”, cause); // … } });
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/** 该方法在 Queue 无法收到消息时被触发执行。Queue 能收到消息则不会执行。 */
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("ReturnCallback 消息:{}", message);
log.info("ReturnCallback 回应码:{}", replyCode);
log.info("ReturnCallback 回应信息:{}", replyText);
log.info("ReturnCallback 交换机:{}", exchange);
log.info("ReturnCallback 路由键:{}", routingKey);
}
});
return rabbitTemplate;
}
<a name="o1cIW"></a>
#### 生产者
- `convertAndSend()`有很多重载方法,我们暂时只需知道一个方法`convertAndSend(exchange,routeKey,msg)`。实测中调用无需传exchange的convertAndSend()并没有发现任何效果
- `rabbitMQ`管理后台如果没有配置`routing key`则在java传`""/null`
- **直接指定一个Queue并投递消息也是可以的**,此时指定Routing Key为Queue的名称即可,因为RabbitMQ提供了一个default exchange用于根据Routing Key查找Queue并直接投递消息到指定的Queue。但是要实现一对多的投递就必须自己配置Exchange。
- 只能发给没有设置routing的队列,发给设置了的队列无效果
- **实际项目中,可以将将发送消息(和消息表)的功能由一个专门的微服务来处理,这个微服务是真正意义上的消息的生产者,它向 RabbitMQ 投递消息。**
```java
@Component
public class RabbitMsgService {
@Autowired
RabbitTemplate rabbitTemplate;
public void sendRegistrationMessage(User msg) {
rabbitTemplate.convertAndSend("exa", "", msg); //第一个为routing key,会走指定routing key的路由
}
public void sendLoginMessage(User msg) {
String routingKey = msg.success ? "" : "logined";
rabbitTemplate.convertAndSend("login", routingKey, msg);
}
}
消费者
- 消费消息有2种形式:
**Push**
推送形式:消息服务器推送消息给消费者,消费者配置好后就能自动接收,push形式接收使用@RabbitListener
即可**Pull**
拉消息形式:消费者需要自己主动获取消息,使用receiveAndConvert()
手动调用
- 直接添加
@RabbitListener(queues =队列名称)
即可 - exchange在消息无
routing key
时会把消息发给绑定的所有无routing key
的队列 消息有routing时exchange只发给绑定的有该routing的队列
@Component public class RabbitMQListener { static final String QUEUE_MAIL = "txt"; static final String QUEUE_SMS = "txt2"; static final String QUEUE_APP = "logontxt"; @RabbitListener(queues = QUEUE_MAIL) public void onRegistrationMessageFromMailQueue(User message) throws Exception { logger.error("queue {} received registration message: {}", QUEUE_MAIL, message); } .... }
rabbitTemplate.receiveAndConvert("队列名称");
RabbitMq实现消息持久化
实现持久化需要满足4个条件:
首先明确一个概念:rabbitmq里消息发送成功指的是消息成功到达exchange,消息推送成功指的是消息成功到达
- 设置开启消息发送成功和消息推送成功配置,并自定义
**rabbitmqTemplate**
(详见配置类)
```properties- 不开启消息确认时,发送失败仅仅是打印一条错误信息。
用于确认消息是否发送成功刀exchange
spring.rabbitmq.publisher-confirm-type=CORRELATED #默认为none,CORRELATED表示支持异步回调方式确认信息状态
确认消息已发送到队列(Queue)
spring.rabbitmq.publisher-returns=true
<a name="ctGyZ"></a>
## 消费者确认与拒绝消息
- **默认情况下,RabbitMQ 启用的是消费端自动(auto)回复。即,当消费端收到消息,就会给 RabbitMQ Broker 作出回复,表示已收到。**
- **回复行为有三种:**
- **auto:自动回复**
- **none:不回复**
- **manual:消费者手动回复;**在消费者回复前,该消息在消费端未回复前在 RabbitMQ Brocker(理解为消息服务器) 上一直处于 **Unacked** (理解为未读)状态。如果消费者始终都不回复该消息,那么直到消费者与 RabbitMQ 断开连接之后,这条消息才会重新变为 `Ready `状态。
- 使用手动确认,需要进行配置,消息类也需要进行修改
```shell
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
@Component
public class Consumer2 {
@RabbitListener(queues = "queue-demo-1")
public void process(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
//方法里channel进行确认消息或者是拒绝消息
//basicAck的第一个参数为唯一标识id,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel(Channel 是比 Connection 更小的单位),RabbitMQ 通过 Channel 向消费者投递消息时,都会为该消息分配一个唯一性标识:delivery tag 。同一个 Channel 中的消息的 delivery tag 都是唯一且单调递增的。
//第二个参数为是否批量确认,当参数为 false 时,意味着确认单条消息,RabbitMQ 仅从消息队列中删除该消息;当参数为 true 时,意味着批量确认,RabbitMQ 会从消息队列中删除编号小于等于该消息的所有信息
//basicAck()的作用是确认收到消息
channel.basicAck(tag,false);
//或者不进行消息确认,直接拒绝消息。第一个参数也是唯一标识id,第二个参数表示是否需要重新发送该消息(貌似不会自动实现重发,仅仅是一个标识,提示的作用)注意只有仍旧在rabibitmq里的消息才能重发,否则只能靠自定义的消息表
channel.basicReject(tag, false);
//channel.basicNack(); 批量拒绝
}
}
Java配置消息模型
@Bean
public Exchange exchange() {
// 构造函数及其重载共有这几种参数
//name 字符串值,exchange 的名称。
//durable 布尔值,表示该 exchage 是否持久化。它决定了当 RabbitMQ 重启后,你是否还能「看到」重启前创建的 exchange 。
//autoDelete 布尔值,表示当该 exchange 没「人」(queue)用时,是否会被自动删除。
即,实现逻辑上的临时交换机。项目启动时连接到 RabbitMQ ,创建交换机;项目停止时断开连接RabbitMQ 自动删除交换机。
//durable 和 autoDelete 默认分别是 true 和 false
return new TopicExchange("test-exchange-1", true, false);
}
@Bean
public Queue queue() {
//name 字符串值,queue 的名称。
//durable 布尔值,表示该 queue 是否持久化。
它决定了当 RabbitMQ 重启后,你是否还能「看到」重启前创建的 queue 。
另外,需要注意的是,queue 的持久化不等于其中的消息也会被持久化。
//exclusive 布尔值,表示该 queue 是否排它式使用。排它式使用意味着仅声明他的连接可见/可用,其它连接不可见/不可用。
//autoDelete 布尔值,表示当该 queue 没「人」(connection)用时,是否会被自动删除。
即,实现逻辑上的临时队列。项目启动时连接到 RabbitMQ ,创建队列;项目停止时断开连接,RabbitMQ 自动删除队列。
//durable、exclusive 和 autoDelete 默认为 true 、 false 和 false
return new Queue("test-queue-1", true, false, false);
}
@Bean
public Binding binding(Exchange exchange, Queue queue) {
return BindingBuilder
.bind(queue).to(exchange).with("*.orange.*");
}
延迟队列
- 延时队列又被称为死信队列。普通队列的处理流程为
生产者-exchange-队列-消费者
延时队列为生产者-原exchange-延时队列(持有一段时间)-另外一个交换机,即死信交换机-队列-消费者
- 延时队列的用途:
- 替代定时任务
- 异步执行(自动执行)某些代码
- 用于一些倒计时任务(如30分钟未支付的订单自动取消)
延时队列即普通队列增加三个特殊的设置
Dead letter exchange:x-dead-letter-exchange
:指定下一个交换机(死信交换机)Dead letter routing key:x-dead-letter-routing-key
延迟队列传消息给下一个交换机所使用的routing-key
Message TTL:x-message-ttl
延迟队列延时的时长,单位为毫秒配置延时队列
@Configuration @EnableRabbit public class RabbitMQConfig { public static final String first_exchange_name = "first-exchange"; public static final String second_exchange_name = "second-exchange"; public static final String first_routing_key = "first-routing-key"; public static final String second_routing_key = "second-routing-key"; public static final String first_binding = "first-binding"; public static final String second_binding = "second-binding"; public static final String dead_queue_name = "dead-queue"; public static final String real_queue_name = "real-queue"; @Bean("dead-queue") public Queue deadQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", second_exchange_name); // 指定时期后消息投递给哪个交换器。 args.put("x-dead-letter-routing-key", second_routing_key); // 指定到期后投递消息时以哪个路由键进行投递。 args.put("x-message-ttl", 5000); // 指定到期时间。5 秒 return new Queue(dead_queue_name, true, false, false, args); } @Bean("real-queue") public Queue realQueue() { return new Queue(real_queue_name, true, false, false); } /* 问题一:发出的消息凭什么会到死信队列。*/ @Bean(first_exchange_name) public DirectExchange firstExchange() { return new DirectExchange(first_exchange_name, true, false); } @Bean(first_binding) public Binding firstBinding(@Qualifier(dead_queue_name) Queue queue, @Qualifier(first_exchange_name) Exchange exchange) { return BindingBuilder.bind(queue) .to(exchange) .with(first_routing_key) .noargs(); } /* 问题二:延迟队列凭什么会把消息再转给 real-queue 。*/ @Bean(second_exchange_name) public DirectExchange secondExchange() { return new DirectExchange(second_exchange_name, true, false); } @Bean(second_binding) public Binding secondBiding(@Qualifier(real_queue_name) Queue queue, @Qualifier(second_exchange_name) Exchange exchange) { return BindingBuilder.bind(queue) .to(exchange) .with(second_routing_key) .noargs(); } }
消息一致性的方案
一致性即要确保消息到达了消费者,并确保消息被消费掉。我们只需要确保到达queue即可
- 建立一张通用的消息表,极简情况下,下面这2个字段可以没有
**retry_count**
用于配合定时任务实现消息重发,表示重新发送次数。- 可以设置默认值,重新发送成功就改状态,一直失败就一直重试,每一次重试就-1,直到重试次数耗尽
**version**
用于实现乐观锁。如果存在多个生产者,可以配合乐观锁避免消息发送多次
- 生产者做完自身业务后要向消息表添加一条记录,表示有一条待发送消息。(注意添加消息记录与生产者自身业务要处于一个事务中
@Transacational
) - 配置好回调的配置与
rabitmqTemplate
- 如果消息发送成功,则在回调里修改消息表该条记录的状态。
retry_count
字段配合定义任务,不断从消息表中取出drop table if exists `message`; create table `message` ( `id` BIGINT AUTO_INCREMENT, `exchange` VARCHAR(128) NOT NULL, `routing_key` VARCHAR(128) NOT NULL, `message_content` VARCHAR(4096) NOT NULL, `status` VARCHAR(128) NOT NULL, #可能是消息是否发送成功的状态 默认状态可有可无,有成功与失败2种即可。个人觉得还可以设置一种消息未被消费状态 `retry_count` INT NOT NULL DEFAULT 3, `version` BIGINT NOT NULL default 0, PRIMARY KEY (`id`) ) ENGINE = InnoDB;
消息自动过期
未被消费的消息会堆积在 RabbitMQ 中。因此需要在创建队列,或发送消息时指定过期时间,以便于让 RabbitMQ 将这些在规定时间内未能消费的消息移除。
@Bean public Queue queue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 10 * 60 * 1000); // 10 分钟的过期时间 return new Queue("user.register", true, false, false, arguments); }
-
JMS
即
Java Message Service
是JavaEE的消息服务接口。JMS主要有两个版本:1.1和2.0。2.0和1.1相比,主要是简化了收发消息的代码。JMS是一组接口定义,如果我们要使用JMS,还需要选择一个具体的JMS产品,即JMS服务器
一种是Queue
- Queue是一种一对一的通道(也可以多个consumre接一个queue),如果Consumer离线无法处理消息时,Queue会把消息存起来,等Consumer再次连接的时候发给它。
- 设定了持久化机制的Queue不会丢失消息。如果有多个Consumer接入同一个Queue,那么它们等效于以集群方式处理消息(即作为一个整体得到一份完整消息),即每个Consumer得到的消息加起来才是一份完整消息
- 另外一种是Topic
- Topic则是一种一对多通道。一个Producer发出的消息,会被多个Consumer同时收到,即每个Consumer都会收到一份完整的消息流。
- 离线时:取决于消息服务器对Topic类型消息的持久化机制。如果消息服务器不存储Topic消息,那么离线的Consumer会丢失部分离线时期的消息,如果消息服务器存储了Topic消息,那么离线的Consumer可以收到自上次离线时刻开始后产生的所有消息。
- JMS规范通过Consumer指定一个持久化订阅可以在上线后收取所有离线期间的消息,如果指定的是非持久化订阅,那么离线期间的消息会全部丢失。
- 如果一个Topic的消息全部都持久化了,并且只有一个Consumer,那么它和Queue其实是一样的。实际上,很多消息服务器内部都只有Topic类型的消息架构,Queue可以通过Topic“模拟”出来。
多个Producer也可以写入同一个Queue或者Topic,此时消息服务器内部会自动排序确保消息总是有序的。
实现原理
Producer和Consumer通常是通过TCP连接消息服务器,在编写JMS程序时,又会遇到ConnectionFactory、Connection、Session等概念,其实这和JDBC连接是类似的:
- ConnectionFactory:代表一个到消息服务器的连接池,类似JDBC的DataSource;
- Connection:代表一个到消息服务器的连接,类似JDBC的Connection;
- Session:代表一个经过认证后的连接会话;
- Message:代表一个消息对象。
JMS的消息类型支持以下几种:
以下消息以文本消息举例(即json)举例
生产者和消费者不能使用
@Server
等延伸注解,必须使用@Component
!依赖
```xml
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>5.2.19.RELEASE</version> </dependency> <dependency> <groupId>javax.jms</groupId> <artifactId>javax.jms-api</artifactId> <version>2.0.1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>artemis-jms-client</artifactId> <version>2.13.0</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-handler-proxy</artifactId> <version>4.1.45.Final</version> </dependency>
<a name="Qkte2"></a>
### 配置类
- 配置类加上`@ComponentScan` `@EnableJms` _// 启用JMS_
- _以防万一,实测时没有这两个注解也没啥_
- url是`tcp://localhost:61616`的形式,前面的`tcp://`不要丢了,没有一般表示http连接
```java
@Value("${jms.uri}") //自己随意创建的外部配置
String uri;
@Value("${jms.username}")
String username;
@Value("${jms.password}")
String password;
//消息服务连接
@Bean
ConnectionFactory createJMSConnectionFactory() {
return new ActiveMQJMSConnectionFactory(uri, username, password);
}
//jms模板是spring提供的工具类,用于简化发消息
@Bean
JmsTemplate createJmsTemplate(@Autowired ConnectionFactory connectionFactory) {
return new JmsTemplate(connectionFactory);
}
//侦听器
@Bean("jmsListenerContainerFactory")
DefaultJmsListenerContainerFactory createJmsListenerContainerFactory(@Autowired ConnectionFactory connectionFactory) {
var factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
生产者
send的第一个参数为要发送的队列名称,名称是任意的,下面这种形式就更加规范些
- Artemis消息服务器默认配置下会自动创建Queue,因此不必手动创建一个名为jms/queue/mail的Queue,但不是所有的消息服务器都会自动创建Queue,生产环境的消息服务器通常会关闭自动创建功能,需要手动创建Queue
- 即我们如果发送给一个不存在的队列,那么消息服务器就会自动创建该队列
@Component public class MessagingService { @Autowired ObjectMapper objectMapper; @Autowired JmsTemplate jmsTemplate; public void sendMailMessage(User msg) throws JsonProcessingException { String text = objectMapper.writeValueAsString(msg); jmsTemplate.send("jms/queue/mail", new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(text); } }); } }
侦听器(消费者)
侦听器创建后会自动侦听指定消息队列,并执行方法体的消息处理
- 每一个进入队列的消息被侦听到就会执行
@JmsListener
方法
- 每一个进入队列的消息被侦听到就会执行
处理消息的核心代码是编写一个Bean,并在处理方法上标注@JmsListener
destination
消息来源的消息队列名称concurrency
表示可以最多同时并发处理?个消息,如果是5-10表示并发处理的线程可以在5~10之间调整。 ```java @Component @Slf4j public class MailMessageListener { @Autowired ObjectMapper objectMapper; //jackson用的@JmsListener(destination = “jms/queue/mail”, concurrency = “10”) public void onMailMessageReceived(Message message) throws throws JsonProcessingException, JMSException { if (message instanceof TextMessage) {
String text = ((TextMessage) message).getText(); //json字符串 User user = objectMapper.readValue(text, User.class);
} else {
log.error("unable to process non-text message!");
} } }
<a name="ZI1WW"></a>
### 测试
- 直接将消息传给生产者即可,我们可以编写2个项目,一个只有消费者,一个只有生产者
- 生产者项目先关闭,执行消费者,再执行生产者,可以看到**能读取到离线消息,且消息未丢失**
- 生产者启动,再执行消费者,可以看到生产者**能实时读取到消息**
- 短时间给生产者传大量消息,能看到消费者的**消息顺序是依次读取的,是有序的**
<a name="AFRmx"></a>
## springboot使用JMS
- springboot实现jms和spring几乎一模一样,不同的地方就是:外部配置文件是spring规定的;自动完成了配置类;只需引入一个springboot的artemis即可
<a name="lr3Eg"></a>
### 依赖
```xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
</dependency>
配置
spring.artemis.mode=native #指定连接外部Artemis服务器,而不是启动嵌入式服务:
spring.artemis.host=127.0.0.1
spring.artemis.port=61616
spring.artemis.user=admin
spring.artemis.password=123456
Kafka
- Kafka也是一个消息服务器,它的特点一是快,二是有巨大的吞吐量,Kafka没有实现任何标准的消息接口,它自己提供的API就是Kafka的接口。
- Kafka本身是Scala编写的,运行在JVM之上。Producer和Consumer都通过Kafka的客户端使用网络来与之通信。从逻辑上讲,Kafka设计非常简单,它只有一种类似JMS的Topic的消息通道
- 但是Kafka可以进行分区。Kafka的一个Topic可以有一个至多个Partition,并且可以分布到多台机器上
- Kafka只保证在一个Partition内部,消息是有序的,但是,存在多个Partition的情况下(即多个group),Producer发送的3个消息会依次发送到Partition-1、Partition-2和Partition-3,Consumer从3个Partition接收的消息并不一定是Producer发送的顺序,因此,多个Partition只能保证接收消息大概率按发送时间有序,并不能保证完全按Producer发送的顺序。这一点在使用Kafka作为消息服务器时要特别注意,对发送顺序有严格要求的Topic只能有一个Partition。
- Kafka的另一个特点是消息发送和接收都尽量使用批处理,一次处理几十甚至上百条消息,比一次一条效率要高很多。
- kafka消息的持久性:消息保存多久取决于服务器的配置,可以按照时间删除(默认3天),也可以按照文件大小删除,因此,只要Consumer在离线期内的消息还没有被删除,再次上线仍然可以接收到完整的消息流。
- Consumer在离线期内的消息还没有被删除,再次上线仍然可以接收到完整的消息流。这一功能实际上是客户端自己实现的,客户端会存储它接收到的最后一个消息的offsetId,再次上线后按上次的offsetId查询。offsetId是Kafka标识某个Partion的每一条消息的递增整数,客户端通常将它存储在ZooKeeper中。
Springboot使用Kafka
依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置
除了必须指定外,相关的配置项均为调优选项,即表示大概范围的配置。例如,表示一次最多抓取100条消息
spring: kafka: bootstrap-servers: localhost:9092 consumer: //这下面是消费者的配置 auto-offset-reset: latest max-poll-records: 100 max-partition-fetch-bytes: 1000000 #bootstrap-servers: 生产者消费者也可以配置不同的服务器
生产者
发送消息需指定Topic名称,消息正文。header名称为type,值为消息的类型,即class全名
Spring Boot自动为我们创建一个KafkaTemplate用于发送消息。注意到这是一个泛型类,而默认配置总是使用String作为Kafka消息的类型,所以注入KafkaTemplate
即可 @Component class KafkaMessagingService { @Autowired ObjectMapper objectMapper; //jackson序列化用 @Autowired KafkaTemplate<String, String> kafkaTemplate; public void sendRegistrationMessage(User msg) throws IOException { send("topic_registration", msg); } public void sendLoginMessage(User msg) throws IOException { send("topic_login", msg); } private void send(String topic, Object msg) throws IOException { ProducerRecord<String, String> pr = new ProducerRecord<>(topic, objectMapper.writeValueAsString(msg)); pr.headers().add("type", msg.getClass().getName().getBytes(StandardCharsets.UTF_8)); kafkaTemplate.send(pr); } }
消费者
一个topic可以有多个group(即分区,即上面提到的Partition),不同的group每个收到的都是完整的一份消息。如果存在多个相同group的监听器方法,那么这些方法作为一个整体收到一份消息
- @Payload表示传入的是消息正文,使用@Header可传入消息的指定Header
- Kafka默认允许自动创建Topic,创建Topic时默认的分区数量是2,可以通过server.properties修改默认分区数量。
- 在生产环境中通常会关闭自动创建功能,Topic需要由运维人员先创建好。和RabbitMQ相比,Kafka并不提供网页版管理后台,管理Topic需要使用命令行,比较繁琐,只有云服务商通常会提供更友好的管理后台。 ```java import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component;
@Component public class KafkaTopicMessageListener { private final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
ObjectMapper objectMapper;
@KafkaListener(topics = "topic_registration", groupId = "group1")
public void onRegistrationMessage(@Payload String message, @Header("type") String type) throws Exception {
User msg = objectMapper.readValue(message, getType(type));
logger.info("received registration message: {}", msg);
}
@KafkaListener(topics = "topic_login", groupId = "group1")
public void onLoginMessage(@Payload String message, @Header("type") String type) throws Exception {
User msg = objectMapper.readValue(message, getType(type));
logger.error("received login message: {}", msg);
}
@KafkaListener(topics = "topic_login", groupId = "group2")
public void processLoginMessage(@Payload String message, @Header("type") String type) throws Exception {
User msg = objectMapper.readValue(message, getType(type));
logger.error("process login message: {}", msg);
}
@SuppressWarnings("unchecked")
private static <T> Class<T> getType(String type) {
// TODO: use cache:
try {
return (Class<T>) Class.forName(type);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
}
<a name="N9yFE"></a>
# 各消息服务器的安装
<a name="tLt0L"></a>
## ActiveMQ
- 版本:`artemis 2.19.0`
<a name="kL4St"></a>
### 安装-配置-启动
- 官网下载时,有2种:`ActiveMQ Classic`或者`ActiveMQ Artemis`
- 前者基于`jms1.1`
- 后者支持2.0,1.x貌似也可以。
- 还使用基于Netty的异步IO,大大提升了性能。
- 此外,Artemis不仅提供了JMS接口,它还提供了AMQP接口,STOMP接口和物联网使用的MQTT接口。选择Artemis,相当于一鱼四吃。
- **Artemis一些较新的版本对于jdk有要求,如**`**2.20.0**`**要求**`**11+**`**。Classic不清楚**
- Artemis把程序和数据完全分离,所以我们解压后需要创建个数据文件夹
- 可以理解为解压安装包得到的是个总的程序目录,数据文件夹对应一个用户的服务的文件夹
---
1. 解压后配置环境变量`ARTEMIS_HOME`,指向安装目录。 然后path添加`%ARTEMIS_HOME%\bin`
1. 环境变量必须按照上面配置,artemis会寻找名为`ARTEMIS_HOME`的环境变量
2. 创建数据文件夹:cd到bin下,执行`artemis create 空文件夹路径` 先创建好一个空文件夹
1. 之后自动创建连接用户,依次输入用户名,密码,是否允许匿名登陆。 用户名我们设置为admin,匿名登录为N
3. 切换到数据文件夹的bin,执行`artemis run`启动Artemis服务
1. 启动成功后,Artemis提示可以通过URL`http://localhost:8161/console`访问管理后台。注意_不要关闭命令行窗口_
1. `61616`端口是artemis服务的端口(tcp连接端口) 8161是后台管理端口
<a name="gyOkK"></a>
## RabbitMQ
- 版本:`3.9.13`
<a name="fPytH"></a>
### 安装
1. RabbitMQ基于Erlang开发,所以需要首先拥有Erlang环境。下载Erlang的`exe`包安装即可
1. **要以管理员身份运行安装包!**
2. 之后再直接运行`rabbitmq-server-{version}.exe`包即可
2. 访问管理后台[http://localhost:15672](http://localhost:15672/),成功则没问题,没成功可能是没有启动rabbitmq服务,windwos下一般是默认安装后就自启动的
1. RabbitMQ后台管理的默认用户名和口令均为guest
1. 如果无法访问,但是服务已经启动,依次执行如下列操作
```powershell
进入rabbitmq的安装路径下,找到sbin目录,依次输入如下指令解决问题:
rabbitmqctl start_app
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl stop
如果上述命令出现:ERLANG_HOME not set correctly. 那么执行下面命令
set ERLANG_HOME=Erlang安装路径
Kafka
- 下载二进制文件(tgz) 版本:
2.12.3-3.1.0
- 解压后得到的bin里面还有一个windows,
bin
里是linux的程序,bin/windows
里是windows程序
- 解压后得到的bin里面还有一个windows,
kafka需要zookeeper环境,zookeeper是一个布式应用程序协调服务,为分布式应用提供一致性服务。
最好将kakfa解压至d盘,放c盘可能会报
命令过长
```powershell打开第一个窗口,启动zookeeper
cd kafka安装目录 bin\windows\zookeeper-server-start.bat config\zookeeper.properties
打开第二个窗口,启动kafka
cd kafka安装目录 bin\windows\kafka-server-start.bat config\server.properties
退出的话先后在kafka和zookeeper的窗口按:ctrl+c y退出
```