1 引言

Stream就是在消息队列的基础上,对其进行封装,让咱们更方便的去操作MQ消息队列。
image.png

2 Stream快速入门

启动RabbitMQ
消费者-导入依赖

  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  4. </dependency>

消费者-配置文件

  1. spring:
  2. # 连接RabbitMQ
  3. rabbitmq:
  4. host: 192.168.199.109
  5. port: 5672
  6. username: test
  7. password: test
  8. virtual-host: /test

消费者-监听的队列

  1. public interface StreamClient {
  2. @Input("myMessage")
  3. SubscribableChannel input();
  4. }
  5. //-------------------------------------------------
  6. @Component
  7. @EnableBinding(StreamClient.class)
  8. public class StreamReceiver {
  9. @StreamListener("myMessage")
  10. public void msg(Object msg){
  11. System.out.println("接收到消息: " + msg);
  12. }
  13. }

生产者-导入依赖

  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  4. </dependency>

生产者-配置文件

  1. spring:
  2. # 连接RabbitMQ
  3. rabbitmq:
  4. host: 192.168.199.109
  5. port: 5672
  6. username: test
  7. password: test
  8. virtual-host: /test

生产者-发布消息

  1. public interface StreamClient {
  2. @Output("myMessage")
  3. MessageChannel output();
  4. }
  5. //---------------------------------------------- 在启动类中添加注解 @EnableBinding(StreamClient.class)
  6. @Autowired
  7. private StreamClient streamClient;
  8. @GetMapping("/send")
  9. public String send(){
  10. streamClient.output().send(MessageBuilder.withPayload("Hello Stream!!").build());
  11. return "消息发送成功!!";
  12. }

3 Stream重复消费问题

只需要添加一个配置,指定消费者组

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. myMessage: # 队列名称
  6. group: customer # 消费者组

4 Stream的消费者手动ack

编写配置

  1. spring:
  2. cloud:
  3. stream:
  4. # 实现手动ACK
  5. rabbit:
  6. bindings:
  7. myMessage:
  8. consumer:
  9. acknowledgeMode: MANUAL

修改消费端方法

  1. @StreamListener("myMessage")
  2. public void msg(Object msg,
  3. @Header(name = AmqpHeaders.CHANNEL) Channel channel,
  4. @Header(name = AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {
  5. System.out.println("接收到消息: " + msg);
  6. channel.basicAck(deliveryTag,false);
  7. }