RabbitMQ SpringBoot
消息队列中间件是分布式系统中的重要组件,主要用于解决应用耦合、异步消息、流量削峰等问题,帮助系统实现高性能、高可用、可伸缩和最终一致性的架构。
「RabbitMQ」是实现了高级消息队列协议(AMQP)的开源消息中间件,具有较高的系统吞吐量、可靠性、消息持久化、免费等优点,在软件项目中具有非常广泛的应用。

项目代码实现

依赖配置

Maven依赖

  1. <parent>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-parent</artifactId>
  4. <version>2.0.1.RELEASE</version>
  5. <relativePath/>
  6. </parent>
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-amqp</artifactId>
  10. </dependency>

配置文件

  1. spring.rabbitmq.host=192.168.202.128
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=guest
  4. spring.rabbitmq.password=guest

组件设计与实现

Exchange(交换机)

定义交换机名称、类型、持久化、延时交换机名称等属性。

  /**
   * Describes a RabbitMQ exchange: its name, type, durability flags, extra
   * arguments, and the name of its companion delay exchange.
   *
   * <p>Only {@link #exchangeName()} must be implemented; every other property
   * has a default.
   */
  public interface IRabbitMqExchange {

      /**
       * @return the name of the exchange
       */
      String exchangeName();

      /**
       * @return the name of the delay exchange paired with this exchange, which
       *         by default is {@code "delay."} followed by {@link #exchangeName()}
       */
      default String delayExchangeName() {
          return "delay." + exchangeName();
      }

      /**
       * @return the exchange type, one of {@code "direct"}, {@code "fanout"},
       *         {@code "topic"} or {@code "headers"}; {@code "topic"} by default
       */
      default String type() {
          return "topic";
      }

      /**
       * @return whether the exchange is durable; {@code true} by default
       */
      default boolean durable() {
          return true;
      }

      /**
       * @return whether the exchange is deleted once all queues have finished
       *         using it; {@code false} by default
       */
      default boolean autoDelete() {
          return false;
      }

      /**
       * @return {@code true} if binding directly to this exchange is not
       *         allowed; {@code false} by default
       */
      default boolean internal() {
          return false;
      }

      /**
       * @return additional declaration arguments, or {@code null} (the default)
       *         when there are none
       */
      default Map<String, Object> arguments() {
          return null;
      }
  }

路由(Routing)

  /**
   * Describes a RabbitMQ routing definition.
   */
  public interface IRabbitMqRouting {

      /**
       * @return the RabbitMQ routing key
       */
      String routingKey();
  }

队列(Queue)

定义队列名称、持久化、延时队列名称等属性

  /**
   * Describes a RabbitMQ queue: its name, durability flags, extra arguments,
   * and the name of its companion delay queue.
   *
   * <p>Only {@link #queueName()} must be implemented; every other property has
   * a default.
   */
  public interface IRabbitMqQueue {

      /**
       * @return the name of the queue
       */
      String queueName();

      /**
       * @return the default name of the delay queue paired with this queue,
       *         which is {@code "delay."} followed by {@link #queueName()}
       */
      default String delayQueueName() {
          return "delay." + this.queueName();
      }

      /**
       * @return whether the queue is durable; {@code true} by default
       */
      default boolean durable() {
          return true;
      }

      /**
       * @return whether the queue is exclusive; {@code false} by default
       */
      default boolean exclusive() {
          return false;
      }

      /**
       * @return whether the queue is deleted automatically; {@code false} by
       *         default
       */
      default boolean autoDelete() {
          return false;
      }

      /**
       * @return additional declaration arguments, or {@code null} (the default)
       *         when there are none
       */
      default Map<String, Object> arguments() {
          return null;
      }
  }

绑定关系(Binding)

定义了 交换机(Exchange)-路由(Routing)-消息队列(Queue)的绑定关系,以及定义是否支持延时消息。

  1. public interface IRabbitMqBinding {
  2. /**
  3. * 需要绑定的exchange(交换机)
  4. * */
  5. IRabbitMqExchange exchange();
  6. /**
  7. * 需要绑定的routing(路由)
  8. * */
  9. IRabbitMqRouting routing();
  10. /**
  11. * 需要绑定的queue(队列)
  12. * */
  13. IRabbitMqQueue queue();
  14. /**
  15. * 消息队列是否允许延时
  16. * */
  17. boolean allowDelay();
  18. }

