导读
这里简单列举下SpringBoot项目集成RabbitMQ消息队里的步骤。
使用
引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
添加配置文件YML
spring:rabbitmq:host: 139.155.36.77port: 5672username: guestpassword: guest
设置MQ配置Config
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitmqConfig {/*** 创建一个立即消费队列** @return*/@Beanpublic Queue immediateQueue() {// 第一个参数是创建的queue的名字,第二个参数是是否支持持久化return new Queue("test_queue_demo1", true);}/*** 声明交换机名称** @return*/@Beanpublic DirectExchange immediateExchange() {// 一共有三种构造方法,可以只传exchange的名字,// 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除,//第三种在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数return new DirectExchange("test_exchange_demo1", true, false);}/*** 把立即消费的队列和立即消费的exchange绑定在一起** @return*/@Beanpublic Binding immediateBinding() {return BindingBuilder.bind(immediateQueue()).to(immediateExchange()).with("test_routing_key_demo1");}}
设置发送者Sender
import java.util.Date;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;/*** 生产者** @author admin* @create 2020/3/2 17:00*/@Componentpublic class Sender {@AutowiredRabbitTemplate rabbitTemplate;public void send() {String message = "message is: " + new Date();System.out.println("Sender:" + message);rabbitTemplate.convertAndSend("test_exchange_demo1", "test_routing_key_demo1", message);}}
设置接收者Receiver
import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/*** 接收者** @author admin* @create 2020/3/2 17:01*/@Componentpublic class Receiver {@RabbitHandler@RabbitListener(queues = "test_queue_demo1")public void immediateProcess(String message) {System.out.println("Receiver:" + message);}}
发送者测试
import com.example.demo.mq.Sender;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;/*** 测试消息是否发送成功** @author admin* @create 2020/3/2 17:02*/@Controller@RequestMapping("/")public class TestMqController {@Autowiredprivate Sender sender;@RequestMapping("/send")public void setInfo() {sender.send();System.out.println("发送成功..");}}
控制台输出

接收者测试
package com.example.demo.controller;import com.example.demo.mq.Receiver;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;/*** 测试消息是否发送成功** @author admin* @create 2020/3/2 17:02*/@Controller@RequestMapping("/")public class TestMqController {@Autowiredprivate Receiver receiver;@RequestMapping("/getInfo")public void getInfo() {receiver.immediateProcess("你是谁??");}}
管理界面
对列

交换机

消息

问题
链接异常
020-03-02 17:16:50.260 INFO 11252 --- [tContainer#0-32] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [139.155.36.77:15672]2020-03-02 17:16:55.305 ERROR 11252 --- [ip:15672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occuredjava.net.SocketException: Socket Closedat java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_162]at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[na:1.8.0_162]at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[na:1.8.0_162]at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_162]at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[na:1.8.0_162]at java.io.BufferedInputStream.read(BufferedInputStream.java:265) ~[na:1.8.0_162]at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) ~[na:1.8.0_162]at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-5.7.3.jar:5.7.3]at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:184) ~[amqp-client-5.7.3.jar:5.7.3]at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:598) ~[amqp-client-5.7.3.jar:5.7.3]at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]

原因:
配置文件设置的端口号是 15672配置中映射2个端口:15672是Web管理界面的端口;5672是MQ访问的端口,所以这里改成 5672 成功解决。
