如何整合RabbitMQ
1、添加spring-boot-starter-amqp
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2、添加配置
spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.publisher-confirms=truespring.rabbitmq.dynamic=truespring.rabbitmq.cache.connection.mode=channel
3、注入队列
@Configurationpublic class RabbitConfig {@Beanpublic Queue Queue() {return new Queue("hello");}}
4、创建操作数据的Repository对象
interface CityRepository extends Repository<City, Long> {Page<City> findAll(Pageable pageable);Page<City> findByNameContainingAndCountryContainingAllIgnoringCase(String name,String country, Pageable pageable);City findByNameAndCountryAllIgnoringCase(String name, String country);}
5、创建消费者
@Componentpublic class RabbitConsumer {@RabbitHandler@RabbitListener(queues = "hello")public void process(@Payload String foo) {System.out.println(new Date() + ": " + foo);}}
6、启动主类
@SpringBootApplication@EnableSchedulingpublic class AmqpApplication {public static void main(String[] args) {SpringApplication.run(AmqpApplication.class, args);}}
控制台输出:
Sun Sep 30 16:30:35 CST 2018: hello
到此,一个简单的SpringBoot2.0集成RabbitMQ就完成了。
熟悉RabbitMQ的小伙伴们应该知道,RabbitMQ在一般的队列基础上,增加了ExChange的概念。ExChange有四种类型:Direct, Topic, Headers and Fanout。其中Headers实际很少使用,Direct较为简单。接下来将详细介绍如何使用topic和Fanout。
Topic Exchange
1、配置Topic规则
@Configurationpublic class TopicRabbitConfig {@Beanpublic Queue queueMessage1() {return new Queue(MQConst.TOPIC_QUEUENAME1);}@Beanpublic Queue queueMessage2() {return new Queue(MQConst.TOPIC_QUEUENAME2);}@BeanTopicExchange exchange() {return new TopicExchange(MQConst.TOPIC_EXCHANGE);}@BeanBinding bindingExchangeMessage(Queue queueMessage1, TopicExchange exchange) {// 将队列1绑定到名为topicKey.A的routingKeyreturn BindingBuilder.bind(queueMessage1).to(exchange).with(MQConst.TOPIC_KEY1);}@BeanBinding bindingExchangeMessages(Queue queueMessage2, TopicExchange exchange) {// 将队列2绑定到所有topicKey.开头的routingKeyreturn BindingBuilder.bind(queueMessage2).to(exchange).with(MQConst.TOPIC_KEYS);}}
2、配置消费者
@Componentpublic class TopicConsumer {@RabbitListener(queues = MQConst.TOPIC_QUEUENAME1)@RabbitHandlerpublic void process1(String message) {System.out.println("queue:topic.message1,message:" + message);}@RabbitListener(queues = MQConst.TOPIC_QUEUENAME2)@RabbitHandlerpublic void process2(String message) {System.out.println("queue:topic.message2,message:" + message);}}
3、生产消息
在Producer类中添加:
// TopicrabbitTemplate.convertAndSend(MQConst.TOPIC_EXCHANGE, MQConst.TOPIC_KEYS, "from keys");rabbitTemplate.convertAndSend(MQConst.TOPIC_EXCHANGE, MQConst.TOPIC_KEY1, "from key1");
再次启动主类,控制台输出:
queue:topic.message2,message:from keysqueue:topic.message1,message:from key1queue:topic.message2,message:from key1
Fanout Exchange
1、配置Fanout规则
@Configurationpublic class FanoutRabbitConfig {@Beanpublic Queue MessageA() {return new Queue(MQConst.FANOUT_QUEUENAME1);}@Beanpublic Queue MessageB() {return new Queue(MQConst.FANOUT_QUEUENAME2);}@BeanFanoutExchange fanoutExchange() {return new FanoutExchange(MQConst.FANOUT_EXCHANGE);}@BeanBinding bindingExchangeA(Queue MessageA, FanoutExchange fanoutExchange) {return BindingBuilder.bind(MessageA).to(fanoutExchange);}@BeanBinding bindingExchangeB(Queue MessageB, FanoutExchange fanoutExchange) {return BindingBuilder.bind(MessageB).to(fanoutExchange);}}
2.配置消费者
@Componentpublic class FanoutConsumer {@RabbitListener(queues = MQConst.FANOUT_QUEUENAME1)@RabbitHandlerpublic void process1(String message) {System.out.println("queue:fanout.message1,message:" + message);}@RabbitListener(queues = MQConst.FANOUT_QUEUENAME2)@RabbitHandlerpublic void process2(String message) {System.out.println("queue:fanout.message2,message:" + message);}}
3、生产消息
在Producer类中添加:
// FanOutrabbitTemplate.convertAndSend(MQConst.FANOUT_EXCHANGE, "", "fanout");
再次启动主类,控制台输出:
queue:fanout.message2,message:fanoutqueue:fanout.message1,message:fanout
源码地址:GitHub
