消息服务

  • 所谓消息服务,就是两个进程之间,通过消息服务器传递消息(本质是TCP连接通信)
    • 一个web服务即是一个进程,占用一个端口
    • image.png
      • producer即生产消息的一方,consumer即处理消息的一方
  • 使用消息服务,而不是直接调用对方的API,它的好处是:
    • 双方各自无需知晓对方的存在,消息可以异步处理,因为消息服务器会在使用者离线的时候自动缓存消息; 即发消息这个跟我请求无关,我请求能更快返回,而不必等待消息发完再返回
    • 如果Producer发送的消息频率高于Consumer的处理能力,消息可以积压在消息服务器,不至于压垮Consumer;
    • 通过一个消息服务器,可以连接多个Producer和多个Consumer。
  • 优点概况:异步、解耦、削峰
  • 常用场景:消息总线,消息处理,如短信消息,日志消息

    关系区别

  • JMS:消息服务接口

    • ActiveMQ:jsm实现。一般讲**Artemis**时即指**ActiveMQ**
    • **Rocket**基于jms
  • AMQP:高级消息队列协议 接口比协议还要细分,一个接口可能兼容多种协议,但是协议都是1对1
    • RabbitMQ:amqp实现
  • Kafka:自己就是协议也是实现
  • 还有一个是redis也有队列的功能,但是redis的队列如果没有被消费,会直接消失在内存中。而消息服务一般会一直存储于队列里面不会丢失,可能成为实行对象 |
    | JMS | AMQP | | —- | —- | —- | | 定义 | Java API | 协议 | | 跨语言 | 否 | 是 | | 跨平台 | 否 | 是 | | 支持消息类型 | 提供两种消息模型: Peer-2-Peer和Pub/Sub | 提供了五种消息模型:direct exchange、fanout exchange、topic change、headers exchange、system exchange。本质来讲,后四种和JMS的pub/sub模型没有太大差别,仅是在路由机制上做了更详细的划分; | | 支持消息类型 | 支持多种消息类型:StreamMessage(Java原始值的数据流)、MapMessage(一套名称-值对)、TextMessage(一个字符串对象)、ObjectMessage(一个序列化的 Java对象)、BytesMessage(一个字节的数据流)、Message (只有消息头和属性) | byte[ ](二进制) |

