使用rabbit步骤
1.引入相关依赖
<dependency><groupId>com.xy</groupId><artifactId>xy-core-mq-rabbit-boot-starter</artifactId><version>1.0.0</version></dependency>
2.配置环境参数
spring:application:name: test-rabbitautoconfigure:exclude:- org.springframework.boot.autoconfigure.amqp.RabbitAutoConfigurationrabbitmq:enable: truehard-code-init: true#addresses: localhost:5672 链接到集群host: localhostport: 5672virtual-host: /username: guestpassword: guest#requested-heartbeat: 1spublisher-confirm-type: correlatedpublisher-returns: truecache:connection:mode: connection #连接工厂缓存模式:CHANNEL 和 CONNECTIONsize: 20 #缓存的连接数,只有是CONNECTION模式时生效listener:type: simplesimple:acknowledge-mode: manual #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto#auto-startup: true #是否启动时自动启动容器batch-size: 20concurrency: 2 #最小的消费者数量default-requeue-rejected: true #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)#idle-event-interval: 1000ms #多少长时间发布空闲容器时间,单位毫秒max-concurrency: 8 #最大的消费者数量missing-queues-fatal: true #队列不存在是否跑异常prefetch: 1 #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.retry:enabled: true #监听重试是否可用initial-interval: 1000ms #第一次和第二次尝试发布或传递消息之间的间隔max-attempts: 3 #最大重试次数max-interval: 10000ms #最大重试时间间隔multiplier: 2.0D #应用于上一重试间隔的乘数stateless: true #重试是有状态or无状态template:receive-timeout: 10000ms #receive() 操作的超时时间mandatory: false # 启用强制信息;默认falsereply-timeout: 10000ms #sendAndReceive() 操作的超时时间retry:enabled: true #发送重试是否可用max-attempts: 3 #最大重试次数initial-interval: 1000ms #第一次和第二次尝试发布或传递消息之间的间隔multiplier: 2.0D #应用于上一重试间隔的乘数max-interval: 10000ms #最大重试时间间隔
3.定义交换机,路由,队列信息
继承ExchangeQueueConfig,添加交换机,路由,队列绑定信息,将ExchangeQueueConfig作为Component添加到容器中
@Componentpublic class TestExchangeQueueConfig extends ExchangeQueueConfig {@Overrideprotected List<ExchangeRoutingKeyQueue> configExchangeQueue() {List<ExchangeRoutingKeyQueue> result = new ArrayList<>();ExchangeRoutingKeyQueue exchangeQueue1 = new ExchangeRoutingKeyQueue(ExchangeEnum.FANOUT, "aaa", "bbb", "ccc", true);ExchangeRoutingKeyQueue exchangeQueue2 = new ExchangeRoutingKeyQueue(ExchangeEnum.TOPIC, "TTT", "MMM", "DDD", false);result.add(exchangeQueue1);result.add(exchangeQueue2);return result;}}
4.XyConfirmCallback
如果需要发送确认回调,则继承XyConfirmCallback抽象类,并将其作为Component添加到容器中,在方法中可以自定义相关的业务操作
@Componentpublic class XyConfirmCallbackComponent extends XyConfirmCallback {@Overrideprotected void call(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {System.err.println(">>>1111111111111111111" + correlationData.getId() + "," + cause);}}}
5.XyReturnCallback
如果需要对发送的消息没有进入队列确认回调,则继承XyReturnCallback抽象类,并将其作为Component添加到容器中,在方法中可以自定义相关的业务操作
@Componentpublic class XyReturnCallbackComponent extends XyReturnCallback {@Overrideprotected void call(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.err.println(">>>222222222222222222222" + message + "," + replyCode + "," + replyText + "," + exchange + "," + routingKey);}}
6.手动确认
消费者方法参数中添加Channel channel,通过Channel做手动确认
@RabbitMqConsumerDot(group = "group2", desc = "desc2", author = "author2", name = "name2")@RabbitListener(queues = {"fanout.queue"})public void receiveFanout(TaskQueueModel taskQueueModel, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();// System.err.println("2:" + taskQueueModel + "2:deliveryTag" + deliveryTag);long id = Thread.currentThread().getId();System.err.println("thread id is " + id);try {TimeUnit.SECONDS.sleep(2);channel.basicAck(deliveryTag, false);} catch (Exception e) {e.printStackTrace();//网络异常channel.basicNack(deliveryTag, false, true);}}
@RabbitMqConsumerDot:提供监控预警跟踪
如果无需手动确认,则方法参数保留接收对象,直接消费即可
7.配置项说明
server:port: 8081spring:application:name: test-rabbitautoconfigure:exclude:- org.springframework.boot.autoconfigure.amqp.RabbitAutoConfigurationrabbitmq:enable: truehard-code-init: true#addresses: localhost:5672 链接到集群host: localhostport: 5672virtual-host: /username: guestpassword: guestrequested-heartbeat: 1spublisher-confirm-type: correlatedpublisher-returns: truecache:#channel:# size: 20 #缓存中保持的channel数量# checkout-timeout: 1000ms #当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channelconnection:mode: connection #连接工厂缓存模式:CHANNEL 和 CONNECTIONsize: 20 #缓存的连接数,只有是CONNECTION模式时生效# routing-key:# dynamic: true# connection-timeout: 50000listener:type: simpledirect:acknowledge-mode: manual#auto-startup: true#consumers-per-queue: 1#default-requeue-rejected: true#idle-event-interval: 0#missing-queues-fatal: trueprefetch: 1#retry:# enabled: true# initial-interval: 1000ms# max-attempts: 3# max-interval: 10000ms# multiplier: 1.0# stateless: truesimple:acknowledge-mode: manual #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认autoauto-startup: true #是否启动时自动启动容器batch-size: 0concurrency: 0 #最小的消费者数量default-requeue-rejected: true #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)idle-event-interval: #多少长时间发布空闲容器时间,单位毫秒max-concurrency: 0 #最大的消费者数量missing-queues-fatal: true #队列不存在是否跑异常prefetch: 1 #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.retry:enabled: true #监听重试是否可用initial-interval: 1000ms #第一次和第二次尝试发布或传递消息之间的间隔max-attempts: 3 #最大重试次数max-interval: 10000ms #最大重试时间间隔multiplier: 2.0D #应用于上一重试间隔的乘数stateless: true #重试是有状态or无状态template:receive-timeout: 10000ms #receive() 操作的超时时间mandatory: false # 启用强制信息;默认falsereply-timeout: 10000ms #sendAndReceive() 操作的超时时间retry:enabled: true #发送重试是否可用max-attempts: 3 #最大重试次数initial-interval: 1000ms #第一次和第二次尝试发布或传递消息之间的间隔multiplier: 2.0D #应用于上一重试间隔的乘数max-interval: 10000ms #最大重试时间间隔exchange: #默认的发送路由routing-key: #默认路由keydefault-receive-queue: #默认队列ssl:enabled: true #是否支持sslkey-store: #指定持有SSL certificate的key store的路径key-store-type: PKCS12 #加密类型key-store-password: #指定访问key store的密码trust-store: # 指定持有SSL certificates的Trust storetrust-store-type: JKS #加密类型trust-store-password: #指定访问trust store的密码algorithm: #ssl使用的算法,例如,TLSv1.1
spring-cloud-stream
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/#_configuration_options
