1 引言
Stream就是在消息队列的基础上,对其进行封装,让咱们更方便的去操作MQ消息队列。
2 Stream快速入门
启动RabbitMQ
消费者-导入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
消费者-配置文件
spring:
# 连接RabbitMQ
rabbitmq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
消费者-监听的队列
public interface StreamClient {
@Input("myMessage")
SubscribableChannel input();
}
//-------------------------------------------------
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {
@StreamListener("myMessage")
public void msg(Object msg){
System.out.println("接收到消息: " + msg);
}
}
生产者-导入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
生产者-配置文件
spring:
# 连接RabbitMQ
rabbitmq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
生产者-发布消息
public interface StreamClient {
@Output("myMessage")
MessageChannel output();
}
//---------------------------------------------- 在启动类中添加注解 @EnableBinding(StreamClient.class)
@Autowired
private StreamClient streamClient;
@GetMapping("/send")
public String send(){
streamClient.output().send(MessageBuilder.withPayload("Hello Stream!!").build());
return "消息发送成功!!";
}
3 Stream重复消费问题
只需要添加一个配置,指定消费者组
spring:
cloud:
stream:
bindings:
myMessage: # 队列名称
group: customer # 消费者组
4 Stream的消费者手动ack
编写配置
spring:
cloud:
stream:
# 实现手动ACK
rabbit:
bindings:
myMessage:
consumer:
acknowledgeMode: MANUAL
修改消费端方法
@StreamListener("myMessage")
public void msg(Object msg,
@Header(name = AmqpHeaders.CHANNEL) Channel channel,
@Header(name = AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {
System.out.println("接收到消息: " + msg);
channel.basicAck(deliveryTag,false);
}