spring amqp是一个基于rabbitmq官方客户端amqp-client封装的操作库,详见spring-amqp,以下为基于spring boot的使用说明。

配置

以下配置为默认值,如果一致则不需要

  1. spring:
  2. rabbitmq:
  3. host: localhost
  4. port: 5672
  5. virtual-host: /
  6. username: guest
  7. password: guest

rabbitTemplate

rabbitTemplate对象是spring amqp提供的操作类,其提供了包括消息发送等方法。

  1. @Autowired
  2. private AmqpTemplate rabbitTemplate;

消息

普通消息

队列绑定,声明一个Queue的bean,默认构造器为Queue(队列名, 持久性 = true, 排他性 = false, 自动删除 = false),可以根据需要使用不同的构造函数来初始化。

  1. /**
  2. * message queue声明
  3. * message queue将会被绑定到默认exchange中
  4. */
  5. @Bean
  6. public Queue messageQueue() {
  7. return new Queue("message");
  8. }

消息接收

  1. /**
  2. * 消息接收器
  3. * 监听message queue的消息
  4. */
  5. @Component
  6. @RabbitListener(queues = "message")
  7. public class ObjectReceiver {
  8. @RabbitHandler
  9. public void process(String msg) {
  10. System.out.println("recv: " + msg);
  11. }
  12. }

消息接收也可以注解在方法上

  1. @RabbitListener(queues = "message")
  2. public void handleQueue2(String msg) {
  3. System.out.println("recv: " + msg);
  4. }

消息发送
消息发送后在监听器中就可以看到打印出”recv:msg1”的字符串了。

  1. rabbitTemplate.convertAndSend("message", "msg1");

fanout广播消息

队列绑定

  1. /**
  2. * 声明fanout.1 queue
  3. */
  4. @Bean
  5. public Queue queue1() {
  6. return new Queue("fanout.1");
  7. }
  8. /**
  9. * 声明fanout.2 queue
  10. */
  11. @Bean
  12. public Queue queue2() {
  13. return new Queue("fanout.2");
  14. }
  15. /**
  16. * 声明fanout.3 queue
  17. */
  18. @Bean
  19. public Queue queue3() {
  20. return new Queue("fanout.3");
  21. }
  22. /**
  23. * 声明fanouta exchange
  24. */
  25. @Bean
  26. public FanoutExchange fanoutaExchange() {
  27. return new FanoutExchange("fanouta");
  28. }
  29. /**
  30. * 绑定fanout.1到fanouta exchange
  31. */
  32. @Bean
  33. public Binding bindingExchange1() {
  34. return BindingBuilder.bind(queue1()).to(fanoutaExchange());
  35. }
  36. /**
  37. * 绑定fanout.2到fanouta exchange
  38. */
  39. @Bean
  40. public Binding bindingExchange2() {
  41. return BindingBuilder.bind(queue2()).to(fanoutaExchange());
  42. }
  43. /**
  44. * 绑定fanout.3到fanouta exchange
  45. */
  46. @Bean
  47. public Binding bindingExchange3() {
  48. return BindingBuilder.bind(queue3()).to(fanoutaExchange());
  49. }

消息监听

  1. /**
  2. * 监听fanout.1 queue消息
  3. */
  4. @RabbitListener(queues = "fanout.1")
  5. public void receiveMsg1(String msg) {
  6. System.out.println("fanout.1: " + msg);
  7. }
  8. /**
  9. * 监听fanout.2 queue消息
  10. */
  11. @RabbitListener(queues = "fanout.2")
  12. public void receiveMsg1(String msg) {
  13. System.out.println("fanout.2: " + msg);
  14. }
  15. /**
  16. * 监听fanout.3 queue消息
  17. */
  18. @RabbitListener(queues = "fanout.3")
  19. public void receiveMsg1(String msg) {
  20. System.out.println("fanout.3: " + msg);
  21. }

消息发送
在fanout中不需要设置routing key,所以第二个参数routing key直接设为空字符串即可,发送后每一个监听器都会收到消息”123”。

  1. String msg = "123";
  2. rabbitTemplate.convertAndSend("fanouta", "", msg);

消息队列:rabbitmq spring amqp - 图1

topic广播消息

队列绑定

  1. @Bean
  2. public Queue queueA() {
  3. return new Queue("topic.a");
  4. }
  5. @Bean
  6. public Queue queueB() {
  7. return new Queue("topic.b");
  8. }
  9. @Bean
  10. public TopicExchange exchange() {
  11. return new TopicExchange("topic.1");
  12. }
  13. /**
  14. * 路由规则为routing.a
  15. * 只能匹配到routing.a routing key的消息
  16. */
  17. @Bean
  18. public Binding bindingA(Queue queueA, TopicExchange exchange) {
  19. return BindingBuilder.bind(queueMessages).to(exchange).with("routing.a");
  20. }
  21. /**
  22. * 路由规则为#
  23. * 可以匹配到任意routing key的消息
  24. */
  25. @Bean
  26. public Binding bindingB(Queue queueA1, TopicExchange exchange) {
  27. return BindingBuilder.bind(queueA1).to(exchange).with("#");
  28. }

