简单模式
pom引入
<!-- Dependencies of the RabbitMQ demo project. -->
<dependencies>
    <!-- Spring AMQP / RabbitMQ starter. No version is given here; presumably it is
         managed by the Spring Boot parent or BOM (confirm in the full pom). -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- JUnit 4, test scope only. Raised from 4.12 to 4.13.2 because 4.12 is affected
         by CVE-2020-15250 (TemporaryFolder information disclosure). -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
    <!-- NOTE(review): the controllers in this article use spring-web annotations;
         confirm that spring-boot-starter-web is declared elsewhere in the pom. -->
</dependencies>
application.yml
# HTTP port of the demo application
server:
  port: 8080

spring:
  rabbitmq:
    host: 192.168.135.141
    port: 5672
    virtual-host: /
    username: guest
    password: guest
    # Callback for messages that failed to be delivered
    # publisher-returns: true
    # Confirm that a message reached the exchange; selects the confirm mode
    # publisher-confirm-type: correlated
    listener:
      simple:
        # Acknowledgement mode (none: no ack, auto: automatic ack, manual: the listener acks itself)
        acknowledge-mode: manual
        # Number of concurrent consumers
        # concurrency: 5
        # Maximum number of concurrent consumers
        # max-concurrency: 10
代码讲解
第一部分
提供者
ConnectionFactory factory = new ConnectionFactory();// 主机地址;默认为 localhostfactory.setHost("192.168.135.143");// 虚拟主机名称;默认为 /factory.setVirtualHost("/itcast");// 连接用户名;默认为guestfactory.setUsername("hikktn");// 连接密码;默认为guestfactory.setPassword("hikktn");
springboot使用的是配置文件,读取application.yml文件
代码也不需要程序员编写。
第二部分
/*name : 队列名称durable : 持久化exclusive : 是否自动连接autoDelete : 是否自动清除空闲队列arguments : 队列的其他配置*/Channel channel = channel.queueDeclare(QUEUE_NAME, true, false, false, null);
和下面的代码一样
/**
 * Declares the queue of the simple mode as a Spring bean named "simple_queue".
 *
 * @return the queue definition
 */
@Bean("simple_queue")
public Queue createCommonQueue() {
    // Queue(name, durable, exclusive, autoDelete, arguments)
    final boolean durable = true;      // persistent queue
    final boolean exclusive = false;   // not exclusive to the declaring connection
    final boolean autoDelete = false;  // idle queue is not removed automatically
    return new Queue(SIMPLE_QUEUE, durable, exclusive, autoDelete, null);
}
第三部分
String message = "你好,rabbitmq!";
/*
 * basicPublish arguments:
 *   1: exchange name; the empty string selects the default exchange
 *   2: routing key; in the simple mode this is the queue name
 *   3: additional message properties
 *   4: message body
 *
 * The body is encoded explicitly as UTF-8 (the consumer decodes it as "utf-8"),
 * so the result does not depend on the platform default charset.
 */
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
和下面的代码一样
rabbitTemplate.convertAndSend(RabbitMQConfig.SIMPLE_QUEUE,”simple模式,发送了消息”);
第四部分
消费者
ConnectionFactory factory = new ConnectionFactory();// 主机地址;默认为 localhostfactory.setHost("192.168.135.143");// 虚拟主机名称;默认为 /factory.setVirtualHost("/itcast");// 连接用户名;默认为guestfactory.setUsername("hikktn");// 连接密码;默认为guestfactory.setPassword("hikktn");
springboot使用的是配置文件,读取application.yml文件
代码也不需要程序员编写。
第五部分
DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("接收到的消息为:" + new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(QUEUE_NAME, true, consumer);
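和下面的代码等价。注意:测试时如果把 @RabbitListener 标注在类上、再用 @RabbitHandler 标注只带 Message 和 Channel 参数的方法,会抛出 org.springframework.amqp.AmqpException: No method found for class ...(找不到与消息体类型匹配的处理方法)。最简单的解决办法是把 @RabbitListener 直接标注在处理方法上。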
/**
 * Listener for the simple mode.
 *
 * {@code @RabbitListener} is placed on the method. With a class-level
 * {@code @RabbitListener} plus {@code @RabbitHandler}, the handler is chosen by payload
 * type, and a method taking only (Message, Channel) is never matched, which gives
 * "No method found for class ...".
 */
@Component
public class SimpleListener {

    /**
     * Prints the received message and acknowledges it manually.
     *
     * @param message the raw AMQP message
     * @param channel the channel the message was delivered on, used for the manual ack
     */
    @RabbitListener(queues = "simple_queue")
    public void receive(Message message, Channel channel) {
        // Decode explicitly as UTF-8 so Chinese text does not depend on the platform charset.
        String messageRec = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("simple模式接收到了消息:" + messageRec);
        try {
            // multiple = false: acknowledge only this delivery
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            System.out.println("报错了------------------" + e.getMessage());
        }
    }
}
以上就是大致的代码逻辑。后续按照我总结的图片,就可以清晰地编写其他模式的代码。
简单模式代码实现
配置类
package com.hikktn.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** @ClassName RabbitMQConfig* @Description TODO* @Author lisonglin* @Date 2021/4/9 14:49* @Version 1.0*/@Configurationpublic class RabbitMQConfig {// 普通模式public static final String SIMPLE_QUEUE = "simple_queue";// 简单模式@Bean("simple_queue")public Queue createCommonQueue() {/*name : 队列名称durable : 持久化exclusive : 是否自动连接autoDelete : 是否自动清除空闲队列arguments : 队列的其他配置*/return new Queue(SIMPLE_QUEUE, true, false, false, null);}}
消费者
package com.hikktn.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Consumer of the simple mode: listens on "simple_queue".
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 21:20)
 */
@Component
public class SimpleListener {

    /**
     * Prints the received message and acknowledges it manually.
     *
     * @param message the raw AMQP message
     * @param channel the channel the message was delivered on, used for the manual ack
     */
    @RabbitListener(queues = "simple_queue")
    public void receive(Message message, Channel channel) {
        // Decode explicitly as UTF-8; new String(byte[]) would use the platform default
        // charset and garble Chinese text on non-UTF-8 systems.
        String messageRec = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("simple模式接收到了消息:" + messageRec);
        try {
            // multiple = false: acknowledge only this delivery
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            System.out.println("报错了------------------" + e.getMessage());
        }
    }
}
生产者
package com.hikktn.producer;

import com.hikktn.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Producer for the simple mode.
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 16:54)
 */
@Component
public class Sender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Simple mode (producer): sends one message using the queue name as routing key.
     *
     * @param msg the message to send
     */
    public void sendSimpleTest(String msg) {
        final String routingKey = RabbitMQConfig.SIMPLE_QUEUE;
        rabbitTemplate.convertAndSend(routingKey, msg);
    }
}
控制器
package com.hikktn.controller;

import com.hikktn.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * HTTP entry point that triggers the simple-mode producer.
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 19:27)
 */
@RestController
public class ConsumerController {

    @Autowired
    private Sender sender;

    /**
     * GET /getSimple: sends one simple-mode message.
     *
     * @return the literal "ok"
     */
    @GetMapping("/getSimple")
    public String sendSimple() {
        final String payload = "simple模式,发送了消息";
        sender.sendSimpleTest(payload);
        return "ok";
    }
}
工作模式
配置类
package com.hikktn.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** @ClassName RabbitMQConfig* @Description TODO* @Author lisonglin* @Date 2021/4/9 14:49* @Version 1.0*/@Configurationpublic class RabbitMQConfig {// 工作模式public static final String WORK_QUEUE_NAME = "work_queue";// 工作模式@Bean("work_queue")public Queue createWorkQueue() {return new Queue(WORK_QUEUE_NAME, true, false, false, null);}}
生产者
package com.hikktn.producer;

import com.hikktn.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Producer for the work mode.
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 16:54)
 */
@Component
public class Sender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Producer (work mode): sends ten messages, each being {@code msg} followed by
     * its index 0..9.
     *
     * @param msg the message prefix
     */
    public void sendWorkTest(String msg) {
        int seq = 0;
        while (seq < 10) {
            String payload = msg + seq;
            rabbitTemplate.convertAndSend(RabbitMQConfig.WORK_QUEUE_NAME, payload);
            seq++;
        }
    }
}
消费者1
package com.hikktn.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Worker 1 of the work mode: listens on "work_queue".
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 16:13)
 */
@Component
public class WorkListener1 {

    /**
     * Prints the received message and acknowledges it manually.
     *
     * <p>The prefetch limit is not set here. A per-consumer basicQos call only applies
     * to consumers started after it, so calling it inside the delivery callback has no
     * effect on this consumer and costs a broker round trip per message. To get fair
     * dispatch, set {@code spring.rabbitmq.listener.simple.prefetch: 1} in the
     * application configuration.
     *
     * @param message the raw AMQP message
     * @param channel the channel the message was delivered on, used for the manual ack
     * @param msg     the converted payload (unused; kept so the signature is unchanged)
     * @throws IOException declared for compatibility with the previous signature
     */
    @RabbitListener(queues = "work_queue")
    public void receive(Message message, Channel channel, String msg) throws IOException {
        // Decode explicitly as UTF-8 so Chinese text does not depend on the platform charset.
        String messageRec = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("work模式1接收到了消息:" + messageRec);
        try {
            // multiple = false: acknowledge only this delivery
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            System.out.println("报错了------------------" + e.getMessage());
        }
    }
}
消费者2
package com.hikktn.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Worker 2 of the work mode: listens on "work_queue".
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 16:13)
 */
@Component
public class WorkListener2 {

    /**
     * Prints the received message and acknowledges it manually.
     *
     * <p>The prefetch limit is not set here. A per-consumer basicQos call only applies
     * to consumers started after it, so calling it inside the delivery callback has no
     * effect on this consumer and costs a broker round trip per message. To get fair
     * dispatch, set {@code spring.rabbitmq.listener.simple.prefetch: 1} in the
     * application configuration.
     *
     * @param message the raw AMQP message
     * @param channel the channel the message was delivered on, used for the manual ack
     * @throws IOException declared for compatibility with the previous signature
     */
    @RabbitListener(queues = "work_queue")
    public void receive(Message message, Channel channel) throws IOException {
        // Decode explicitly as UTF-8 so Chinese text does not depend on the platform charset.
        String messageRec = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("work模式2接收到了消息:" + messageRec);
        try {
            // multiple = false: acknowledge only this delivery
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            System.out.println("报错了------------------" + e.getMessage());
        }
    }
}
控制器
package com.hikktn.controller;

import com.hikktn.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * HTTP entry point that triggers the work-mode producer.
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 19:27)
 */
@RestController
public class ConsumerController {

    @Autowired
    private Sender sender;

    /**
     * GET /getWork: asks the producer to send the work-mode messages.
     *
     * @return the literal "ok"
     */
    @GetMapping("/getWork")
    public String sendWork() {
        final String payload = "work模式,发送了消息";
        sender.sendWorkTest(payload);
        return "ok";
    }
}
订阅模式
配置类
package com.hikktn.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** @ClassName RabbitMQConfig* @Description TODO* @Author lisonglin* @Date 2021/4/9 14:49* @Version 1.0*/@Configurationpublic class RabbitMQConfig {// 订阅模式public static final String FANOUT_QUEUE_NAME = "publish_fanout_queue_1";public static final String FANOUT_QUEUE_NAME1 = "publish_fanout_queue_2";public static final String TEST_FANOUT_EXCHANGE = "fount_exchange";// 订阅模式@Bean("publish_fanout_queue_1")public Queue createFountQueue1() {return new Queue(FANOUT_QUEUE_NAME, true, false, false, null);}@Bean("publish_fanout_queue_2")public Queue createFountQueue2() {return new Queue(FANOUT_QUEUE_NAME1, true, false, false, null);}// 声明交换机@Bean("fount_exchange")public FanoutExchange createFanoutExchange() {return ExchangeBuilder.fanoutExchange(TEST_FANOUT_EXCHANGE).durable(true).build();}//队列与交换机进行绑定@Beanpublic Binding bindingQueueAndFanoutExchange1(@Qualifier("publish_fanout_queue_1") Queue queue, @Qualifier("fount_exchange") FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}//队列与交换机进行绑定@Beanpublic Binding bindingQueueAndFanoutExchange2(@Qualifier("publish_fanout_queue_2") Queue queue, @Qualifier("fount_exchange") FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}}
生产者
package com.hikktn.producer;

import com.hikktn.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Producers for the publish/subscribe (fanout) mode.
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 16:54)
 */
@Component
public class Sender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Producer 1 (publish/subscribe mode): publishes {@code msg} five times.
     *
     * @param msg the message to publish
     */
    public void sendFanoutTest1(String msg) {
        publishFiveTimes(msg);
    }

    /**
     * Producer 2 (publish/subscribe mode): publishes {@code msg} five times.
     *
     * @param msg the message to publish
     */
    public void sendFanoutTest2(String msg) {
        publishFiveTimes(msg);
    }

    /** Sends the same message five times to the fanout exchange with an empty routing key. */
    private void publishFiveTimes(String msg) {
        int remaining = 5;
        while (remaining-- > 0) {
            rabbitTemplate.convertAndSend(RabbitMQConfig.TEST_FANOUT_EXCHANGE, "", msg);
        }
    }
}
消费者1
package com.hikktn.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Subscriber 1 of the publish/subscribe mode: listens on "publish_fanout_queue_1".
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 21:23)
 */
@Component
public class FanoutListener1 {

    /**
     * Prints the received message.
     *
     * <p>The application configuration sets {@code acknowledge-mode: manual}, but this
     * method has no Channel to ack with, so every message would stay unacknowledged and
     * be redelivered. {@code ackMode = "AUTO"} (Spring AMQP 2.2+) overrides the container
     * default for this listener so the container acknowledges after a successful return.
     *
     * @param message the message payload converted to a String
     */
    @RabbitListener(queues = "publish_fanout_queue_1", ackMode = "AUTO")
    public void receive(String message) {
        System.out.println("消费者1接收到的消息为:" + message);
    }
}
消费者2
package com.hikktn.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Subscriber 2 of the publish/subscribe mode: listens on "publish_fanout_queue_2".
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 21:28)
 */
@Component
public class FanoutListener2 {

    /**
     * Prints the received message, labelled as consumer 2 so its output can be told
     * apart from subscriber 1.
     *
     * <p>The application configuration sets {@code acknowledge-mode: manual}, but this
     * method has no Channel to ack with, so every message would stay unacknowledged and
     * be redelivered. {@code ackMode = "AUTO"} (Spring AMQP 2.2+) overrides the container
     * default for this listener so the container acknowledges after a successful return.
     *
     * @param message the message payload converted to a String
     */
    @RabbitListener(queues = "publish_fanout_queue_2", ackMode = "AUTO")
    public void receive(String message) {
        System.out.println("消费者2接收到的消息为:" + message);
    }
}
控制器
package com.hikktn.controller;

import com.hikktn.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * HTTP entry point that triggers the publish/subscribe (fanout) producers.
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 19:27)
 */
@RestController
public class ConsumerController {

    @Autowired
    private Sender sender;

    /**
     * GET /getFount: runs both fanout producers, producer 1 first.
     *
     * @return the literal "ok"
     */
    @GetMapping("/getFount")
    public String sendFount() {
        final String firstPayload = "fount模式1,发送了消息";
        final String secondPayload = "fount模式2,发送了消息";
        sender.sendFanoutTest1(firstPayload);
        sender.sendFanoutTest2(secondPayload);
        return "ok";
    }
}
路由模式
配置类
package com.hikktn.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** @ClassName RabbitMQConfig* @Description TODO* @Author lisonglin* @Date 2021/4/9 14:49* @Version 1.0*/@Configurationpublic class RabbitMQConfig {// 路由模式public static final String DIRECT_QUEUE_NAME_1 = "routing_direct_queue_1";public static final String DIRECT_QUEUE_NAME_2 = "routing_direct_queue_2";public static final String TEST_DIRECT_EXCHANGE = "direct_exchange";private static final String DIRECT_ROUTING_KEY_INSERT = "insert";private static final String DIRECT_ROUTING_KEY_UPDATE = "update";// 路由模式@Bean("routing_direct_queue_1")public Queue createRoutingQueue1() {return new Queue(DIRECT_QUEUE_NAME_1, true, false, false, null);}@Bean("routing_direct_queue_2")public Queue createRoutingQueue2() {return new Queue(DIRECT_QUEUE_NAME_2, true, false, false, null);}// 声明交换机@Bean("direct_exchange")public DirectExchange createDirectExchange() {return ExchangeBuilder.directExchange(TEST_DIRECT_EXCHANGE).durable(true).build();}//队列与交换机进行绑定@Beanpublic Binding bindingQueueAndDirectExchange1(@Qualifier("routing_direct_queue_1") Queue queue, @Qualifier("direct_exchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTING_KEY_INSERT);}@Beanpublic Binding bindingQueueAndDirectExchange2(@Qualifier("routing_direct_queue_2") Queue queue, @Qualifier("direct_exchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTING_KEY_UPDATE);}}
生产者
package com.hikktn.producer;

import com.hikktn.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Producers for the routing (direct exchange) mode.
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 16:54)
 */
@Component
public class Sender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Producer 1 (routing mode).
     *
     * @param routingKey routing key used by the direct exchange to select the queue
     * @param msg        the message to send
     */
    public void sendDirectTest1(String routingKey, String msg) {
        publishToDirectExchange(routingKey, msg);
    }

    /**
     * Producer 2 (routing mode).
     *
     * @param routingKey routing key used by the direct exchange to select the queue
     * @param msg        the message to send
     */
    public void sendDirectTest2(String routingKey, String msg) {
        publishToDirectExchange(routingKey, msg);
    }

    /** Sends one message to the direct exchange with the given routing key. */
    private void publishToDirectExchange(String routingKey, String msg) {
        final String exchange = RabbitMQConfig.TEST_DIRECT_EXCHANGE;
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    }
}
消费者1
package com.hikktn.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 1 of the routing mode: listens on "routing_direct_queue_1".
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 21:55)
 */
@Component
public class DirectListener1 {

    /**
     * Prints the received message.
     *
     * <p>The application configuration sets {@code acknowledge-mode: manual}, but this
     * method has no Channel to ack with, so every message would stay unacknowledged and
     * be redelivered. {@code ackMode = "AUTO"} (Spring AMQP 2.2+) overrides the container
     * default for this listener so the container acknowledges after a successful return.
     *
     * @param message the message payload converted to a String
     */
    @RabbitListener(queues = "routing_direct_queue_1", ackMode = "AUTO")
    public void receive(String message) {
        System.out.println("消费者1接收到的消息为:" + message);
    }
}
消费者2
package com.hikktn.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 2 of the routing mode: listens on "routing_direct_queue_2".
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 21:55)
 */
@Component
public class DirectListener2 {

    /**
     * Prints the received message, labelled as consumer 2 so its output can be told
     * apart from consumer 1.
     *
     * <p>The application configuration sets {@code acknowledge-mode: manual}, but this
     * method has no Channel to ack with, so every message would stay unacknowledged and
     * be redelivered. {@code ackMode = "AUTO"} (Spring AMQP 2.2+) overrides the container
     * default for this listener so the container acknowledges after a successful return.
     *
     * @param message the message payload converted to a String
     */
    @RabbitListener(queues = "routing_direct_queue_2", ackMode = "AUTO")
    public void receive(String message) {
        System.out.println("消费者2接收到的消息为:" + message);
    }
}
控制器
package com.hikktn.controller;

import com.hikktn.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * HTTP entry point that triggers the routing-mode producers.
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 19:27)
 */
@RestController
public class ConsumerController {

    @Autowired
    private Sender sender;

    /**
     * GET /getDirect: sends one message with routing key "insert" and one with "update".
     *
     * @return the literal "ok"
     */
    @GetMapping("/getDirect")
    public String sendDirect() {
        final String insertKey = "insert";
        final String updateKey = "update";
        sender.sendDirectTest1(insertKey, "direct模式1,发送了消息");
        sender.sendDirectTest2(updateKey, "direct模式2,发送了消息");
        return "ok";
    }
}
通配符模式
配置类
package com.hikktn.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** @ClassName RabbitMQConfig* @Description TODO* @Author lisonglin* @Date 2021/4/9 14:49* @Version 1.0*/@Configurationpublic class RabbitMQConfig {// 通配符模式public static final String TOPIC_EXCHANGE_NAME = "topic_exchange";private static final String TOPIC_QUEUE_NAME_1 = "topic_queue_1";private static final String TOPIC_QUEUE_NAME_2 = "topic_queue_2";private static final String TOPIC_ROUTING_KEY_EMAIL = "*@qq.com";private static final String TOPIC_ROUTING_KEY_SMS = "sms.#";// 通配符模式@Bean("topic_queue_1")public Queue createTopicQueue1() {return QueueBuilder.durable(TOPIC_QUEUE_NAME_1).build();}@Bean("topic_queue_2")public Queue createTopicQueue2() {return QueueBuilder.durable(TOPIC_QUEUE_NAME_2).build();}// 声明交换机@Bean("topic_exchange")public Exchange createTopicExchange() {return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).durable(true).build();}// 绑定队列和交换机@Beanpublic Binding bindingQueueAndTopicExchange1(@Qualifier("topic_queue_1") Queue queue, @Qualifier("topic_exchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(TOPIC_ROUTING_KEY_EMAIL).noargs();}@Beanpublic Binding bindingQueueAndTopicExchange2(@Qualifier("topic_queue_2") Queue queue, @Qualifier("topic_exchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(TOPIC_ROUTING_KEY_SMS).noargs();}}
生产者
package com.hikktn.producer;

import com.hikktn.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Producers for the topic (wildcard) mode.
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 16:54)
 */
@Component
public class Sender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Producer 1 (topic mode).
     *
     * @param routingKey routing key matched against the bindings of the topic exchange
     * @param msg        the message to send
     */
    public void sendTopicTest1(String routingKey, String msg) {
        publishToTopicExchange(routingKey, msg);
    }

    /**
     * Producer 2 (topic mode).
     *
     * @param routingKey routing key matched against the bindings of the topic exchange
     * @param msg        the message to send
     */
    public void sendTopicTest2(String routingKey, String msg) {
        publishToTopicExchange(routingKey, msg);
    }

    /** Sends one message to the topic exchange with the given routing key. */
    private void publishToTopicExchange(String routingKey, String msg) {
        final String exchange = RabbitMQConfig.TOPIC_EXCHANGE_NAME;
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    }
}
消费者
package com.hikktn.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Consumers of the topic (wildcard) mode: one listener method per topic queue.
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 14:51)
 */
@Component
public class TopicListener {

    /**
     * Listens on "topic_queue_1": prints the converted payload and the raw body,
     * then acknowledges manually.
     *
     * @param message the raw AMQP message
     * @param channel the channel the message was delivered on, used for the manual ack
     * @param msg     the message payload converted to a String
     */
    @RabbitListener(queues = "topic_queue_1")
    public void receive1(Message message, Channel channel, String msg) {
        System.out.println("消费者1接收到的消息为:" + msg);
        // Decode explicitly as UTF-8 so Chinese text does not depend on the platform charset.
        String messageRec = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("消费者1接收到的消息为:" + messageRec);
        try {
            // multiple = false: acknowledge only this delivery
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            System.out.println("报错了------------------" + e.getMessage());
        }
    }

    /**
     * Listens on "topic_queue_2": prints the raw body, labelled as consumer 2 so its
     * output can be told apart from {@link #receive1}, then acknowledges manually.
     *
     * @param message the raw AMQP message
     * @param channel the channel the message was delivered on, used for the manual ack
     */
    @RabbitListener(queues = "topic_queue_2")
    public void receive2(Message message, Channel channel) {
        String messageRec = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("消费者2接收到的消息为:" + messageRec);
        try {
            // multiple = false: acknowledge only this delivery
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            System.out.println("报错了------------------" + e.getMessage());
        }
    }
}
控制器
package com.hikktn.controller;

import com.hikktn.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * HTTP entry point that triggers the topic-mode producers.
 *
 * @author lisonglin
 * @version 1.0 (2021/4/9 19:27)
 */
@RestController
public class ConsumerController {

    @Autowired
    private Sender sender;

    /**
     * GET /getTopic: sends one message with an e-mail style routing key and one with
     * an "sms." routing key.
     *
     * @return the literal "ok"
     */
    @GetMapping("/getTopic")
    public String sendTopic() {
        final String emailKey = "1213457107@qq.com";
        final String smsKey = "sms.@qq.com";
        sender.sendTopicTest1(emailKey, "topic模式1,发送给1213457107@qq.com消息");
        sender.sendTopicTest2(smsKey, "topic模式2,发送给sms.@qq.com消息");
        return "ok";
    }
}
测试





