导读
这里简单列举一下 Spring Boot 项目集成 RabbitMQ 消息队列的步骤。
使用
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加配置文件YML
spring:
  rabbitmq:
    host: 139.155.36.77
    port: 5672
    username: guest
    password: guest
设置MQ配置Config
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the RabbitMQ objects used by this demo: one queue, one direct
 * exchange, and the binding that connects them through a routing key.
 */
@Configuration
public class RabbitmqConfig {

    private static final String QUEUE_NAME = "test_queue_demo1";
    private static final String EXCHANGE_NAME = "test_exchange_demo1";
    private static final String ROUTING_KEY = "test_routing_key_demo1";

    /**
     * Declares the queue whose messages are consumed immediately.
     *
     * @return a queue named {@code test_queue_demo1} with the durable flag set
     */
    @Bean
    public Queue immediateQueue() {
        // Constructor arguments: queue name, then whether the queue is durable.
        final boolean durable = true;
        return new Queue(QUEUE_NAME, durable);
    }

    /**
     * Declares the direct exchange that messages are published to.
     *
     * @return an exchange named {@code test_exchange_demo1}, durable and not auto-deleted
     */
    @Bean
    public DirectExchange immediateExchange() {
        // DirectExchange offers three constructors:
        //   1. name only;
        //   2. name, durable, autoDelete (used here);
        //   3. the same as 2 plus a Map of custom exchange arguments.
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(EXCHANGE_NAME, durable, autoDelete);
    }

    /**
     * Binds the immediate queue to the immediate exchange.
     *
     * @return a binding that uses routing key {@code test_routing_key_demo1}
     */
    @Bean
    public Binding immediateBinding() {
        final Queue queue = immediateQueue();
        final DirectExchange exchange = immediateExchange();
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}
设置发送者Sender
import java.util.Date;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Message producer.
 *
 * @author admin
 * @create 2020/3/2 17:00
 */
@Component
public class Sender {

    private static final String EXCHANGE_NAME = "test_exchange_demo1";
    private static final String ROUTING_KEY = "test_routing_key_demo1";

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Builds a text message containing the current date, echoes it to standard
     * output, and hands it to the {@link RabbitTemplate} for exchange
     * {@code test_exchange_demo1} with routing key {@code test_routing_key_demo1}.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "message is: " + now;
        System.out.println("Sender:" + payload);
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, payload);
    }
}
设置接收者Receiver
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Message consumer.
 *
 * @author admin
 * @create 2020/3/2 17:01
 */
@Component
public class Receiver {

    /**
     * Prints the given message to standard output, prefixed with {@code Receiver:}.
     *
     * <p>The method is registered as a listener for queue {@code test_queue_demo1}
     * through the method-level {@code @RabbitListener}. {@code @RabbitHandler} is
     * deliberately not used here: it only selects among several handler methods
     * when {@code @RabbitListener} is placed on the class, so combining it with a
     * method-level listener adds nothing and obscures how the method is wired.
     *
     * @param message the message body as text
     */
    @RabbitListener(queues = "test_queue_demo1")
    public void immediateProcess(String message) {
        System.out.println("Receiver:" + message);
    }
}
发送者测试
import com.example.demo.mq.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
/**
* 测试消息是否发送成功
*
* @author admin
* @create 2020/3/2 17:02
*/
@Controller
@RequestMapping("/")
public class TestMqController {
@Autowired
private Sender sender;
@RequestMapping("/send")
public void setInfo() {
sender.send();
System.out.println("发送成功..");
}
}
控制台输出
接收者测试
package com.example.demo.controller;
import com.example.demo.mq.Receiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
/**
* 测试消息是否发送成功
*
* @author admin
* @create 2020/3/2 17:02
*/
@Controller
@RequestMapping("/")
public class TestMqController {
@Autowired
private Receiver receiver;
@RequestMapping("/getInfo")
public void getInfo() {
receiver.immediateProcess("你是谁??");
}
}
管理界面
队列
交换机
消息
问题
连接异常
2020-03-02 17:16:50.260 INFO 11252 --- [tContainer#0-32] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [139.155.36.77:15672]
2020-03-02 17:16:55.305 ERROR 11252 --- [ip:15672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured
java.net.SocketException: Socket Closed
at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_162]
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[na:1.8.0_162]
at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[na:1.8.0_162]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_162]
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[na:1.8.0_162]
at java.io.BufferedInputStream.read(BufferedInputStream.java:265) ~[na:1.8.0_162]
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) ~[na:1.8.0_162]
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:184) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:598) ~[amqp-client-5.7.3.jar:5.7.3]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
原因:
配置文件中最初设置的端口号是 15672(从上面日志中的连接地址 139.155.36.77:15672 可以看出)。
RabbitMQ 对外映射了 2 个端口:15672 是 Web 管理界面的端口;5672 才是客户端访问 MQ 的端口,所以这里要把配置文件中的 port 改成 5672。
成功解决。