吞吐量

  • activemq 万级 rabbitmq 万级 rocketmq

    消息积压问题

  • 如果消息一直没有被消费,那么他会一直积压在队列里面,直到队列装不下为止。

  • 消息积压过多的处理:

    • 如果消息有过期丢弃的政策,那就需要我们手动查询过期数据并处理。
    • 如果不会自动丢弃,我们需要立刻停止相关的消费者。然后创建一个临时的消费者该exchange仅仅是为了转发给更多的队列,然后在多个机器上部署消费者处理这些更多的队列,尽可能将全部消息处理完

      RabbitMQ

  • AMQP是一种使用广泛的独立于语言的消息协议,它的全称是Advanced Message Queuing Protocol,即高级消息队列协议,它定义了一种二进制格式的消息流,任何编程语言都可以实现该协议。

    • 实际上,Artemis也支持AMQP,但实际应用最广泛的AMQP服务器是使用Erlang编写的RabbitMQ。
    • 跨平台是对比jms最大的优势,这样可以不同语言服务器之间通信
  • RabbitMQ是AMQP的默认实现
  • RabbitMQ 采用类似 NIO(Non-block I/O)的做法,选择 TCP 连接复用,不仅可以减少了建立和销毁TCP连接的性能开销,同时也便于管理。

    • NIO,也称非阻塞 I/O,包括三大核心部分:Channel(信道)、Buffer(缓存区)和 Selector(选择器)。NIO 基于 Channel 和 Buffer 进行操作,数据总是从信道读取数据到缓冲区,或者从缓冲区中写入信道中。Selector 用于监听多个信道的时间(比如链接打开,数据到达等)。因此,单线程可以监听多个数据的信道。
    • 每个线程把持一个信道,所以信道复用了 Connection 的 TCP 连接。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的 Connection 可以有效地节省 TCP 连接资源。

      消息模型

  • AMQP协议比JMS要复杂一点,它消息队列只有Queue,没有Topic,并且引入了Exchange的概念。

    • 当Producer想要发送消息的时候,它将消息发送给Exchange,由Exchange将消息根据各种规则投递到一个或多个Queue
    • 如果某个Exchange总是把消息发送到固定的Queue,那么这个消息通道就相当于JMS的Queue。如果某个Exchange把消息发送到多个Queue,那么这个消息通道就相当于JMS的Topic。
    • 和JMS的Topic相比,Exchange的投递规则更灵活,比如一个“登录成功”的消息被投递到Queue-1和Queue-2,而“登录失败”的消息则被投递到Queue-3。这些路由规则称之为Binding(Binding即要投递的queue的名称),通常都在RabbitMQ的管理后台设置。
  • image.png

    Exchange

  • Exchange即交换机,可以看成是一个路由器,根据Binding对消息进行转发给指定的队列

  • Exchange默认就存在一个交换机Default Exchange,它是一个没有名字,类型为Direct的交换机
    • 在 RabbitMQ 中,每个队列都要和一个交换机有绑定关系,每一个发给RabbitMq的消息不指名exchange时,默认都为发给了Direct
  • Exchange的类型有4种,默认的类型为Direct
    • Direct:其类型的行为是 先匹配、再投送,即在绑定时设定一个 routing_key设置了**routing_key**的消息会走**routing_key**一致的**exchange**,才会被交换器投送到所绑定的队列(即Binding)中去。这种交换类型会采用轮询的方式进行投递消息
    • Topic:按规则转发消息(最灵活)。com.mq.rabbit.error比如创建了一个路由键,对于cn.mq.rabbit.*``#.error``cn.mq.#可以接收到消息,而cn.mq.*无法接收
      • * 用于匹配一个分段(用 . 分割)的内容,# 用于匹配 0 和多个字符。
    • Headers 设置 header attribute 参数类型的交换机。
    • Fanout转发消息到所有绑定队列。
  • fanout和topic 都是广播形式的,因此无法获取历史消息,而 direct 可以。

    Topic

  • topic类型的交换机相当于是direct类型的升级版:它允许使用通配符

    Fanout

  • Fanout就是消息广播,交换器不考虑**routing_key**,而是将它收到的所有消息发送给所有绑定的消息队列

    • **Fanout**发送消息的速度是最快的

      Headers

  • Headers是早期的一种交换器,工作机制与另外三种完全不一样,而且性能最低,现在基本不再使用

    虚拟主机

  • 虚拟主机(Virtual Host,在 RabbitMQ 中称作 vhost)是 AMQP 规范中的一个基本概念,客户端在连接消息服务器时必须指定一个虚拟主机。

    • 一个RabbitMq里可以有多个虚拟主机,rabbitmq中默认存在一个名字与值为**/**,用户名和密码都为**guest**的虚拟主机
    • 虚拟主机本质上就是一个缩小版的 RabbitMQ 服务器,其内部有自己的队列、交换器、绑定等。
    • 不同虚拟主机之间不互通,如把a虚拟主机的exchange绑定到b虚拟主机的队列上不允许
      • 如果不同服务之间消息希望互不干扰,我们可以给他们分别使用一个虚拟主机
  • rabbitMq提供了虚拟主机的管理工具rabbitmqclt ```shell

    创建虚拟主机 test

    rabbitmqctl add_vhost test

删除虚拟主机 test

rabbitmqctl delete_vhost test

查询当前 RabbitMQ 中所有的虚拟主机

rabbitmqctl list_vhosts

  1. <a name="HSHx1"></a>
  2. ## 实现AMQP
  3. <a name="EcnD6"></a>
  4. ### 创建队列
  5. - `RabbitMQ`不会自动创建不存在队列,我们一般在管理页面后台创建。
  6. - 创建队列与创建Exchange除了在RabbitMq控制台里创建,也可以通过创建bean的方式创建。见[bean方式创建队列与交换机等.pdf](https://www.yuque.com/attachments/yuque/0/2022/pdf/2319994/1652505366249-2cc4179f-d6d7-4aae-82c8-2cb5401f8f7d.pdf?_lake_card=%7B%22src%22%3A%22https%3A%2F%2Fwww.yuque.com%2Fattachments%2Fyuque%2F0%2F2022%2Fpdf%2F2319994%2F1652505366249-2cc4179f-d6d7-4aae-82c8-2cb5401f8f7d.pdf%22%2C%22name%22%3A%22bean%E6%96%B9%E5%BC%8F%E5%88%9B%E5%BB%BA%E9%98%9F%E5%88%97%E4%B8%8E%E4%BA%A4%E6%8D%A2%E6%9C%BA%E7%AD%89.pdf%22%2C%22size%22%3A200179%2C%22type%22%3A%22application%2Fpdf%22%2C%22ext%22%3A%22pdf%22%2C%22source%22%3A%22%22%2C%22status%22%3A%22done%22%2C%22mode%22%3A%22title%22%2C%22download%22%3Atrue%2C%22taskId%22%3A%22u5c74e56b-a273-405d-8372-0b8a53c2315%22%2C%22taskType%22%3A%22upload%22%2C%22id%22%3A%22u46619dd9%22%2C%22card%22%3A%22file%22%7D)
  7. - 在后台管理的`Queues`nav下,点击添加一个队列
  8. - 类型采用默认的classic
  9. - `**Durability**`**可配置为持久化(Durable)和非持久化(Transient),当Consumer不在线时,持久化的Queue会暂存消息,非持久化的Queue会丢弃消息。持久化的消息即便在服务重启后也会存在**
  10. <a name="IzHX0"></a>
  11. ### 创建Exchange
  12. - 这里只演示direct
  13. - topic无非是bending可以使用通配符,发送消息时可以使用灵活的routing-key进行匹配
  14. - <br />
  15. - 在Exchanges中创建一个`Direct`类型的`Exchange`
  16. - 然后点击我们添加的exchange的name,然后进入详情页添加`binding`,binding即添加exchange绑定的队列
  17. - ![image.png](https://cdn.nlark.com/yuque/0/2022/png/2319994/1643255408961-304ee774-e371-4fba-a17c-9bb9e7272af1.png#clientId=uc9a4896b-0603-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=109&id=u6299fd36&margin=%5Bobject%20Object%5D&name=image.png&originHeight=268&originWidth=577&originalType=binary&ratio=1&rotation=0&showTitle=false&size=21501&status=done&style=none&taskId=u251f1319-3fc6-4044-9652-b7cd138a165&title=&width=234.66668701171875)![image.png](https://cdn.nlark.com/yuque/0/2022/png/2319994/1643255561744-aa287470-4470-4de6-9ec4-042ce2c289ac.png#clientId=uc9a4896b-0603-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=115&id=u3d722f62&margin=%5Bobject%20Object%5D&name=image.png&originHeight=351&originWidth=325&originalType=binary&ratio=1&rotation=0&showTitle=false&size=13261&status=done&style=none&taskId=u2547d9d0-1c9c-468f-a163-8cc24f9b9c9&title=&width=106.66667175292969)![image.png](https://cdn.nlark.com/yuque/0/2022/png/2319994/1643255836040-aa310e54-f971-48f3-82e5-ead89d532365.png#clientId=uc9a4896b-0603-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=102&id=u99633d8f&margin=%5Bobject%20Object%5D&name=image.png&originHeight=493&originWidth=487&originalType=binary&ratio=1&rotation=0&showTitle=false&size=15170&status=done&style=none&taskId=u8734ac28-2648-400a-b7b3-205f011056c&title=&width=100.66667175292969)
  18. - 添加binding时还可以添加`Routing Key`,即消息发送规则,**不添加时为exchange会把消息发给每一个绑定的队列**,即类似`jms`的`topic` **添加后发消息时如果消息携带了**`**Routing Key**`**,则会走指定规则的exchange**
  19. <a name="HuDsY"></a>
  20. ### springboot使用RabbitMQ
  21. - **生产者和消费者不能使用**`@Server`等延伸注解,必须使用`@Component`!
  22. <a name="IycKY"></a>
  23. #### 依赖
  24. - 引入`amqp`即可,rabbitmq是
  25. ```xml
  26. <dependency>
  27. <groupId>org.springframework.boot</groupId>
  28. <artifactId>spring-boot-starter-amqp</artifactId>
  29. </dependency>

配置

  • 貌似需要设置启用rabbitmq@EnableRabbit 不过实际使用中不使用该注解也没啥影响

    spring:
    rabbitmq:
      host: localhost
      port: 5672
      username: guest
      password: guest
      #下面为可选:
       listener:  #用于设置手动消息确认
          simple:
            acknowledge-mode: manual
          direct:
            acknowledge-mode: manual
      publisher-confirm-type: CORRELATED     #确认消息是否到达exchange
      publisher-returns:  true                                 #确认消息是否到达queue
    # 日志不一定要配,因为spring默认的日志配置就够用了
    logging:
    level:
      root: INFO
      xxx.yyy.zzz: DEBUG
    pattern:
      console: "${CONSOLE_LOG_PATTERN:\
    %clr(${LOG_LEVEL_PATTERN:%5p}) \
    %clr([%15.15t]){faint} \
    %clr(%-40.40logger{39}){cyan} \
    %clr(:){faint} %m%n\
    ${LOG_EXCEPTION_CONVERSION_WORD:%wEx}}"
    

    配置类

  • springboot中大部分都自动装配了默认配置,如RabbitTemplate

  • MessageConverter用于将Java对象转换为RabbitMQ的消息。默认情况下,Spring Boot使用SimpleMessageConverter,只能发送String和byte[]类型的消息,不太方便。使用Jackson2JsonMessageConverter,我们就可以发送JavaBean对象,由Spring Boot自动序列化为JSON并以文本消息传递。
    • 貌似默认的就可以传递对象,不过对象需要实现Serializable ```java import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.rabbit.connection.ConnectionFactory;

@Bean MessageConverter createMessageConverter() { return new Jackson2JsonMessageConverter(); }

@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);

    /**无论消息是否推送成功,都强制调用回调函数 */
    rabbitTemplate.setMandatory(true);

    /**  当 Exchange 收到消息后,这里设置的回调方法会被触发执行   */
     rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        // 这里的 correlationData 来源于 convertAndSend 方法。
        log.debug("消息唯一标识 : {}", correlationData);
        if (ack) {
            log.debug("消息已发送至 RabbitMQ(的Exchange),修改 id 为 {} 的状态。", correlationData.getId());

/** 这里是修改消息的消息状态,correlationDate可以看成消息对象 messageMapper.changeStatus(Long.parseLong(correlationData.getId())); } else { log.debug(“消息未能发送到 Exchange。失败原因 {}”, cause); // … } });

    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        /** 该方法在 Queue 无法收到消息时被触发执行。Queue 能收到消息则不会执行。 */
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.info("ReturnCallback 消息:{}", message);
            log.info("ReturnCallback 回应码:{}", replyCode);
            log.info("ReturnCallback 回应信息:{}", replyText);
            log.info("ReturnCallback 交换机:{}", exchange);
            log.info("ReturnCallback 路由键:{}", routingKey);
        }
    });
    return rabbitTemplate;
}
<a name="o1cIW"></a>
#### 生产者