消息监听

  1. /**
  2. * 监听topic.a queue消息
  3. */
  4. @RabbitListener(queues = "topic.a")
  5. public void receiveMsg1(String msg) {
  6. System.out.println("topic.a: " + msg);
  7. }
  8. /**
  9. * 监听topic.b queue消息
  10. */
  11. @RabbitListener(queues = "topic.b")
  12. public void receiveMsg1(String msg) {
  13. System.out.println("topic.b: " + msg);
  14. }

消息发送
发送消息后两个监听器都能接收到消息

  1. String msg = "123";
  2. rabbitTemplate.convertAndSend("topic.1", "routing.a", msg);

消息队列:rabbitmq spring amqp - 图2

死信消息

队列绑定

  1. /**
  2. * 死信exchange
  3. */
  4. public Exchange deadExchange() {
  5. return ExchangeBuilder.directExchange("dead_exchange").build();
  6. }
  7. /**
  8. * 死信队列
  9. */
  10. @Bean
  11. public Queue deadQueue() {
  12. Map<String, Object> args = new HashMap<>(2);
  13. // 死信交换机
  14. args.put("x-dead-letter-exchange", "dead_exchange");
  15. // 死信路由键
  16. args.put("x-dead-letter-routing-key", "dead_key");
  17. return QueueBuilder
  18. .durable("test.dead_queue")
  19. .withArguments(args)
  20. .build();
  21. }
  22. /**
  23. * 死信转发队列
  24. */
  25. @Bean("redirect_queue")
  26. public Queue redirectQueue() {
  27. return QueueBuilder
  28. .durable("redirect_queue")
  29. .build();
  30. }
  31. /**
  32. * 死信队列绑定死信交换机
  33. */
  34. @Bean
  35. public Binding deadBinding() {
  36. return BindingBuilder
  37. .bind(deadQueue())
  38. .to(deadExchange())
  39. .with("dead_queue_routing_key")
  40. .noargs();
  41. }
  42. /**
  43. * 死信路由绑定死信交换机
  44. */
  45. @Bean
  46. public Binding redirectBinding() {
  47. return BindingBuilder
  48. .bind(redirectQueue())
  49. .to(deadExchange())
  50. .with("dead_key")
  51. .noargs();
  52. }

消息监听

  1. /**
  2. * 监听dead_queue queue
  3. * ackMode设置为需要手动ack
  4. */
  5. @RabbitListener(queues = "dead_queue", ackMode = "MANUAL")
  6. public void receiveMsg(Message message, Channel channel) throws IOException {
  7. String msg = "123";
  8. System.out.println("dead_queue msg: " + msg);
  9. // 拒绝
  10. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  11. }
  12. /**
  13. * 监听消费失败的死信redirect_queue queue
  14. */
  15. @RabbitListener(queues = "redirect_queue")
  16. public void receiveDead(String msg) {
  17. System.out.println("redirect_queue msg: " + msg);
  18. }

消息发送
消息发送后,dead_queue会接收到消息,消息被拒绝后转发到死信队列,redirect_queue接收到消息。

  1. rabbitTemplate.convertAndSend("dead_exchange", "dead_queue_routing_key", "123");

延时消息

推荐使用延时插件来实现延时消息,以下说明为延时插件实现。

队列绑定

  1. /**
  2. * 延时exchange
  3. */
  4. public Exchange delayExchange() {
  5. boolean durable = true;
  6. boolean autoDelete = false;
  7. Map<String, Object> args = new HashMap<>(1);
  8. args.put("x-delayed-type", "direct");
  9. return new CustomExchange("delaye", "x-delayed-message", durable, autoDelete, args);
  10. }
  11. /**
  12. * 延时queue
  13. */
  14. public Queue delayQueue() {
  15. return new Queue("delayq");
  16. }
  17. /**
  18. * 绑定exchange和queue
  19. */
  20. public Binding binding(Queue delayQueue, Exchange delayExchange) {
  21. String delayRoutingKey = "routing.delay";
  22. return BindingBuilder
  23. .bind(delayQueue)
  24. .to(delayExchange)
  25. .with(delayRoutingKey)
  26. .noargs();
  27. }

消息监听

  1. @RabbitListener(queues = "delayq")
  2. public void receiveDead(String msg) {
  3. System.out.println("delay msg: " + msg);
  4. }

消息发送
发送一条延时1000毫秒的消息,监听器在1000毫秒左右才接收到消息,延时不一定会非常准时,会有一定的误差,在实际应用中应该注意。

  1. long ms = 1000;
  2. rabbitTemplate.convertAndSend("delaye", "routing.delay", "123", msg -> {
  3. msg.getMessageProperties().getHeaders().put("x-delay", ms);
  4. return msg;
  5. });