如果你注定要成为厉害的人, 那问题的答案就深藏在你的血脉里。 本篇文章主要讲述Spring Boot与RabbitMQ的整合。因为我们公司的云服务用到了RabbitMQ 技术,之前都是自己封装,正好我们也正在往SpringBoot转变,这个技术正好用到,看来代码又要重构咯。
有想了解重构的朋友,我之前也有对《重构》一书的解读,出门左转就能看到。


导包:

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

消息生产者

ConnectionFactory配置
创建AmqpConfig文件AmqpConfig.java(后期的配置都在该文件中)

  1. package cn.usr.springbootrabbitmq;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.*;
  4. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  5. import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  8. import org.springframework.beans.factory.config.ConfigurableBeanFactory;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.context.annotation.Scope;
  12. /**
  13. * @program: Learn-SpringBootRabbitmq
  14. * @author: Rock 【shizhiyuan@usr.cn】
  15. * @Date: 2018/2/23 0023
  16. */
  17. @Configuration
  18. public class AmqpConfig {
  19. public static final String EXCHANGE = "spring-boot-exchange2";
  20. public static final String ROUTINGKEY = "spring-boot-routingKey2";
  21. @Bean
  22. public CachingConnectionFactory connectionFactory() {
  23. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  24. connectionFactory.setAddresses("127.0.0.1");
  25. connectionFactory.setUsername("guest");
  26. connectionFactory.setPassword("guest");
  27. connectionFactory.setVirtualHost("/");
  28. // 这里需要显示调用才能进行消息的回调 必须要设置
  29. connectionFactory.setPublisherConfirms(true);
  30. return connectionFactory;
  31. }

RabbitTemplate

  1. @Bean
  2. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  3. public RabbitTemplate rabbitTemplate() {
  4. RabbitTemplate template = new RabbitTemplate(connectionFactory());
  5. return template;
  6. }

这里设置为原型,具体的原因在后面会讲到,在发送消息时通过调用RabbitTemplate中的如下方法:
一会调用的时候用:

  1. public void convertAndSend(String exchange, String routingKey, Object object, CorrelationData correlationData)

Producer

调用啦:

  1. package cn.usr.springbootrabbitmq;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.amqp.rabbit.support.CorrelationData;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. import java.util.UUID;
  7. /**
  8. * @program: Learn-SpringBootRabbitmq
  9. * @author: Rock 【shizhiyuan@usr.cn】
  10. * @Date: 2018/2/23 0023
  11. */
  12. @Component
  13. public class Producer implements RabbitTemplate.ConfirmCallback {
  14. private RabbitTemplate rabbitTemplate;
  15. /**
  16. * 构造方法注入
  17. */
  18. @Autowired
  19. public Producer(RabbitTemplate rabbitTemplate) {
  20. this.rabbitTemplate = rabbitTemplate;
  21. //这是是设置回调能收到发送到响应,confirm()在下面解释
  22. rabbitTemplate.setConfirmCallback(this);
  23. }
  24. public void sendMsg(String content) {
  25. CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
  26. //convertAndSend(exchange:交换机名称,routingKey:路由关键字,object:发送的消息内容,correlationData:消息ID)
  27. rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
  28. }
  29. @Override
  30. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  31. System.out.println(" 回调id:" + correlationData);
  32. if (ack) {
  33. System.out.println("消息成功消费");
  34. } else {
  35. System.out.println("消息消费失败:" + cause);
  36. }
  37. }
  38. }

如果需要在生产者需要消息发送后的回调,需要对rabbitTemplate设置ConfirmCallback对象,由于不同的生产者需要对应不同的ConfirmCallback,如果rabbitTemplate设置为单例bean,则所有的rabbitTemplate实际的ConfirmCallback为最后一次申明的ConfirmCallback。

消息消费者