- `convertAndSend()`有很多重载方法,我们暂时只需知道一个方法`convertAndSend(exchange,routeKey,msg)`。实测中调用无需传exchange的convertAndSend()并没有发现任何效果
- `rabbitMQ`管理后台如果没有配置`routing key`则在java传`""/null`
- **直接指定一个Queue并投递消息也是可以的**,此时指定Routing Key为Queue的名称即可,因为RabbitMQ提供了一个default exchange用于根据Routing Key查找Queue并直接投递消息到指定的Queue。但是要实现一对多的投递就必须自己配置Exchange。
   - 只能发给没有设置routing的队列,发给设置了的队列无效果
- **实际项目中,可以将将发送消息(和消息表)的功能由一个专门的微服务来处理,这个微服务是真正意义上的消息的生产者,它向 RabbitMQ 投递消息。**
```java
@Component
public class RabbitMsgService {
    @Autowired
    RabbitTemplate rabbitTemplate;

    public void sendRegistrationMessage(User msg) {
        rabbitTemplate.convertAndSend("exa", "", msg); //第一个为routing key,会走指定routing key的路由
    }

    public void sendLoginMessage(User msg) {
        String routingKey = msg.success ? "" : "logined";
        rabbitTemplate.convertAndSend("login", routingKey, msg);
    }
}

消费者

  • 消费消息有2种形式:
    • **Push**推送形式:消息服务器推送消息给消费者,消费者配置好后就能自动接收,push形式接收使用@RabbitListener即可
    • **Pull** 拉消息形式:消费者需要自己主动获取消息,使用receiveAndConvert()手动调用
  • 直接添加@RabbitListener(queues =队列名称)即可
  • exchange在消息无routing key时会把消息发给绑定的所有无routing key的队列
  • 消息有routing时exchange只发给绑定的有该routing的队列

    @Component
    public class RabbitMQListener {
      static final String QUEUE_MAIL = "txt";
      static final String QUEUE_SMS = "txt2";
      static final String QUEUE_APP = "logontxt";
    
      @RabbitListener(queues = QUEUE_MAIL)
      public void onRegistrationMessageFromMailQueue(User message) throws Exception {
          logger.error("queue {} received registration message: {}", QUEUE_MAIL, message);
      }
      ....
    }
    
    rabbitTemplate.receiveAndConvert("队列名称");
    

    RabbitMq实现消息持久化

  • 实现持久化需要满足4个条件:

    • 投递消息的时候 durable 设置为 true ,消息持久化,Java配置时的代码:channel.queueDeclare(x, true, false, false, null),参数2 设置为 true 持久化;
    • 设置投递模式 deliveryMode 设置为 2(持久),代码:channel.basicPublish(x, x, MessageProperties.PERSISTENT TEXT PLAIN,x),参数3 设置为存储纯文本到磁盘;
    • 消息已经到达持久化交换器上;
    • 消息已经到达持久化的队列。

      生产者确认消息状态

  • 首先明确一个概念:rabbitmq里消息发送成功指的是消息成功到达exchange,消息推送成功指的是消息成功到达

  • 设置开启消息发送成功和消息推送成功配置,并自定义**rabbitmqTemplate**(详见配置类)
    - 不开启消息确认时,发送失败仅仅是打印一条错误信息。
    
    ```properties

    用于确认消息是否发送成功刀exchange

    spring.rabbitmq.publisher-confirm-type=CORRELATED #默认为none,CORRELATED表示支持异步回调方式确认信息状态

