消息队列 RabbitMQ 的安装和使用。

安装

centos端安装

配置安装源(参考:https://github.com/rabbitmq/erlang-rpm)

  1. sudo yum install -y erlang socat
  2. sudo vim /etc/yum.repos.d/rabbitmq-erlang.repo
  3. [rabbitmq-erlang]
  4. name=rabbitmq-erlang
  5. baseurl=https://dl.bintray.com/rabbitmq-erlang/rpm/erlang/19/el/7
  6. gpgcheck=1
  7. gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
  8. repo_gpgcheck=0
  9. enabled=1

安装rabbitMq

  1. sudo rpm -Uvh https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.3/rabbitmq-server-3.7.3-1.el7.noarch.rpm --nodeps
  2. # 启动
  3. sudo service rabbitmq-server restart

安装管理插件和新建管理用户

  1. sudo rabbitmq-plugins enable rabbitmq_management
  2. sudo rabbitmqctl add_user admin 123456
  3. sudo rabbitmqctl set_user_tags admin administrator
  4. sudo rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

服务

  1. ### 服务操作
  2. sudo service rabbitmq-server restart # 重启服务
  3. sudo systemctl enable rabbitmq-server # 开机启动
  4. sudo systemctl disable rabbitmq-server # 不开机启动

web管理页面

访问:http://服务器ip:15672,可以看到初始页面(可使用上面新建的管理用户登录)。

image.png

操作

springboot

1、引用

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

2、配置

  1. spring:
  2.   rabbitmq:
  3.     host: 127.0.0.1
  4.     username: admin
  5.     password: 123456
  6.     port: 5672

3、直接模式(direct)

3.1、定义队列

  1. import org.springframework.amqp.core.Queue;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. @Configuration
  5. public class RabbitConfig {
  6. @Bean
  7. public Queue helloQueue() {
  8. return new Queue("hello");
  9. }
  10. @Bean
  11. public Queue neoQueue() {
  12. return new Queue("neo");
  13. }
  14. @Bean
  15. public Queue objectQueue() {
  16. return new Queue("object");
  17. }
  18. }

3.2、发送

  1. import org.springframework.amqp.core.AmqpTemplate;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. import java.util.Date;
  6. @Component
  7. public class HelloSender {
  8. @Autowired
  9. private AmqpTemplate rabbitTemplate;
  10. public void send() {
  11. String context = "hello " + new Date();
  12. System.out.println("Sender : " + context);
  13. this.rabbitTemplate.convertAndSend("hello", context);
  14. }
  15. }

3.3、接收

  1. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.cache.annotation.Cacheable;
  4. import org.springframework.stereotype.Component;
  5. import java.util.Date;
  6. @Component
  7. @RabbitListener(queues = "hello")
  8. public class HelloReceiver {
  9. @RabbitHandler
  10. public void process(String hello) {
  11. System.out.println("Receiver : " + hello);
  12. }
  13. }

4、广播模式(fanout)

即发布/订阅模式,比较简单:发送到 fanout 交换机的消息会被投递到所有与它绑定的队列,路由键会被忽略。

4.1 定义队列

  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.FanoutExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class FanoutRabbitConfig {
  9. @Bean
  10. public Queue AMessage() {
  11. return new Queue("fanout.A");
  12. }
  13. @Bean
  14. public Queue BMessage() {
  15. return new Queue("fanout.B");
  16. }
  17. @Bean
  18. public Queue CMessage() {
  19. return new Queue("fanout.C");
  20. }
  21. @Bean
  22. FanoutExchange fanoutExchange() {
  23. return new FanoutExchange("fanoutExchange");
  24. }
  25. @Bean
  26. Binding bindingExchangeA() {
  27. return BindingBuilder.bind(AMessage()).to(fanoutExchange());
  28. }
  29. @Bean
  30. Binding bindingExchangeB() {
  31. return BindingBuilder.bind(BMessage()).to(fanoutExchange());
  32. }
  33. @Bean
  34. Binding bindingExchangeC() {
  35. return BindingBuilder.bind(CMessage()).to(fanoutExchange());
  36. }
  37. }

4.2 发送

  1. import org.springframework.amqp.core.AmqpTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class FanoutSender {
  6. @Autowired
  7. private AmqpTemplate rabbitTemplate;
  8. public void send() {
  9. String context = "hi, fanout msg ";
  10. System.out.println("Sender : " + context);
  11. this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
  12. }
  13. }

4.3 接收

  1. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. @RabbitListener(queues = "fanout.A")
  6. public class FanoutReceiverA {
  7. @RabbitHandler
  8. public void process(String message) {
  9. System.out.println("fanout Receiver A: " + message);
  10. }
  11. }
  12. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  13. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  14. import org.springframework.stereotype.Component;
  15. @Component
  16. @RabbitListener(queues = "fanout.B")
  17. public class FanoutReceiverB {
  18. @RabbitHandler
  19. public void process(String message) {
  20. System.out.println("fanout Receiver B: " + message);
  21. }
  22. }

5、主题模式(topic)

最灵活的一种模式:路由键以 "." 分隔成若干单词,绑定键中的 "*" 匹配一个单词,"#" 匹配零个或多个单词。

5.1 定义队列

  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.Queue;
  4. import org.springframework.amqp.core.TopicExchange;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class TopicRabbitConfig {
  9. final static String message = "topic.message";
  10. final static String messages = "topic.messages";
  11. @Bean
  12. public Queue queueMessage() {
  13. return new Queue(TopicRabbitConfig.message);
  14. }
  15. @Bean
  16. public Queue queueMessages() {
  17. return new Queue(TopicRabbitConfig.messages);
  18. }
  19. @Bean
  20. TopicExchange exchange() {
  21. return new TopicExchange("topicExchange");
  22. }
  23. @Bean
  24. Binding bindingExchangeMessage() {
  25. return BindingBuilder.bind(queueMessage()).to(exchange()).with("topic.message");
  26. }
  27. @Bean
  28. Binding bindingExchangeMessages() {
  29. return BindingBuilder.bind(queueMessages()).to(exchange()).with("topic.#");
  30. }
  31. }

5.2 发送

  1. import org.springframework.amqp.core.AmqpTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Component;
  4. import java.util.Date;
  5. @Component
  6. public class TopicSender {
  7. @Autowired
  8. private AmqpTemplate rabbitTemplate;
  9. public void send() {
  10. String context = "hi, i am message all";
  11. System.out.println("Sender : " + context);
  12. this.rabbitTemplate.convertAndSend("topicExchange", "topic.1", context);
  13. }
  14. public void send1() {
  15. String context = "hi, i am message 1";
  16. System.out.println("Sender : " + context);
  17. this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context);
  18. }
  19. public void send2() {
  20. String context = "hi, i am messages 2";
  21. System.out.println("Sender : " + context);
  22. this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context);
  23. }
  24. }

5.3 接收

  1. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. @RabbitListener(queues = "topic.message")
  6. public class TopicReceiver {
  7. @RabbitHandler
  8. public void process(String message) {
  9. System.out.println("Topic Receiver1 : " + message);
  10. }
  11. }
  12. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  13. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  14. import org.springframework.stereotype.Component;
  15. @Component
  16. @RabbitListener(queues = "topic.messages")
  17. public class TopicReceiver2 {
  18. @RabbitHandler
  19. public void process(String message) {
  20. System.out.println("Topic Receiver2 : " + message);
  21. }
  22. }