原理:
交换机种类
RabbitMQ常用的交换器类型有fanout、direct、topic、headers这四种类型
(1)fanout:它会把所有的交换器上的消息路由到所有与该交换器邦定的队列中,不需要BindingKey生效
(2)direct:它会把消息路由到BindingKey与RoutingKey完全匹配的队列中。比如在发送消息的时候,设置Label中RoutingKey为warning,则消息会路由到Queue1与Queue2上(请看下图)。
(3)topic:是direct上的扩展,同样是利用RoutingKey与BindingKey相匹配,但是匹配规则不一样,支持模糊匹配。有如下的规则
- RoutingKey为一个点号“.”分隔的字符串,每个被隔开的独立字符串即为一个单词,是匹配的单位;
- BindingKey和RoutingKey一样,也是"."分割的字符串;
- 但不同的是BindingKey,可以用“#”,“*”进行类似于占位符的模糊匹配,“#”表示一个单词,"*"表示多个单词(也可以是零个)
比如:
RoutingKey为com.hidden.client的消息只会到队列Queue2中:因为只有Queue2的BindingKey=..client匹配com.hidden.client
RoutingKey为com.rabbitmq.client的消息会到队列Queue1-2中: 因为Queue1的BindingKey=.rabbitmq..匹配com.rabbitmq.client,Queue2的BindingKey=..client匹配com.rabbitmq.client
RoutingKey为java.rabbitmq.demo的消息只会到队列Queue1中:因为只有Queue1的BindingKey=.rabbitmq..匹配java.rabbitmq.demo
RoutingKey为java.util.concurrent的消息会被丢弃或者返回给生产者(需要设置)
(4)headers:依赖发送消息内容中的hearders属性进行匹配,在绑定队列和交换器时指定一组键值对,这里的也就是headers,当发送消息到交换器时,RabbitMQ会获取到该消息的headers,通过比较会路由到相关队列中,这种交换器性能会很差,一般不会使用。
应答模式
- AcknowledgeMode.NONE:不确认
- AcknowledgeMode.AUTO:自动确认
- AcknowledgeMode.MANUAL:手动确认
全局配置
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
在特定接受者上配置
```java @RabbitListener(queues =”topic.message”) public void process1(@Payload String message,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel)throws Exception {
}//定义队列的消费者
Consumer consumer = new DefaultConsumer(channel);
//监听队列,设置手动返回完成,第二个参数 false表示需要手动确认返回
channel.basicConsume("topic.message",false,consumer);
//手动确认返回-不成功就用basicNack
channel.basicAck(deliveryTag,false);
System.out.print("这里是手动返回确认信息 ");
<a name="qN1Mp"></a>
# pom配置
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloud-demo</artifactId>
<groupId>demo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>demo-stream-rabbit-provider</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--sentinel-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<!--健康监控-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- mybatis 和SpringBoot 整合-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>
<!-- MySQL 驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
</dependency>
<!-- jdbc -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>
</project>
yaml配置
server:
port: 8003
spring:
application:
name: demo-stream-rabbit-provider
datasource:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://192.168.2.20:30569/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
username: root
password: root
cloud:
nacos:
server-addr: 192.168.2.20:30010
username: nacos
password: nacos
sentinel:
transport:
dashboard: 192.168.2.20:31596 # 控制台的安装位置
# port: 8719 # 与sentinel单独连接的端口
# client-ip: 192.168.2.6 # 本机的ip,如果sentinel装在虚拟机,必须配这个
# port: 30195 # 与sentinel单独连接的端口
# client-ip: 192.168.2.20 # 本机的ip,如果sentinel装在虚拟机,必须配这个
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
# environment: # 设置rabbitmq的相关的环境配置
# spring:
# rabbitmq:
# host: 192.168.2.20
# port: 31672
# username: user
# password: ze2tgb2WGC
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: default-group
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: default-group
rabbitmq:
host: 192.168.2.20
port: 31672
username: user
password: ze2tgb2WGC
# 暴露应用信息
management:
endpoints:
web:
exposure:
include: '*'
示例:controller
package com.test.rabbit.consumer.controller;
import com.test.rabbit.consumer.service.MessageProviderService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author jia
* @since 2022-04-05 11:39
*/
@RestController
@RequestMapping("/rabbit")
@RequiredArgsConstructor
public class SendMessageController {
protected final MessageProviderService messageProviderService;
@GetMapping("/sendMessage")
public String sendMessage() {
return messageProviderService.send();
}
}
示例:service
package com.test.rabbit.consumer.serviceimpl;
import com.test.rabbit.consumer.service.MessageProviderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.UUID;
/**
* @author jia
* @since 2022-04-05 11:42
*/
@EnableBinding(Source.class)
@Slf4j
public class MessageProviderServiceImpl implements MessageProviderService {
@Resource
MessageChannel output;
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
log.info("***** serial: {}", serial);
return null;
}
}
参考
https://github.com/spring-cloud/spring-cloud-stream-samples/
https://www.jianshu.com/p/33cbe3132e63