确认消息已发送到队列(Queue)

spring.rabbitmq.publisher-returns=true

<a name="ctGyZ"></a>
## 消费者确认与拒绝消息

- **默认情况下,RabbitMQ 启用的是消费端自动(auto)回复。即,当消费端收到消息,就会给 RabbitMQ Broker 作出回复,表示已收到。**
- **回复行为有三种:**
   - **auto:自动回复**
   - **none:不回复**
   - **manual:消费者手动回复;**在消费者回复前,该消息在消费端未回复前在 RabbitMQ Brocker(理解为消息服务器) 上一直处于 **Unacked** (理解为未读)状态。如果消费者始终都不回复该消息,那么直到消费者与 RabbitMQ 断开连接之后,这条消息才会重新变为 `Ready `状态。
      - 使用手动确认,需要进行配置,消息类也需要进行修改
```shell
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
@Component
public class Consumer2 {

    @RabbitListener(queues = "queue-demo-1")
    public void process(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        //方法里channel进行确认消息或者是拒绝消息

        //basicAck的第一个参数为唯一标识id,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel(Channel 是比 Connection 更小的单位),RabbitMQ 通过 Channel 向消费者投递消息时,都会为该消息分配一个唯一性标识:delivery tag 。同一个 Channel 中的消息的 delivery tag 都是唯一且单调递增的。
        //第二个参数为是否批量确认,当参数为 false 时,意味着确认单条消息,RabbitMQ 仅从消息队列中删除该消息;当参数为 true 时,意味着批量确认,RabbitMQ 会从消息队列中删除编号小于等于该消息的所有信息
        //basicAck()的作用是确认收到消息
        channel.basicAck(tag,false);

        //或者不进行消息确认,直接拒绝消息。第一个参数也是唯一标识id,第二个参数表示是否需要重新发送该消息(貌似不会自动实现重发,仅仅是一个标识,提示的作用)注意只有仍旧在rabibitmq里的消息才能重发,否则只能靠自定义的消息表
        channel.basicReject(tag, false);

        //channel.basicNack();  批量拒绝
    }
}

Java配置消息模型

@Bean
public Exchange exchange() {
    // 构造函数及其重载共有这几种参数
        //name    字符串值,exchange 的名称。
        //durable    布尔值,表示该 exchage 是否持久化。它决定了当 RabbitMQ 重启后,你是否还能「看到」重启前创建的 exchange 。
        //autoDelete    布尔值,表示当该 exchange 没「人」(queue)用时,是否会被自动删除。
即,实现逻辑上的临时交换机。项目启动时连接到 RabbitMQ ,创建交换机;项目停止时断开连接RabbitMQ 自动删除交换机。
    //durable 和 autoDelete 默认分别是 true 和 false
    return new TopicExchange("test-exchange-1", true, false);
}
@Bean
public Queue queue() {
    //name    字符串值,queue 的名称。
//durable    布尔值,表示该 queue 是否持久化。
它决定了当 RabbitMQ 重启后,你是否还能「看到」重启前创建的 queue 。
另外,需要注意的是,queue 的持久化不等于其中的消息也会被持久化。
//exclusive    布尔值,表示该 queue 是否排它式使用。排它式使用意味着仅声明他的连接可见/可用,其它连接不可见/不可用。
//autoDelete    布尔值,表示当该 queue 没「人」(connection)用时,是否会被自动删除。
即,实现逻辑上的临时队列。项目启动时连接到 RabbitMQ ,创建队列;项目停止时断开连接,RabbitMQ 自动删除队列。

    //durable、exclusive 和 autoDelete 默认为 true 、 false 和 false 
    return new Queue("test-queue-1", true, false, false);
}
@Bean
public Binding binding(Exchange exchange, Queue queue) {
    return BindingBuilder
        .bind(queue).to(exchange).with("*.orange.*");
}

延迟队列

  • 延时队列又被称为死信队列。普通队列的处理流程为生产者-exchange-队列-消费者 延时队列为生产者-原exchange-延时队列(持有一段时间)-另外一个交换机,即死信交换机-队列-消费者
  • 延时队列的用途:
    • 替代定时任务
    • 异步执行(自动执行)某些代码
    • 用于一些倒计时任务(如30分钟未支付的订单自动取消)
  • 延时队列即普通队列增加三个特殊的设置

    • Dead letter exchange:x-dead-letter-exchange:指定下一个交换机(死信交换机)
    • Dead letter routing key:x-dead-letter-routing-key 延迟队列传消息给下一个交换机所使用的routing-key
    • Message TTL:x-message-ttl 延迟队列延时的时长,单位为毫秒

      配置延时队列

      @Configuration
      @EnableRabbit
      public class RabbitMQConfig {
      
      public static final String first_exchange_name = "first-exchange";
      public static final String second_exchange_name = "second-exchange";
      
      public static final String first_routing_key = "first-routing-key";
      public static final String second_routing_key = "second-routing-key";
      
      public static final String first_binding = "first-binding";
      public static final String second_binding = "second-binding";
      
      public static final String dead_queue_name = "dead-queue";
      public static final String real_queue_name = "real-queue";
      
      @Bean("dead-queue")
      public Queue deadQueue() {
         Map<String, Object> args = new HashMap<>();
         args.put("x-dead-letter-exchange", second_exchange_name);   // 指定时期后消息投递给哪个交换器。
         args.put("x-dead-letter-routing-key", second_routing_key);  // 指定到期后投递消息时以哪个路由键进行投递。
         args.put("x-message-ttl", 5000);                            // 指定到期时间。5 秒
         return new Queue(dead_queue_name, true, false, false, args);
      }
      
      @Bean("real-queue")
      public Queue realQueue() {
         return new Queue(real_queue_name, true, false, false);
      }
      
      /* 问题一:发出的消息凭什么会到死信队列。*/
      @Bean(first_exchange_name)
      public DirectExchange firstExchange() {
         return new DirectExchange(first_exchange_name, true, false);
      }
      
      @Bean(first_binding)
      public Binding firstBinding(@Qualifier(dead_queue_name) Queue queue,
                                 @Qualifier(first_exchange_name) Exchange exchange) {
         return BindingBuilder.bind(queue)
                 .to(exchange)
                 .with(first_routing_key)
                 .noargs();
      }
      
      /* 问题二:延迟队列凭什么会把消息再转给 real-queue 。*/
      @Bean(second_exchange_name)
      public DirectExchange secondExchange() {
         return new DirectExchange(second_exchange_name, true, false);
      }
      
      @Bean(second_binding)
      public Binding secondBiding(@Qualifier(real_queue_name) Queue queue,
                                 @Qualifier(second_exchange_name) Exchange exchange) {
         return BindingBuilder.bind(queue)
                 .to(exchange)
                 .with(second_routing_key)
                 .noargs();
      }
      }
      

      消息一致性的方案

  • 一致性即要确保消息到达了消费者,并确保消息被消费掉。我们只需要确保到达queue即可


  • 建立一张通用的消息表,极简情况下,下面这2个字段可以没有
    • **retry_count**用于配合定时任务实现消息重发,表示重新发送次数。
      • 可以设置默认值,重新发送成功就改状态,一直失败就一直重试,每一次重试就-1,直到重试次数耗尽
    • **version**用于实现乐观锁。如果存在多个生产者,可以配合乐观锁避免消息发送多次
  • 生产者做完自身业务后要向消息表添加一条记录,表示有一条待发送消息。(注意添加消息记录与生产者自身业务要处于一个事务中@Transacational
  • 配置好回调的配置与rabitmqTemplate
    • 如果消息发送成功,则在回调里修改消息表该条记录的状态。
  • retry_count字段配合定义任务,不断从消息表中取出

    drop table if exists `message`;
    create table `message`
    (
      `id`              BIGINT AUTO_INCREMENT,
      `exchange`        VARCHAR(128)  NOT NULL,
      `routing_key`     VARCHAR(128)  NOT NULL,
      `message_content` VARCHAR(4096) NOT NULL,
      `status`          VARCHAR(128)  NOT NULL,   #可能是消息是否发送成功的状态  默认状态可有可无,有成功与失败2种即可。个人觉得还可以设置一种消息未被消费状态
      `retry_count`     INT           NOT NULL DEFAULT 3,
      `version`         BIGINT        NOT NULL default 0,
      PRIMARY KEY (`id`)
    ) ENGINE = InnoDB;
    

    消息自动过期

  • 未被消费的消息会堆积在 RabbitMQ 中。因此需要在创建队列,或发送消息时指定过期时间,以便于让 RabbitMQ 将这些在规定时间内未能消费的消息移除。

    @Bean
    public Queue queue() {
      Map<String, Object> arguments = new HashMap<>();
      arguments.put("x-message-ttl", 10 * 60 * 1000); // 10 分钟的过期时间
      return new Queue("user.register", true, false, false, arguments);
    }
    

  • JMS

  • Java Message Service是JavaEE的消息服务接口。JMS主要有两个版本:1.1和2.0。2.0和1.1相比,主要是简化了收发消息的代码。

  • JMS是一组接口定义,如果我们要使用JMS,还需要选择一个具体的JMS产品,即JMS服务器

    • JMS常用的实现 有ActiveMQ

      消息模型

  • 一种是Queue

    • Queue是一种一对一的通道(也可以多个consumre接一个queue),如果Consumer离线无法处理消息时,Queue会把消息存起来,等Consumer再次连接的时候发给它。
    • 设定了持久化机制的Queue不会丢失消息。如果有多个Consumer接入同一个Queue,那么它们等效于以集群方式处理消息(即作为一个整体得到一份完整消息),即每个Consumer得到的消息加起来才是一份完整消息
  • 另外一种是Topic
    • Topic则是一种一对多通道。一个Producer发出的消息,会被多个Consumer同时收到,即每个Consumer都会收到一份完整的消息流。
    • 离线时:取决于消息服务器对Topic类型消息的持久化机制。如果消息服务器不存储Topic消息,那么离线的Consumer会丢失部分离线时期的消息,如果消息服务器存储了Topic消息,那么离线的Consumer可以收到自上次离线时刻开始后产生的所有消息。
      • JMS规范通过Consumer指定一个持久化订阅可以在上线后收取所有离线期间的消息,如果指定的是非持久化订阅,那么离线期间的消息会全部丢失。
  • 如果一个Topic的消息全部都持久化了,并且只有一个Consumer,那么它和Queue其实是一样的。实际上,很多消息服务器内部都只有Topic类型的消息架构,Queue可以通过Topic“模拟”出来。
  • 多个Producer也可以写入同一个Queue或者Topic,此时消息服务器内部会自动排序确保消息总是有序的。

    实现原理

  • Producer和Consumer通常是通过TCP连接消息服务器,在编写JMS程序时,又会遇到ConnectionFactory、Connection、Session等概念,其实这和JDBC连接是类似的:

    • ConnectionFactory:代表一个到消息服务器的连接池,类似JDBC的DataSource;
    • Connection:代表一个到消息服务器的连接,类似JDBC的Connection;
    • Session:代表一个经过认证后的连接会话;
    • Message:代表一个消息对象。
  • JMS的消息类型支持以下几种:

    • TextMessage:文本消息;
    • BytesMessage:二进制消息;
    • MapMessage:包含多个Key-Value对的消息;
    • ObjectMessage:直接序列化Java对象的消息;
    • StreamMessage:一个包含基本类型序列的消息。

      Spring使用JMS

  • 以下消息以文本消息举例(即json)举例

  • 生产者和消费者不能使用@Server等延伸注解,必须使用@Component!

    依赖

    ```xml

      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-jms</artifactId>
          <version>5.2.19.RELEASE</version>
      </dependency>
    
      <dependency>
          <groupId>javax.jms</groupId>
          <artifactId>javax.jms-api</artifactId>
          <version>2.0.1</version>
      </dependency>
    
      <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>artemis-jms-client</artifactId>
          <version>2.13.0</version>
      </dependency>
    
      <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-handler-proxy</artifactId>
          <version>4.1.45.Final</version>
      </dependency>
    