默认注册器

实现了交换机、消息队列、绑定关系的注册。如果绑定关系中定义支持延迟消息,则额外注册一个延时交换机和死信队列,以实现延时消息推送的功能。

  1. public class DefaultRabbitMqRegister implements IRabbitMqRegister, SmartLifecycle {
  2. ConnectionFactory connectionFactory;
  3. Channel channel;
  4. public DefaultRabbitMqRegister() {
  5. }
  6. public DefaultRabbitMqRegister(ConnectionFactory connectionFactory) {
  7. this.connectionFactory = connectionFactory;
  8. }
  9. @PostConstruct
  10. public void init() {
  11. channel = connectionFactory.createConnection().createChannel(false);
  12. }
  13. @Override
  14. public void registerExchange(IRabbitMqExchange... exchanges) throws IOException {
  15. for (IRabbitMqExchange exchange : exchanges) {
  16. channel.exchangeDeclare(exchange.exchangeName(), exchange.type(), exchange.durable(), exchange.autoDelete(), exchange.internal(), exchange.arguments());
  17. }
  18. }
  19. @Override
  20. public void registerQueue(IRabbitMqQueue... queues) throws IOException {
  21. for (IRabbitMqQueue queue : queues) {
  22. channel.queueDeclare(queue.queueName(), queue.durable(), queue.exclusive(), queue.autoDelete(), queue.arguments());
  23. }
  24. }
  25. @Override
  26. public void registerBinding(IRabbitMqBinding... bindings) throws IOException {
  27. for (IRabbitMqBinding binding : bindings) {
  28. channel.queueBind(binding.queue().queueName(), binding.exchange().exchangeName(), binding.routing().routingKey());
  29. if (binding.allowDelay()) {
  30. registerDelayBinding(binding);
  31. }
  32. }
  33. }
  34. /**
  35. * 创建一个内部的 死信队列 用来实现 延时队列
  36. */
  37. private void registerDelayBinding(IRabbitMqBinding binding) throws IOException {
  38. IRabbitMqExchange exchange = binding.exchange();
  39. // 注册一个延时的消息交换机
  40. channel.exchangeDeclare(exchange.delayExchangeName(), exchange.type(), exchange.durable(), exchange.autoDelete(), exchange.internal(), exchange.arguments());
  41. // 注册一个死信队列 设置消息超时后,将消息转发到原来的Router队列
  42. IRabbitMqQueue queue = binding.queue();
  43. Map<String, Object> arguments = queue.arguments();
  44. if (arguments == null) {
  45. arguments = new HashMap<>(4);
  46. }
  47. arguments.put("x-dead-letter-exchange", binding.exchange().exchangeName());
  48. arguments.put("x-dead-letter-routing-key", binding.routing().routingKey());
  49. channel.queueDeclare(queue.delayQueueName(), queue.durable(), queue.exclusive(), queue.autoDelete(), arguments);
  50. // 将交换机和队列绑定
  51. channel.queueBind(queue.delayQueueName(), exchange.delayExchangeName(), binding.routing().routingKey());
  52. }
  53. private List<MessageListenerContainer> listenerContainers = new LinkedList<>();
  54. @Override
  55. public void listenerQueue(IRabbitMqListener listener, IRabbitMqQueue... queues) {
  56. String[] queueNames = new String[queues.length];
  57. for (int idx = 0; idx < queues.length; idx++) {
  58. queueNames[idx] = queues[idx].queueName();
  59. }
  60. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
  61. // 配置手动确认
  62. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  63. container.setQueueNames(queueNames);
  64. container.setMessageListener(listener);
  65. listenerContainers.add(container);
  66. }
  67. @Override
  68. public void start() {
  69. for (MessageListenerContainer container : listenerContainers) {
  70. container.start();
  71. }
  72. }
  73. @Override
  74. public void stop() {
  75. }
  76. @Override
  77. public boolean isRunning() {
  78. return false;
  79. }
  80. @Override
  81. public boolean isAutoStartup() {
  82. return true;
  83. }
  84. @Override
  85. public void stop(Runnable runnable) {
  86. }
  87. @Override
  88. public int getPhase() {
  89. return 9999;
  90. }
  91. }

