spring amqp是一个基于rabbitmq官方客户端amqp-client封装的操作库,详见spring-amqp,以下为基于spring boot的使用说明。
配置
以下配置为默认值,如果一致则不需要
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: guest
password: guest
rabbitTemplate
rabbitTemplate对象是spring amqp提供的操作类,其提供了包括消息发送等方法。
@Autowired
private AmqpTemplate rabbitTemplate;
消息
普通消息
队列绑定,声明一个Queue的bean,默认构造器为Queue(队列名, 持久性 = true, 排他性 = false, 自动删除 = false),可以根据需要使用不同的构造函数来初始化。
/**
* message queue声明
* message queue将会被绑定到默认exchange中
*/
@Bean
public Queue messageQueue() {
return new Queue("message");
}
消息接收
/**
* 消息接收器
* 监听message queue的消息
*/
@Component
@RabbitListener(queues = "message")
public class ObjectReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("recv: " + msg);
}
}
消息接收也可以注解在方法上
@RabbitListener(queues = "message")
public void handleQueue2(String msg) {
System.out.println("recv: " + msg);
}
消息发送
消息发送后在监听器中就可以看到打印出”recv:msg1”的字符串了。
rabbitTemplate.convertAndSend("message", "msg1");
fanout广播消息
队列绑定
/**
* 声明fanout.1 queue
*/
@Bean
public Queue queue1() {
return new Queue("fanout.1");
}
/**
* 声明fanout.2 queue
*/
@Bean
public Queue queue2() {
return new Queue("fanout.2");
}
/**
* 声明fanout.3 queue
*/
@Bean
public Queue queue3() {
return new Queue("fanout.3");
}
/**
* 声明fanouta exchange
*/
@Bean
public FanoutExchange fanoutaExchange() {
return new FanoutExchange("fanouta");
}
/**
* 绑定fanout.1到fanouta exchange
*/
@Bean
public Binding bindingExchange1() {
return BindingBuilder.bind(queue1()).to(fanoutaExchange());
}
/**
* 绑定fanout.2到fanouta exchange
*/
@Bean
public Binding bindingExchange2() {
return BindingBuilder.bind(queue2()).to(fanoutaExchange());
}
/**
* 绑定fanout.3到fanouta exchange
*/
@Bean
public Binding bindingExchange3() {
return BindingBuilder.bind(queue3()).to(fanoutaExchange());
}
消息监听
/**
* 监听fanout.1 queue消息
*/
@RabbitListener(queues = "fanout.1")
public void receiveMsg1(String msg) {
System.out.println("fanout.1: " + msg);
}
/**
* 监听fanout.2 queue消息
*/
@RabbitListener(queues = "fanout.2")
public void receiveMsg1(String msg) {
System.out.println("fanout.2: " + msg);
}
/**
* 监听fanout.3 queue消息
*/
@RabbitListener(queues = "fanout.3")
public void receiveMsg1(String msg) {
System.out.println("fanout.3: " + msg);
}
消息发送
在fanout中不需要设置routing key,所以第二个参数routing key直接设为空字符串即可,发送后每一个监听器都会收到消息”123”。
String msg = "123";
rabbitTemplate.convertAndSend("fanouta", "", msg);
topic广播消息
队列绑定
@Bean
public Queue queueA() {
return new Queue("topic.a");
}
@Bean
public Queue queueB() {
return new Queue("topic.b");
}
@Bean
public TopicExchange exchange() {
return new TopicExchange("topic.1");
}
/**
* 路由规则为routing.a
* 只能匹配到routing.a routing key的消息
*/
@Bean
public Binding bindingA(Queue queueA, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("routing.a");
}
/**
* 路由规则为#
* 可以匹配到任意routing key的消息
*/
@Bean
public Binding bindingB(Queue queueA1, TopicExchange exchange) {
return BindingBuilder.bind(queueA1).to(exchange).with("#");
}
消息监听
/**
* 监听topic.a queue消息
*/
@RabbitListener(queues = "topic.a")
public void receiveMsg1(String msg) {
System.out.println("topic.a: " + msg);
}
/**
* 监听topic.b queue消息
*/
@RabbitListener(queues = "topic.b")
public void receiveMsg1(String msg) {
System.out.println("topic.b: " + msg);
}
消息发送
发送消息后两个监听器都能接收到消息
String msg = "123";
rabbitTemplate.convertAndSend("topic.1", "routing.a", msg);
死信消息
队列绑定
/**
* 死信exchange
*/
public Exchange deadExchange() {
return ExchangeBuilder.directExchange("dead_exchange").build();
}
/**
* 死信队列
*/
@Bean
public Queue deadQueue() {
Map<String, Object> args = new HashMap<>(2);
// 死信交换机
args.put("x-dead-letter-exchange", "dead_exchange");
// 死信路由键
args.put("x-dead-letter-routing-key", "dead_key");
return QueueBuilder
.durable("test.dead_queue")
.withArguments(args)
.build();
}
/**
* 死信转发队列
*/
@Bean("redirect_queue")
public Queue redirectQueue() {
return QueueBuilder
.durable("redirect_queue")
.build();
}
/**
* 死信队列绑定死信交换机
*/
@Bean
public Binding deadBinding() {
return BindingBuilder
.bind(deadQueue())
.to(deadExchange())
.with("dead_queue_routing_key")
.noargs();
}
/**
* 死信路由绑定死信交换机
*/
@Bean
public Binding redirectBinding() {
return BindingBuilder
.bind(redirectQueue())
.to(deadExchange())
.with("dead_key")
.noargs();
}
消息监听
/**
* 监听dead_queue queue
* ackMode设置为需要手动ack
*/
@RabbitListener(queues = "dead_queue", ackMode = "MANUAL")
public void receiveMsg(Message message, Channel channel) throws IOException {
String msg = "123";
System.out.println("dead_queue msg: " + msg);
// 拒绝
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
/**
* 监听消费失败的死信redirect_queue queue
*/
@RabbitListener(queues = "redirect_queue")
public void receiveDead(String msg) {
System.out.println("redirect_queue msg: " + msg);
}
消息发送
消息发送后,dead_queue会接收到消息,消息被拒绝后转发到死信队列,redirect_queue接收到消息。
rabbitTemplate.convertAndSend("dead_exchange", "dead_queue_routing_key", "123");
延时消息
推荐使用延时插件来实现延时消息,以下说明为延时插件实现。
队列绑定
/**
* 延时exchange
*/
public Exchange delayExchange() {
boolean durable = true;
boolean autoDelete = false;
Map<String, Object> args = new HashMap<>(1);
args.put("x-delayed-type", "direct");
return new CustomExchange("delaye", "x-delayed-message", durable, autoDelete, args);
}
/**
* 延时queue
*/
public Queue delayQueue() {
return new Queue("delayq");
}
/**
* 绑定exchange和queue
*/
public Binding binding(Queue delayQueue, Exchange delayExchange) {
String delayRoutingKey = "routing.delay";
return BindingBuilder
.bind(delayQueue)
.to(delayExchange)
.with(delayRoutingKey)
.noargs();
}
消息监听
@RabbitListener(queues = "delayq")
public void receiveDead(String msg) {
System.out.println("delay msg: " + msg);
}
消息发送
发送一条延时1000毫秒的消息,监听器在1000毫秒左右才接收到消息,延时不一定会非常准时,会有一定的误差,在实际应用中应该注意。
long ms = 1000;
rabbitTemplate.convertAndSend("delaye", "routing.delay", "123", msg -> {
msg.getMessageProperties().getHeaders().put("x-delay", ms);
return msg;
});