<a name="Qkte2"></a>
### 配置类

- 配置类加上`@ComponentScan`  `@EnableJms` _// 启用JMS_
   - _以防万一,实测时没有这两个注解也没啥_
- url是`tcp://localhost:61616`的形式,前面的`tcp://`不要丢了,没有一般表示http连接
```java
    @Value("${jms.uri}")    //自己随意创建的外部配置
    String uri;     
    @Value("${jms.username}")
    String username;
    @Value("${jms.password}")
    String password;

    //消息服务连接
    @Bean
    ConnectionFactory createJMSConnectionFactory() {
        return new ActiveMQJMSConnectionFactory(uri, username, password);
    }

    //jms模板是spring提供的工具类,用于简化发消息
    @Bean
    JmsTemplate createJmsTemplate(@Autowired ConnectionFactory connectionFactory) {
        return new JmsTemplate(connectionFactory);
    }

    //侦听器
    @Bean("jmsListenerContainerFactory")
    DefaultJmsListenerContainerFactory createJmsListenerContainerFactory(@Autowired ConnectionFactory connectionFactory) {
        var factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

生产者

  • send的第一个参数为要发送的队列名称,名称是任意的,下面这种形式就更加规范些

    • Artemis消息服务器默认配置下会自动创建Queue,因此不必手动创建一个名为jms/queue/mail的Queue,但不是所有的消息服务器都会自动创建Queue,生产环境的消息服务器通常会关闭自动创建功能,需要手动创建Queue
    • 即我们如果发送给一个不存在的队列,那么消息服务器就会自动创建该队列
      @Component
      public class MessagingService {
      @Autowired
      ObjectMapper objectMapper;
      @Autowired
      JmsTemplate jmsTemplate;
      public void sendMailMessage(User msg) throws JsonProcessingException {
         String text = objectMapper.writeValueAsString(msg);
         jmsTemplate.send("jms/queue/mail", new MessageCreator() {
             @Override
             public Message createMessage(Session session) throws JMSException {
                 return session.createTextMessage(text);
             }
         });
      }
      }
      

      侦听器(消费者)

  • 侦听器创建后会自动侦听指定消息队列,并执行方法体的消息处理

    • 每一个进入队列的消息被侦听到就会执行@JmsListener方法
  • 处理消息的核心代码是编写一个Bean,并在处理方法上标注@JmsListener

    • destination消息来源的消息队列名称
    • concurrency表示可以最多同时并发处理?个消息,如果是5-10表示并发处理的线程可以在5~10之间调整。 ```java @Component @Slf4j public class MailMessageListener { @Autowired ObjectMapper objectMapper; //jackson用的

      @JmsListener(destination = “jms/queue/mail”, concurrency = “10”) public void onMailMessageReceived(Message message) throws throws JsonProcessingException, JMSException { if (message instanceof TextMessage) {

         String text = ((TextMessage) message).getText();  //json字符串
         User user = objectMapper.readValue(text, User.class);
      

      } else {

         log.error("unable to process non-text message!");
      

      } } }

