导读


这里简单列举一下 Spring Boot 项目集成 RabbitMQ 消息队列的步骤。

使用


引入依赖

  <!-- Spring Boot starter that pulls in Spring AMQP and the RabbitMQ client. -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>

添加配置文件YML

  # Connection settings for the RabbitMQ broker used by Spring AMQP.
  # The keys must be nested under spring.rabbitmq for Spring Boot to bind them.
  spring:
    rabbitmq:
      host: 139.155.36.77
      # 5672 is the AMQP (client) port; 15672 is the web management UI
      # and cannot be used for client connections.
      port: 5672
      # NOTE(review): by default RabbitMQ only allows the built-in "guest"
      # account to connect from localhost; confirm the broker permits it
      # remotely or switch to a dedicated user.
      username: guest
      password: guest

设置MQ配置Config

  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.DirectExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class RabbitmqConfig {
  9. /**
  10. * 创建一个立即消费队列
  11. *
  12. * @return
  13. */
  14. @Bean
  15. public Queue immediateQueue() {
  16. // 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
  17. return new Queue("test_queue_demo1", true);
  18. }
  19. /**
  20. * 声明交换机名称
  21. *
  22. * @return
  23. */
  24. @Bean
  25. public DirectExchange immediateExchange() {
  26. // 一共有三种构造方法,可以只传exchange的名字,
  27. // 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除,
  28. //第三种在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数
  29. return new DirectExchange("test_exchange_demo1", true, false);
  30. }
  31. /**
  32. * 把立即消费的队列和立即消费的exchange绑定在一起
  33. *
  34. * @return
  35. */
  36. @Bean
  37. public Binding immediateBinding() {
  38. return BindingBuilder.bind(immediateQueue()).to(immediateExchange()).with("test_routing_key_demo1");
  39. }
  40. }

设置发送者Sender

  1. import java.util.Date;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * 生产者
  7. *
  8. * @author admin
  9. * @create 2020/3/2 17:00
  10. */
  11. @Component
  12. public class Sender {
  13. @Autowired
  14. RabbitTemplate rabbitTemplate;
  15. public void send() {
  16. String message = "message is: " + new Date();
  17. System.out.println("Sender:" + message);
  18. rabbitTemplate.convertAndSend("test_exchange_demo1", "test_routing_key_demo1", message);
  19. }
  20. }

设置接收者Receiver

  1. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * 接收者
  6. *
  7. * @author admin
  8. * @create 2020/3/2 17:01
  9. */
  10. @Component
  11. public class Receiver {
  12. @RabbitHandler
  13. @RabbitListener(queues = "test_queue_demo1")
  14. public void immediateProcess(String message) {
  15. System.out.println("Receiver:" + message);
  16. }
  17. }

发送者测试

  1. import com.example.demo.mq.Sender;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Controller;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. /**
  6. * 测试消息是否发送成功
  7. *
  8. * @author admin
  9. * @create 2020/3/2 17:02
  10. */
  11. @Controller
  12. @RequestMapping("/")
  13. public class TestMqController {
  14. @Autowired
  15. private Sender sender;
  16. @RequestMapping("/send")
  17. public void setInfo() {
  18. sender.send();
  19. System.out.println("发送成功..");
  20. }
  21. }

控制台输出

image.png

接收者测试

  1. package com.example.demo.controller;
  2. import com.example.demo.mq.Receiver;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Controller;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. /**
  7. * 测试消息是否发送成功
  8. *
  9. * @author admin
  10. * @create 2020/3/2 17:02
  11. */
  12. @Controller
  13. @RequestMapping("/")
  14. public class TestMqController {
  15. @Autowired
  16. private Receiver receiver;
  17. @RequestMapping("/getInfo")
  18. public void getInfo() {
  19. receiver.immediateProcess("你是谁??");
  20. }
  21. }

管理界面

队列

image.png

交换机

image.png

消息

image.png

问题


连接异常

  1. 2020-03-02 17:16:50.260 INFO 11252 --- [tContainer#0-32] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [139.155.36.77:15672]
  2. 2020-03-02 17:16:55.305 ERROR 11252 --- [ip:15672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured
  3. java.net.SocketException: Socket Closed
  4. at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_162]
  5. at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[na:1.8.0_162]
  6. at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[na:1.8.0_162]
  7. at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_162]
  8. at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[na:1.8.0_162]
  9. at java.io.BufferedInputStream.read(BufferedInputStream.java:265) ~[na:1.8.0_162]
  10. at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) ~[na:1.8.0_162]
  11. at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-5.7.3.jar:5.7.3]
  12. at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:184) ~[amqp-client-5.7.3.jar:5.7.3]
  13. at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:598) ~[amqp-client-5.7.3.jar:5.7.3]
  14. at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]

image.png

原因:
配置文件中设置的端口号是 15672。RabbitMQ 对外映射了两个端口:15672 是 Web 管理界面的端口,5672 才是 MQ(AMQP 客户端)访问的端口。因此把配置文件中的端口改成 5672 后,问题成功解决。