消息监听器

  /**
   * Callback for handling a RabbitMQ message.
   */
  public interface IRabbitMqListener {

      /**
       * Handles one RabbitMQ message.
       *
       * @param obj the message payload
       * @return the handling result as a boolean
       */
      boolean handleMessage(Object obj);
  }

抽象实现类(具体的消费者继承该抽象类,重写handleMessage()方法,实现消费逻辑)

  1. public abstract class AbstractMessageListener implements ChannelAwareMessageListener, IRabbitMqListener {
  2. private Logger logger = LoggerFactory.getLogger(AbstractMessageListener.class);
  3. private MessageConverter messageConverter = new Jackson2JsonMessageConverter();
  4. @Override
  5. public void onMessage(Message message, Channel channel) throws Exception {
  6. long tag = message.getMessageProperties().getDeliveryTag();
  7. try {
  8. Object obj = messageConverter.fromMessage(message);
  9. boolean handleResult = handleMessage(obj);
  10. if (handleResult) {
  11. channel.basicAck(tag, false);
  12. } else {
  13. logger.error("消息处理失败 message: {}", message);
  14. channel.basicNack(tag, false, false);
  15. }
  16. } catch (Exception e) {
  17. channel.basicNack(tag, false, false);
  18. logger.error("消息处理异常 message: " + message + " " + e.getMessage(), e);
  19. }
  20. }
  21. }

消息发送服务类

实现发送消息、发送延时消息等功能

  1. public class RabbitMqServiceImpl implements IRabbitMqService, RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
  2. private Logger logger = LoggerFactory.getLogger(RabbitMqServiceImpl.class);
  3. @Autowired
  4. protected RabbitTemplate rabbitTemplate;
  5. @PostConstruct
  6. public void init() {
  7. rabbitTemplate.setConfirmCallback(this);
  8. rabbitTemplate.setReturnCallback(this);
  9. rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
  10. }
  11. @Override
  12. public void send(IRabbitMqExchange exchange, IRabbitMqRouting routing, Object msg) {
  13. CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
  14. rabbitTemplate.convertAndSend(exchange.exchangeName(), routing.routingKey(), msg, correlationId);
  15. }
  16. @Override
  17. public void send(IRabbitMqExchange exchange, IRabbitMqRouting routing, Object msg, long delay) {
  18. CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
  19. if (delay > 0) {
  20. MessagePostProcessor processor = (Message message) -> {
  21. message.getMessageProperties().setExpiration(delay + "");
  22. return message;
  23. };
  24. rabbitTemplate.convertAndSend(exchange.delayExchangeName(), routing.routingKey(), msg, processor, correlationId);
  25. } else {
  26. rabbitTemplate.convertAndSend(exchange.exchangeName(), routing.routingKey(), msg, correlationId);
  27. }
  28. }
  29. /**
  30. * 消息发送的回调
  31. *
  32. * @param correlationId 消息Id
  33. * @param ack 是否成功的标示
  34. * @param cause 错误原因
  35. */
  36. @Override
  37. public void confirm(CorrelationData correlationId, boolean ack, String cause) {
  38. if (ack) {
  39. logger.info("消息发送成功 correlationId: {} cause: {}", correlationId, cause);
  40. } else {
  41. logger.error("消息发送失败 correlationId: {} cause: {}", correlationId, cause);
  42. }
  43. }
  44. @Override
  45. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  46. logger.info("returnedMessage message: {} replyCode: {} exchange: {} routingKey: {}", message, replyCode, exchange, routingKey);
  47. }
  48. }

实战

使用枚举定义消息队列配置

定义测试Exchange:mq.exchange.test

  1. /**
  2. * RabbitMq Exchange(交换机)定义
  3. * */
  4. public enum RabbitMqExchange implements IRabbitMqExchange {
  5. MQ_EXCHANGE_TEST("mq.exchange.test") ;
  6. private String exchangeName;
  7. @Override
  8. public String exchangeName() {
  9. return this.exchangeName;
  10. }
  11. RabbitMqExchange(String exchangeName){
  12. this.exchangeName = exchangeName;
  13. }
  14. }