<a name="ZI1WW"></a>
### 测试

- 直接将消息传给生产者即可,我们可以编写2个项目,一个只有消费者,一个只有生产者
   - 生产者项目先关闭,执行消费者,再执行生产者,可以看到**能读取到离线消息,且消息未丢失**
   - 生产者启动,再执行消费者,可以看到生产者**能实时读取到消息**
   - 短时间给生产者传大量消息,能看到消费者的**消息顺序是依次读取的,是有序的**
<a name="AFRmx"></a>
## springboot使用JMS

- springboot实现jms和spring几乎一模一样,不同的地方就是:外部配置文件是spring规定的;自动完成了配置类;只需引入一个springboot的artemis即可
<a name="lr3Eg"></a>
### 依赖
```xml
 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-artemis</artifactId>
        </dependency>

配置

spring.artemis.mode=native     #指定连接外部Artemis服务器,而不是启动嵌入式服务:
spring.artemis.host=127.0.0.1
spring.artemis.port=61616
spring.artemis.user=admin
spring.artemis.password=123456

Kafka

  • Kafka也是一个消息服务器,它的特点一是快,二是有巨大的吞吐量,Kafka没有实现任何标准的消息接口,它自己提供的API就是Kafka的接口。
  • Kafka本身是Scala编写的,运行在JVM之上。Producer和Consumer都通过Kafka的客户端使用网络来与之通信。从逻辑上讲,Kafka设计非常简单,它只有一种类似JMS的Topic的消息通道
    • 但是Kafka可以进行分区。Kafka的一个Topic可以有一个至多个Partition,并且可以分布到多台机器上
    • Kafka只保证在一个Partition内部,消息是有序的,但是,存在多个Partition的情况下(即多个group),Producer发送的3个消息会依次发送到Partition-1、Partition-2和Partition-3,Consumer从3个Partition接收的消息并不一定是Producer发送的顺序,因此,多个Partition只能保证接收消息大概率按发送时间有序,并不能保证完全按Producer发送的顺序。这一点在使用Kafka作为消息服务器时要特别注意,对发送顺序有严格要求的Topic只能有一个Partition。
    • Kafka的另一个特点是消息发送和接收都尽量使用批处理,一次处理几十甚至上百条消息,比一次一条效率要高很多。
    • kafka消息的持久性:消息保存多久取决于服务器的配置,可以按照时间删除(默认3天),也可以按照文件大小删除,因此,只要Consumer在离线期内的消息还没有被删除,再次上线仍然可以接收到完整的消息流。
      • Consumer在离线期内的消息还没有被删除,再次上线仍然可以接收到完整的消息流。这一功能实际上是客户端自己实现的,客户端会存储它接收到的最后一个消息的offsetId,再次上线后按上次的offsetId查询。offsetId是Kafka标识某个Partion的每一条消息的递增整数,客户端通常将它存储在ZooKeeper中。

image.png

Springboot使用Kafka

依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

配置

  • 除了必须指定外,相关的配置项均为调优选项,即表示大概范围的配置。例如,表示一次最多抓取100条消息

    spring:
    kafka:
      bootstrap-servers: localhost:9092
      consumer:   //这下面是消费者的配置
        auto-offset-reset: latest
        max-poll-records: 100
        max-partition-fetch-bytes: 1000000
        #bootstrap-servers:    生产者消费者也可以配置不同的服务器
    

    生产者

  • 发送消息需指定Topic名称,消息正文。header名称为type,值为消息的类型,即class全名

  • Spring Boot自动为我们创建一个KafkaTemplate用于发送消息。注意到这是一个泛型类,而默认配置总是使用String作为Kafka消息的类型,所以注入KafkaTemplate即可

    @Component
    class KafkaMessagingService {
      @Autowired
      ObjectMapper objectMapper;   //jackson序列化用
    
      @Autowired
      KafkaTemplate<String, String> kafkaTemplate;
    
      public void sendRegistrationMessage(User msg) throws IOException {
          send("topic_registration", msg);
      }
    
      public void sendLoginMessage(User msg) throws IOException {
          send("topic_login", msg);
      }
    
      private void send(String topic, Object msg) throws IOException {
          ProducerRecord<String, String> pr = new ProducerRecord<>(topic, objectMapper.writeValueAsString(msg));
          pr.headers().add("type", msg.getClass().getName().getBytes(StandardCharsets.UTF_8));
          kafkaTemplate.send(pr);
      }
    }
    

    消费者

  • 一个topic可以有多个group(即分区,即上面提到的Partition),不同的group每个收到的都是完整的一份消息。如果存在多个相同group的监听器方法,那么这些方法作为一个整体收到一份消息

  • @Payload表示传入的是消息正文,使用@Header可传入消息的指定Header
  • Kafka默认允许自动创建Topic,创建Topic时默认的分区数量是2,可以通过server.properties修改默认分区数量。
    • 在生产环境中通常会关闭自动创建功能,Topic需要由运维人员先创建好。和RabbitMQ相比,Kafka并不提供网页版管理后台,管理Topic需要使用命令行,比较繁琐,只有云服务商通常会提供更友好的管理后台。 ```java import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component;

