消息队列rabbitMq的安装和使用。
安装
centos端安装
配置安装源(参考:https://github.com/rabbitmq/erlang-rpm)
# Install Erlang and socat, then open the yum repo definition for editing.
# NOTE(review): the repo file below probably needs to exist BEFORE the
# `yum install` so that Erlang is pulled from it - confirm the intended order.
sudo yum install -y erlang socat
sudo vim /etc/yum.repos.d/rabbitmq-erlang.repo

# Contents of /etc/yum.repos.d/rabbitmq-erlang.repo
# NOTE(review): dl.bintray.com has been retired; verify these URLs against the
# current rabbitmq/erlang-rpm README before use.
[rabbitmq-erlang]
name=rabbitmq-erlang
baseurl=https://dl.bintray.com/rabbitmq-erlang/rpm/erlang/19/el/7
gpgcheck=1
gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
repo_gpgcheck=0
enabled=1
安装rabbitMq
# Install the RabbitMQ server RPM (dependency checks skipped via --nodeps).
sudo rpm -Uvh https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.3/rabbitmq-server-3.7.3-1.el7.noarch.rpm --nodeps

# Start the service
sudo service rabbitmq-server restart
安装管理插件和新建管理用户
# Enable the web management plugin.
sudo rabbitmq-plugins enable rabbitmq_management

# Create the user "admin" with password "123456" and tag it as administrator.
sudo rabbitmqctl add_user admin 123456
sudo rabbitmqctl set_user_tags admin administrator

# Grant configure / write / read permissions on every resource of vhost "/".
# Each pattern is a regular expression, so ".*" is needed to match all names;
# a bare "." would only match single-character resource names.
sudo rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
服务
### Service operations
sudo service rabbitmq-server restart    # restart the service
sudo systemctl enable rabbitmq-server   # start on boot
sudo systemctl disable rabbitmq-server  # do not start on boot
web管理页面
在浏览器中访问 http://<服务器ip>:15672,可以看到管理页面的初始页面。

操作
springboot
1、引用
<!-- Spring Boot starter for AMQP; no version is given here, so it is presumably
     managed by the project's Spring Boot parent/BOM - confirm in the POM. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、配置
spring:
  rabbitmq:
    host: 127.0.0.1
    username: admin
    # Must match the password given to `rabbitmqctl add_user admin 123456`.
    password: "123456"
    port: 5672
3、直接模式(direct)
3.1、定义队列
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Registers three {@link Queue} beans named {@code "hello"}, {@code "neo"}
 * and {@code "object"}.
 */
@Configuration
public class RabbitConfig {

    private static final String HELLO_QUEUE = "hello";
    private static final String NEO_QUEUE = "neo";
    private static final String OBJECT_QUEUE = "object";

    /** @return a queue named {@code "hello"} */
    @Bean
    public Queue helloQueue() {
        final Queue queue = new Queue(HELLO_QUEUE);
        return queue;
    }

    /** @return a queue named {@code "neo"} */
    @Bean
    public Queue neoQueue() {
        final Queue queue = new Queue(NEO_QUEUE);
        return queue;
    }

    /** @return a queue named {@code "object"} */
    @Bean
    public Queue objectQueue() {
        final Queue queue = new Queue(OBJECT_QUEUE);
        return queue;
    }
}
3.2、发送
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Sends a timestamped greeting through the injected {@link AmqpTemplate}.
 */
@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds the text {@code "hello " + new Date()}, echoes it to standard
     * output and hands it to {@code convertAndSend} with {@code "hello"} as
     * the routing key.
     */
    public void send() {
        final String payload = "hello " + new Date();
        final String logLine = "Sender : " + payload;
        System.out.println(logLine);
        rabbitTemplate.convertAndSend("hello", payload);
    }
}
3.3、接收
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
// NOTE(review): Cacheable and Date are imported but not referenced in this class.
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Listener bound to the {@code "hello"} queue that prints each received
 * text message.
 */
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    /**
     * Prints the received message prefixed with {@code "Receiver : "}.
     *
     * @param hello the message body delivered as a string
     */
    @RabbitHandler
    public void process(String hello) {
        final String logLine = "Receiver : " + hello;
        System.out.println(logLine);
    }
}
4、分发模式(fanout)
订阅模式,比较简单。
4.1 定义队列
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class FanoutRabbitConfig {@Beanpublic Queue AMessage() {return new Queue("fanout.A");}@Beanpublic Queue BMessage() {return new Queue("fanout.B");}@Beanpublic Queue CMessage() {return new Queue("fanout.C");}@BeanFanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}@BeanBinding bindingExchangeA() {return BindingBuilder.bind(AMessage()).to(fanoutExchange());}@BeanBinding bindingExchangeB() {return BindingBuilder.bind(BMessage()).to(fanoutExchange());}@BeanBinding bindingExchangeC() {return BindingBuilder.bind(CMessage()).to(fanoutExchange());}}
4.2 发送
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Sends a fixed text message to the exchange named {@code "fanoutExchange"}.
 */