定义测试Queue:mq.queue.test

  1. public enum RabbitMqQueue implements IRabbitMqQueue {
  2. MQ_QUEUE_TEST("mq.queue.test");
  3. private String queueName;
  4. @Override
  5. public String queueName() {
  6. return this.queueName;
  7. }
  8. RabbitMqQueue(String queueName){
  9. this.queueName = queueName;
  10. }
  11. }

定义测试Routing:mq.routing.test

  1. /**
  2. * RabbitMq routing(路由定义)
  3. * */
  4. public enum RabbitMqRouting implements IRabbitMqRouting {
  5. MQ_ROUTING_TEST("mq.routing.test");
  6. private String routingKey;
  7. @Override
  8. public String routingKey() {
  9. return this.routingKey;
  10. }
  11. RabbitMqRouting(String routingKey){
  12. this.routingKey = routingKey;
  13. }
  14. }

定义绑定关系:

  1. /**
  2. * RabbitMq Exchange(交换机) Routing(路由) Queue(队列) 的绑定关系
  3. * */
  4. public enum RabbitMqBinding implements IRabbitMqBinding {
  5. MQ_BINDING_TEST(RabbitMqExchange.MQ_EXCHANGE_TEST,RabbitMqRouting.MQ_ROUTING_TEST,RabbitMqQueue.MQ_QUEUE_TEST,true);
  6. /**
  7. * exchange(交换机)
  8. */
  9. IRabbitMqExchange exchange;
  10. /**
  11. * routing(路由)
  12. */
  13. IRabbitMqRouting routing;
  14. /**
  15. * queue(队列)
  16. */
  17. IRabbitMqQueue queue;
  18. /**
  19. * 是否允许延时
  20. */
  21. boolean allowDelay = false;
  22. RabbitMqBinding(IRabbitMqExchange exchange,IRabbitMqRouting routing,IRabbitMqQueue queue){
  23. this.exchange = exchange;
  24. this.routing = routing;
  25. this.queue = queue;
  26. }
  27. RabbitMqBinding(IRabbitMqExchange exchange,IRabbitMqRouting routing,IRabbitMqQueue queue,boolean allowDelay){
  28. this.exchange = exchange;
  29. this.routing = routing;
  30. this.queue = queue;
  31. this.allowDelay = allowDelay;
  32. }
  33. @Override
  34. public IRabbitMqExchange exchange() {
  35. return this.exchange;
  36. }
  37. @Override
  38. public IRabbitMqRouting routing() {
  39. return this.routing;
  40. }
  41. @Override
  42. public IRabbitMqQueue queue() {
  43. return this.queue;
  44. }
  45. @Override
  46. public boolean allowDelay() {
  47. return this.allowDelay;
  48. }
  49. }

测试消费者类

  1. public class TestConsumer extends AbstractMessageListener {
  2. Logger logger = LoggerFactory.getLogger(TestConsumer.class);
  3. @Override
  4. public boolean handleMessage(Object obj) {
  5. logger.info("rabbitmq消费者开始消费,消息内容:" +obj.toString());
  6. return true;
  7. }
  8. }

启动项目
登录 RabbitMQ 控制台,可以看到已经自动创建了交换机和延时交换机、消息队列和死信队列。
测试发送消息

  1. @Test
  2. public void testSendMq(){
  3. logger.info("生产者发送消息到mq");
  4. rabbitMqService.send(RabbitMqExchange.MQ_EXCHANGE_TEST, RabbitMqRouting.MQ_ROUTING_TEST,"测试发送消息");
  5. }

测试发送延时消息(60秒)

  1. @Test
  2. public void testSendDelayMq(){
  3. logger.info("生产者发送延迟消息到mq");
  4. rabbitMqService.send(RabbitMqExchange.MQ_EXCHANGE_TEST, RabbitMqRouting.MQ_ROUTING_TEST,"测试发送延时消息60s",60*1000);
  5. }