@Component public class KafkaTopicMessageListener { private final Logger logger = LoggerFactory.getLogger(getClass());

@Autowired
ObjectMapper objectMapper;

@KafkaListener(topics = "topic_registration", groupId = "group1")
public void onRegistrationMessage(@Payload String message, @Header("type") String type) throws Exception {
    User msg = objectMapper.readValue(message, getType(type));
    logger.info("received registration message: {}", msg);
}

@KafkaListener(topics = "topic_login", groupId = "group1")
public void onLoginMessage(@Payload String message, @Header("type") String type) throws Exception {
    User msg = objectMapper.readValue(message, getType(type));
    logger.error("received login message: {}", msg);
}

@KafkaListener(topics = "topic_login", groupId = "group2")
public void processLoginMessage(@Payload String message, @Header("type") String type) throws Exception {
    User msg = objectMapper.readValue(message, getType(type));
    logger.error("process login message: {}", msg);
}

@SuppressWarnings("unchecked")
private static <T> Class<T> getType(String type) {
    // TODO: use cache:
    try {
        return (Class<T>) Class.forName(type);
    } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
    }
}

}

<a name="N9yFE"></a>
# 各消息服务器的安装
<a name="tLt0L"></a>
## ActiveMQ

- 版本:`artemis 2.19.0`
<a name="kL4St"></a>
### 安装-配置-启动

