使用rabbit步骤
1.引入相关依赖
<dependency>
<groupId>com.xy</groupId>
<artifactId>xy-core-mq-rabbit-boot-starter</artifactId>
<version>1.0.0</version>
</dependency>
2.配置环境参数
spring:
application:
name: test-rabbit
autoconfigure:
exclude:
- org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
rabbitmq:
enable: true
hard-code-init: true
#addresses: localhost:5672 链接到集群
host: localhost
port: 5672
virtual-host: /
username: guest
password: guest
#requested-heartbeat: 1s
publisher-confirm-type: correlated
publisher-returns: true
cache:
connection:
mode: connection #连接工厂缓存模式:CHANNEL 和 CONNECTION
size: 20 #缓存的连接数,只有是CONNECTION模式时生效
listener:
type: simple
simple:
acknowledge-mode: manual #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
#auto-startup: true #是否启动时自动启动容器
batch-size: 20
concurrency: 2 #最小的消费者数量
default-requeue-rejected: true #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
#idle-event-interval: 1000ms #多少长时间发布空闲容器时间,单位毫秒
max-concurrency: 8 #最大的消费者数量
missing-queues-fatal: true #队列不存在是否跑异常
prefetch: 1 #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
retry:
enabled: true #监听重试是否可用
initial-interval: 1000ms #第一次和第二次尝试发布或传递消息之间的间隔
max-attempts: 3 #最大重试次数
max-interval: 10000ms #最大重试时间间隔
multiplier: 2.0D #应用于上一重试间隔的乘数
stateless: true #重试是有状态or无状态
template:
receive-timeout: 10000ms #receive() 操作的超时时间
mandatory: false # 启用强制信息;默认false
reply-timeout: 10000ms #sendAndReceive() 操作的超时时间
retry:
enabled: true #发送重试是否可用
max-attempts: 3 #最大重试次数
initial-interval: 1000ms #第一次和第二次尝试发布或传递消息之间的间隔
multiplier: 2.0D #应用于上一重试间隔的乘数
max-interval: 10000ms #最大重试时间间隔
3.定义交换机,路由,队列信息
继承ExchangeQueueConfig,添加交换机,路由,队列绑定信息,将ExchangeQueueConfig作为Component添加到容器中
@Component
public class TestExchangeQueueConfig extends ExchangeQueueConfig {
@Override
protected List<ExchangeRoutingKeyQueue> configExchangeQueue() {
List<ExchangeRoutingKeyQueue> result = new ArrayList<>();
ExchangeRoutingKeyQueue exchangeQueue1 = new ExchangeRoutingKeyQueue(ExchangeEnum.FANOUT, "aaa", "bbb", "ccc", true);
ExchangeRoutingKeyQueue exchangeQueue2 = new ExchangeRoutingKeyQueue(ExchangeEnum.TOPIC, "TTT", "MMM", "DDD", false);
result.add(exchangeQueue1);
result.add(exchangeQueue2);
return result;
}
}
4.XyConfirmCallback
如果需要发送确认回调,则继承XyConfirmCallback抽象类,并将其作为Component添加到容器中,在方法中可以自定义相关的业务操作
@Component
public class XyConfirmCallbackComponent extends XyConfirmCallback {
@Override
protected void call(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
System.err.println(">>>1111111111111111111" + correlationData.getId() + "," + cause);
}
}
}
5.XyReturnCallback
如果需要对发送的消息没有进入队列确认回调,则继承XyReturnCallback抽象类,并将其作为Component添加到容器中,在方法中可以自定义相关的业务操作
@Component
public class XyReturnCallbackComponent extends XyReturnCallback {
@Override
protected void call(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.err.println(">>>222222222222222222222" + message + "," + replyCode + "," + replyText + "," + exchange + "," + routingKey);
}
}
6.手动确认
消费者方法参数中添加Channel channel,通过Channel做手动确认
@RabbitMqConsumerDot(group = "group2", desc = "desc2", author = "author2", name = "name2")
@RabbitListener(queues = {"fanout.queue"})
public void receiveFanout(TaskQueueModel taskQueueModel, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// System.err.println("2:" + taskQueueModel + "2:deliveryTag" + deliveryTag);
long id = Thread.currentThread().getId();
System.err.println("thread id is " + id);
try {
TimeUnit.SECONDS.sleep(2);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
e.printStackTrace();
//网络异常
channel.basicNack(deliveryTag, false, true);
}
}
@RabbitMqConsumerDot:提供监控预警跟踪
如果无需手动确认,则方法参数保留接收对象,直接消费即可
7.配置项说明
server:
port: 8081
spring:
application:
name: test-rabbit
autoconfigure:
exclude:
- org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
rabbitmq:
enable: true
hard-code-init: true
#addresses: localhost:5672 链接到集群
host: localhost
port: 5672
virtual-host: /
username: guest
password: guest
requested-heartbeat: 1s
publisher-confirm-type: correlated
publisher-returns: true
cache:
#channel:
# size: 20 #缓存中保持的channel数量
# checkout-timeout: 1000ms #当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
connection:
mode: connection #连接工厂缓存模式:CHANNEL 和 CONNECTION
size: 20 #缓存的连接数,只有是CONNECTION模式时生效
# routing-key:
# dynamic: true
# connection-timeout: 50000
listener:
type: simple
direct:
acknowledge-mode: manual
#auto-startup: true
#consumers-per-queue: 1
#default-requeue-rejected: true
#idle-event-interval: 0
#missing-queues-fatal: true
prefetch: 1
#retry:
# enabled: true
# initial-interval: 1000ms
# max-attempts: 3
# max-interval: 10000ms
# multiplier: 1.0
# stateless: true
simple:
acknowledge-mode: manual #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
auto-startup: true #是否启动时自动启动容器
batch-size: 0
concurrency: 0 #最小的消费者数量
default-requeue-rejected: true #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
idle-event-interval: #多少长时间发布空闲容器时间,单位毫秒
max-concurrency: 0 #最大的消费者数量
missing-queues-fatal: true #队列不存在是否跑异常
prefetch: 1 #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
retry:
enabled: true #监听重试是否可用
initial-interval: 1000ms #第一次和第二次尝试发布或传递消息之间的间隔
max-attempts: 3 #最大重试次数
max-interval: 10000ms #最大重试时间间隔
multiplier: 2.0D #应用于上一重试间隔的乘数
stateless: true #重试是有状态or无状态
template:
receive-timeout: 10000ms #receive() 操作的超时时间
mandatory: false # 启用强制信息;默认false
reply-timeout: 10000ms #sendAndReceive() 操作的超时时间
retry:
enabled: true #发送重试是否可用
max-attempts: 3 #最大重试次数
initial-interval: 1000ms #第一次和第二次尝试发布或传递消息之间的间隔
multiplier: 2.0D #应用于上一重试间隔的乘数
max-interval: 10000ms #最大重试时间间隔
exchange: #默认的发送路由
routing-key: #默认路由key
default-receive-queue: #默认队列
ssl:
enabled: true #是否支持ssl
key-store: #指定持有SSL certificate的key store的路径
key-store-type: PKCS12 #加密类型
key-store-password: #指定访问key store的密码
trust-store: # 指定持有SSL certificates的Trust store
trust-store-type: JKS #加密类型
trust-store-password: #指定访问trust store的密码
algorithm: #ssl使用的算法,例如,TLSv1.1
spring-cloud-stream
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/#_configuration_options