还是在AmqpConfig.class里面
步骤就是

  1. 声明交换机
  2. 声明队列
  3. 绑定RoutingKey
    1. /**
    2. * 针对消费者配置
    3. * 1. 设置交换机类型
    4. * 2. 将队列绑定到交换机
    5. * <p>
    6. * <p>
    7. * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
    8. * HeadersExchange :通过添加属性key-value匹配
    9. * DirectExchange:按照routingkey分发到指定队列
    10. * TopicExchange:多关键字匹配
    11. */
    12. @Bean
    13. public DirectExchange defaultExchange() {
    14. return new DirectExchange(EXCHANGE);
    15. }
    16. @Bean
    17. public Queue queue() {
    18. return new Queue("spring-boot-queue", true);//队列持久
    19. }
    20. @Bean
    21. public Binding binding() {
    22. return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
    23. }
    24. @Bean
    25. public SimpleMessageListenerContainer messageContainer() {
    26. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
    27. container.setQueues(queue());
    28. container.setExposeListenerChannel(true);
    29. container.setMaxConcurrentConsumers(1);
    30. container.setConcurrentConsumers(1);
    31. // 设置确认模式手工确认
    32. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    33. container.setMessageListener(new ChannelAwareMessageListener() {
    34. @Override
    35. public void onMessage(Message message, Channel channel) throws Exception {
    36. byte[] body = message.getBody();
    37. System.out.println("receive msg : " + new String(body));
    38. //确认消息成功消费
    39. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    40. }
    41. });
    42. return container;
    43. }
    下面是完整的配置:
    1. package cn.usr.springbootrabbitmq;
    2. import com.rabbitmq.client.Channel;
    3. import org.springframework.amqp.core.*;
    4. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    5. import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    7. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    8. import org.springframework.beans.factory.config.ConfigurableBeanFactory;
    9. import org.springframework.context.annotation.Bean;
    10. import org.springframework.context.annotation.Configuration;
    11. import org.springframework.context.annotation.Scope;
    12. /**
    13. * @program: Learn-SpringBootRabbitmq
    14. * @author: Rock 【shizhiyuan@usr.cn】
    15. * @Date: 2018/2/23 0023
    16. */
    17. @Configuration
    18. public class AmqpConfig {
    19. public static final String EXCHANGE = "spring-boot-exchange2";
    20. public static final String ROUTINGKEY = "spring-boot-routingKey2";
    21. @Bean
    22. public CachingConnectionFactory connectionFactory() {
    23. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    24. connectionFactory.setAddresses("127.0.0.1");
    25. connectionFactory.setUsername("guest");
    26. connectionFactory.setPassword("guest");
    27. connectionFactory.setVirtualHost("/");
    28. // 这里需要显示调用才能进行消息的回调 必须要设置
    29. connectionFactory.setPublisherConfirms(true);
    30. return connectionFactory;
    31. }
    32. @Bean
    33. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    34. public RabbitTemplate rabbitTemplate() {
    35. RabbitTemplate template = new RabbitTemplate(connectionFactory());
    36. return template;
    37. }
    38. /**
    39. * 针对消费者配置
    40. * 1. 设置交换机类型
    41. * 2. 将队列绑定到交换机
    42. * <p>
    43. * <p>
    44. * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
    45. * HeadersExchange :通过添加属性key-value匹配
    46. * DirectExchange:按照routingkey分发到指定队列
    47. * TopicExchange:多关键字匹配
    48. */
    49. @Bean
    50. public DirectExchange defaultExchange() {
    51. return new DirectExchange(EXCHANGE);
    52. }
    53. @Bean
    54. public Queue queue() {
    55. return new Queue("spring-boot-queue", true);
    56. }
    57. @Bean
    58. public Binding binding() {
    59. return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
    60. }
    61. @Bean
    62. public SimpleMessageListenerContainer messageContainer() {
    63. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
    64. container.setQueues(queue());
    65. container.setExposeListenerChannel(true);
    66. container.setMaxConcurrentConsumers(1);
    67. container.setConcurrentConsumers(1);
    68. // 设置确认模式手工确认
    69. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    70. container.setMessageListener(new ChannelAwareMessageListener() {
    71. @Override
    72. public void onMessage(Message message, Channel channel) throws Exception {
    73. byte[] body = message.getBody();
    74. System.out.println("receive msg : " + new String(body));
    75. //确认消息成功消费
    76. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    77. }
    78. });
    79. return container;
    80. }
    81. }
    到这里我就能完成SpringBoot整合RabbitMQ的数据收发了。
    结果:
    1. receive msg : ceshi-----?
    2. 回调id:CorrelationData [id=dfe3b3d1-f5a3-42d9-a514-a73729e009d5]
    3. 消息成功消费
    点赞收藏关注不迷路。么么哒