- 官网下载时,有2种:`ActiveMQ Classic`或者`ActiveMQ Artemis`
   - 前者基于`jms1.1`
   - 后者支持2.0,1.x貌似也可以。
      - 还使用基于Netty的异步IO,大大提升了性能。
      - 此外,Artemis不仅提供了JMS接口,它还提供了AMQP接口,STOMP接口和物联网使用的MQTT接口。选择Artemis,相当于一鱼四吃。
- **Artemis一些较新的版本对于jdk有要求,如**`**2.20.0**`**要求**`**11+**`**。Classic不清楚**
- Artemis把程序和数据完全分离,所以我们解压后需要创建个数据文件夹
   - 可以理解为解压安装包得到的是个总的程序目录,数据文件夹对应一个用户的服务的文件夹

---

1. 解压后配置环境变量`ARTEMIS_HOME`,指向安装目录。   然后path添加`%ARTEMIS_HOME%\bin`
   1. 环境变量必须按照上面配置,artemis会寻找名为`ARTEMIS_HOME`的环境变量
2. 创建数据文件夹:cd到bin下,执行`artemis create  空文件夹路径`  先创建好一个空文件夹
   1. 之后自动创建连接用户,依次输入用户名,密码,是否允许匿名登陆。  用户名我们设置为admin,匿名登录为N
3. 切换到数据文件夹的bin,执行`artemis run`启动Artemis服务
   1. 启动成功后,Artemis提示可以通过URL`http://localhost:8161/console`访问管理后台。注意_不要关闭命令行窗口_
   1. `61616`端口是artemis服务的端口(tcp连接端口)   8161是后台管理端口
<a name="gyOkK"></a>
## RabbitMQ

- 版本:`3.9.13`
<a name="fPytH"></a>
### 安装

1. RabbitMQ基于Erlang开发,所以需要首先拥有Erlang环境。下载Erlang的`exe`包安装即可
   1. **要以管理员身份运行安装包!**
2. 之后再直接运行`rabbitmq-server-{version}.exe`包即可
2. 访问管理后台[http://localhost:15672](http://localhost:15672/),成功则没问题,没成功可能是没有启动rabbitmq服务,windwos下一般是默认安装后就自启动的
   1. RabbitMQ后台管理的默认用户名和口令均为guest
   1. 如果无法访问,但是服务已经启动,依次执行如下列操作
```powershell
进入rabbitmq的安装路径下,找到sbin目录,依次输入如下指令解决问题:
rabbitmqctl start_app
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl stop

如果上述命令出现:ERLANG_HOME not set correctly.    那么执行下面命令
set ERLANG_HOME=Erlang安装路径

Kafka

  • 下载二进制文件(tgz) 版本:2.12.3-3.1.0
    • 解压后得到的bin里面还有一个windows,bin里是linux的程序,bin/windows里是windows程序
  • kafka需要zookeeper环境,zookeeper是一个布式应用程序协调服务,为分布式应用提供一致性服务。

    • kafka包里一般内置zookeeper的程序了

      运行

  • 最好将kakfa解压至d盘,放c盘可能会报命令过长 ```powershell

    打开第一个窗口,启动zookeeper

    cd kafka安装目录 bin\windows\zookeeper-server-start.bat config\zookeeper.properties

打开第二个窗口,启动kafka

cd kafka安装目录 bin\windows\kafka-server-start.bat config\server.properties

退出的话先后在kafka和zookeeper的窗口按:ctrl+c y退出

```