spring amqp是一个基于rabbitmq官方客户端amqp-client封装的操作库,详见spring-amqp,以下为基于spring boot的使用说明。
配置
以下配置为默认值,如果一致则不需要
spring:rabbitmq:host: localhostport: 5672virtual-host: /username: guestpassword: guest
rabbitTemplate
rabbitTemplate对象是spring amqp提供的操作类,其提供了包括消息发送等方法。
@Autowiredprivate AmqpTemplate rabbitTemplate;
消息
普通消息
队列绑定,声明一个Queue的bean,默认构造器为Queue(队列名, 持久性 = true, 排他性 = false, 自动删除 = false),可以根据需要使用不同的构造函数来初始化。
/*** message queue声明* message queue将会被绑定到默认exchange中*/@Beanpublic Queue messageQueue() {return new Queue("message");}
消息接收
/*** 消息接收器* 监听message queue的消息*/@Component@RabbitListener(queues = "message")public class ObjectReceiver {@RabbitHandlerpublic void process(String msg) {System.out.println("recv: " + msg);}}
消息接收也可以注解在方法上
@RabbitListener(queues = "message")public void handleQueue2(String msg) {System.out.println("recv: " + msg);}
消息发送
消息发送后在监听器中就可以看到打印出”recv:msg1”的字符串了。
rabbitTemplate.convertAndSend("message", "msg1");
fanout广播消息
队列绑定
/*** 声明fanout.1 queue*/@Beanpublic Queue queue1() {return new Queue("fanout.1");}/*** 声明fanout.2 queue*/@Beanpublic Queue queue2() {return new Queue("fanout.2");}/*** 声明fanout.3 queue*/@Beanpublic Queue queue3() {return new Queue("fanout.3");}/*** 声明fanouta exchange*/@Beanpublic FanoutExchange fanoutaExchange() {return new FanoutExchange("fanouta");}/*** 绑定fanout.1到fanouta exchange*/@Beanpublic Binding bindingExchange1() {return BindingBuilder.bind(queue1()).to(fanoutaExchange());}/*** 绑定fanout.2到fanouta exchange*/@Beanpublic Binding bindingExchange2() {return BindingBuilder.bind(queue2()).to(fanoutaExchange());}/*** 绑定fanout.3到fanouta exchange*/@Beanpublic Binding bindingExchange3() {return BindingBuilder.bind(queue3()).to(fanoutaExchange());}
消息监听
/*** 监听fanout.1 queue消息*/@RabbitListener(queues = "fanout.1")public void receiveMsg1(String msg) {System.out.println("fanout.1: " + msg);}/*** 监听fanout.2 queue消息*/@RabbitListener(queues = "fanout.2")public void receiveMsg1(String msg) {System.out.println("fanout.2: " + msg);}/*** 监听fanout.3 queue消息*/@RabbitListener(queues = "fanout.3")public void receiveMsg1(String msg) {System.out.println("fanout.3: " + msg);}
消息发送
在fanout中不需要设置routing key,所以第二个参数routing key直接设为空字符串即可,发送后每一个监听器都会收到消息”123”。
String msg = "123";rabbitTemplate.convertAndSend("fanouta", "", msg);
topic广播消息
队列绑定
@Beanpublic Queue queueA() {return new Queue("topic.a");}@Beanpublic Queue queueB() {return new Queue("topic.b");}@Beanpublic TopicExchange exchange() {return new TopicExchange("topic.1");}/*** 路由规则为routing.a* 只能匹配到routing.a routing key的消息*/@Beanpublic Binding bindingA(Queue queueA, TopicExchange exchange) {return BindingBuilder.bind(queueMessages).to(exchange).with("routing.a");}/*** 路由规则为#* 可以匹配到任意routing key的消息*/@Beanpublic Binding bindingB(Queue queueA1, TopicExchange exchange) {return BindingBuilder.bind(queueA1).to(exchange).with("#");}
消息监听
/*** 监听topic.a queue消息*/@RabbitListener(queues = "topic.a")public void receiveMsg1(String msg) {System.out.println("topic.a: " + msg);}/*** 监听topic.b queue消息*/@RabbitListener(queues = "topic.b")public void receiveMsg1(String msg) {System.out.println("topic.b: " + msg);}
消息发送
发送消息后两个监听器都能接收到消息
String msg = "123";rabbitTemplate.convertAndSend("topic.1", "routing.a", msg);
死信消息
队列绑定
/*** 死信exchange*/public Exchange deadExchange() {return ExchangeBuilder.directExchange("dead_exchange").build();}/*** 死信队列*/@Beanpublic Queue deadQueue() {Map<String, Object> args = new HashMap<>(2);// 死信交换机args.put("x-dead-letter-exchange", "dead_exchange");// 死信路由键args.put("x-dead-letter-routing-key", "dead_key");return QueueBuilder.durable("test.dead_queue").withArguments(args).build();}/*** 死信转发队列*/@Bean("redirect_queue")public Queue redirectQueue() {return QueueBuilder.durable("redirect_queue").build();}/*** 死信队列绑定死信交换机*/@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead_queue_routing_key").noargs();}/*** 死信路由绑定死信交换机*/@Beanpublic Binding redirectBinding() {return BindingBuilder.bind(redirectQueue()).to(deadExchange()).with("dead_key").noargs();}
消息监听
/*** 监听dead_queue queue* ackMode设置为需要手动ack*/@RabbitListener(queues = "dead_queue", ackMode = "MANUAL")public void receiveMsg(Message message, Channel channel) throws IOException {String msg = "123";System.out.println("dead_queue msg: " + msg);// 拒绝channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}/*** 监听消费失败的死信redirect_queue queue*/@RabbitListener(queues = "redirect_queue")public void receiveDead(String msg) {System.out.println("redirect_queue msg: " + msg);}
消息发送
消息发送后,dead_queue会接收到消息,消息被拒绝后转发到死信队列,redirect_queue接收到消息。
rabbitTemplate.convertAndSend("dead_exchange", "dead_queue_routing_key", "123");
延时消息
推荐使用延时插件来实现延时消息,以下说明为延时插件实现。
队列绑定
/*** 延时exchange*/public Exchange delayExchange() {boolean durable = true;boolean autoDelete = false;Map<String, Object> args = new HashMap<>(1);args.put("x-delayed-type", "direct");return new CustomExchange("delaye", "x-delayed-message", durable, autoDelete, args);}/*** 延时queue*/public Queue delayQueue() {return new Queue("delayq");}/*** 绑定exchange和queue*/public Binding binding(Queue delayQueue, Exchange delayExchange) {String delayRoutingKey = "routing.delay";return BindingBuilder.bind(delayQueue).to(delayExchange).with(delayRoutingKey).noargs();}
消息监听
@RabbitListener(queues = "delayq")public void receiveDead(String msg) {System.out.println("delay msg: " + msg);}
消息发送
发送一条延时1000毫秒的消息,监听器在1000毫秒左右才接收到消息,延时不一定会非常准时,会有一定的误差,在实际应用中应该注意。
long ms = 1000;rabbitTemplate.convertAndSend("delaye", "routing.delay", "123", msg -> {msg.getMessageProperties().getHeaders().put("x-delay", ms);return msg;});