@Component
public class FanoutSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Prints the outgoing text, then sends it to {@code "fanoutExchange"}
     * with an empty routing key.
     */
    public void send() {
        final String payload = "hi, fanout msg ";
        final String logLine = "Sender : " + payload;
        System.out.println(logLine);
        rabbitTemplate.convertAndSend("fanoutExchange", "", payload);
    }
}
4.3 接收
// ---- FanoutReceiverA.java ----
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener on queue {@code "fanout.A"} that prints each received text message.
 */
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    /**
     * Prints the message prefixed with {@code "fanout Receiver A: "}.
     *
     * @param message the message body delivered as a string
     */
    @RabbitHandler
    public void process(String message) {
        final String logLine = "fanout Receiver A: " + message;
        System.out.println(logLine);
    }
}

// ---- FanoutReceiverB.java ----
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener on queue {@code "fanout.B"} that prints each received text message.
 */
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    /**
     * Prints the message prefixed with {@code "fanout Receiver B: "}.
     *
     * @param message the message body delivered as a string
     */
    @RabbitHandler
    public void process(String message) {
        final String logLine = "fanout Receiver B: " + message;
        System.out.println(logLine);
    }
}
5、主题模式(topic)
最自由的一个模式。
5.1 定义队列
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class TopicRabbitConfig {final static String message = "topic.message";final static String messages = "topic.messages";@Beanpublic Queue queueMessage() {return new Queue(TopicRabbitConfig.message);}@Beanpublic Queue queueMessages() {return new Queue(TopicRabbitConfig.messages);}@BeanTopicExchange exchange() {return new TopicExchange("topicExchange");}@BeanBinding bindingExchangeMessage() {return BindingBuilder.bind(queueMessage()).to(exchange()).with("topic.message");}@BeanBinding bindingExchangeMessages() {return BindingBuilder.bind(queueMessages()).to(exchange()).with("topic.#");}}
5.2 发送
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Sends three fixed text messages to {@code "topicExchange"}, each under a
 * different routing key.
 */
@Component
public class TopicSender {

    private static final String EXCHANGE = "topicExchange";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /** Sends {@code "hi, i am message all"} with routing key {@code "topic.1"}. */
    public void send() {
        publish("topic.1", "hi, i am message all");
    }

    /** Sends {@code "hi, i am message 1"} with routing key {@code "topic.message"}. */
    public void send1() {
        publish("topic.message", "hi, i am message 1");
    }

    /** Sends {@code "hi, i am messages 2"} with routing key {@code "topic.messages"}. */
    public void send2() {
        publish("topic.messages", "hi, i am messages 2");
    }

    /** Prints the outgoing text, then sends it to the exchange under the given routing key. */
    private void publish(String routingKey, String payload) {
        System.out.println("Sender : " + payload);
        rabbitTemplate.convertAndSend(EXCHANGE, routingKey, payload);
    }
}
5.3 接收
// ---- TopicReceiver.java ----
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener on queue {@code "topic.message"} that prints each received text message.
 */
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver {

    /**
     * Prints the message prefixed with {@code "Topic Receiver1 : "}.
     *
     * @param message the message body delivered as a string
     */
    @RabbitHandler
    public void process(String message) {
        final String logLine = "Topic Receiver1 : " + message;
        System.out.println(logLine);
    }
}

// ---- TopicReceiver2.java ----
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener on queue {@code "topic.messages"} that prints each received text message.
 */
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {

    /**
     * Prints the message prefixed with {@code "Topic Receiver2 : "}.
     *
     * @param message the message body delivered as a string
     */
    @RabbitHandler
    public void process(String message) {
        final String logLine = "Topic Receiver2 : " + message;
        System.out.println(logLine);
    }
}
