RabbitMQ的优势

解耦
异步提速
削峰填谷

RabbitMQ的劣势

系统可用性降低
系统复杂度提高
一致性

RabbitMQ的安装(Linux Centos7)

  1. 我们下载最新版的rpm包

image.png

  1. rpm -ivh rabbitmq-server-3.6.8-1.el7.noarch.rpm

然后我们先安装RabbitMQ的依赖erlang,之后在安装RabbitMQ
如果在安装的时候提示缺少socat,使用下面的命令安装

yum install socat logrotate -y

  1. 常用的命令 ```bash RabbitMQ默认的用户名和密码是guest,但是只能从本地localhost登录,所以我们需要添加新的用户

rabbitmqctl add_user user_name user_passwd #设置用户名和密码

rabbitmqctl set_user_tags user_name administrator #设置用户标签

rabbitmqctl set_permissions -p ‘/‘ user_name ‘.’ ‘.’ ‘.’ #设置权限

systemctl status rabbitmq-server #查看RabbitMQ服务状态

rabbitmqctl list_users #列出所有用户

开启管理页面插件,才可以使用web管理端

rabbitmq-plugins enable rabbitmq_management

开启rabbitMQ连接端口

firewall-cmd —zone=public —add-port=15672/tcp —permanent

程序或者其他机器交互使用时:

firewall-cmd —zone=public —add-port=5672/tcp —permanent

firewall-cmd —reload

  1. ```java
  2. #启动流程
  3. rabbitmq-plugins enable rabbitmq_management
  4. systemctl start rabbitmq-server
  5. systemctl status firewalld #查看防火墙状态
  6. firewall-cmd --list-ports --permanent #查看永久开放端口
  7. firewall-cmd --add-port=15672/tcp --permanent #添加永久开放端口
  8. systemctl restart firewalld #重启防火墙
  9. rabbitmqctl add_user admin
  10. rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
  11. rabbitmqctl set_user_tags admin administrator

image.png

入门

生产者

  1. package com.rabbitmq.producer;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. public class Producer_Hello {
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. //创建链接工厂
  10. ConnectionFactory factory = new ConnectionFactory();
  11. //设置参数
  12. factory.setHost("192.168.47.129");
  13. factory.setPort(5672);
  14. factory.setVirtualHost("/sky");
  15. factory.setUsername("admin");
  16. factory.setPassword("admin");
  17. //创建链接
  18. Connection connection = factory.newConnection();
  19. //创建Channel
  20. Channel channel = connection.createChannel();
  21. //创建队列Queue
  22. channel.queueDeclare("hello",true,false,false,null);
  23. //发送消息
  24. String msg = "helloworld";
  25. channel.basicPublish("","hello",null,msg.getBytes());
  26. //释放资源
  27. channel.close();
  28. connection.close();
  29. }
  30. }

消费者

  1. package com.rabbitmq.consumer;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class Consumer_Hello {
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. //创建链接工厂
  8. ConnectionFactory factory = new ConnectionFactory();
  9. //设置参数
  10. factory.setHost("192.168.47.129");
  11. factory.setPort(5672);
  12. factory.setVirtualHost("/sky");
  13. factory.setUsername("admin");
  14. factory.setPassword("admin");
  15. //创建链接
  16. Connection connection = factory.newConnection();
  17. //创建Channel
  18. Channel channel = connection.createChannel();
  19. //创建队列Queue
  20. channel.queueDeclare("hello",true,false,false,null);
  21. DefaultConsumer consumer = new DefaultConsumer(channel){
  22. @Override
  23. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  24. System.out.println(consumerTag);
  25. System.out.println(new String(body));
  26. }
  27. };
  28. channel.basicConsume("hello",true,consumer);
  29. }
  30. }

工作模式WorkQueues

  1. package com.rabbitmq.producer;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. public class Producer_work {
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("192.168.47.129");
  11. factory.setPort(5672);
  12. factory.setVirtualHost("/sky");
  13. factory.setUsername("admin");
  14. factory.setPassword("admin");
  15. Connection connection = factory.newConnection();
  16. Channel channel = connection.createChannel();
  17. channel.queueDeclare("work",true,false,false,null);
  18. for (int i = 0; i < 10; i++) {
  19. String msg = i+"helloworld";
  20. channel.basicPublish("","work",null,msg.getBytes());
  21. }
  22. channel.close();
  23. connection.close();
  24. }
  25. }
  26. package com.rabbitmq.consumer;
  27. import com.rabbitmq.client.*;
  28. import java.io.IOException;
  29. import java.util.concurrent.TimeoutException;
  30. public class Consumer_work01 {
  31. public static void main(String[] args) throws IOException, TimeoutException {
  32. ConnectionFactory factory = new ConnectionFactory();
  33. factory.setHost("192.168.47.129");
  34. factory.setPort(5672);
  35. factory.setVirtualHost("/sky");
  36. factory.setUsername("admin");
  37. factory.setPassword("admin");
  38. Connection connection = factory.newConnection();
  39. Channel channel = connection.createChannel();
  40. channel.queueDeclare("work",true,false,false,null);
  41. DefaultConsumer consumer = new DefaultConsumer(channel){
  42. @Override
  43. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  44. System.out.println(consumerTag);
  45. System.out.println(new String(body));
  46. }
  47. };
  48. channel.basicConsume("work",true,consumer);
  49. }
  50. }

image.png
image.png

订阅模式pub/sub

channel.exchangeDeclare(“pubsub”, BuiltinExchangeType.FANOUT,true,false,false,null); type是fanout

  1. package com.rabbitmq.producer;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. public class Producer_pubsub {
  9. public static void main(String[] args) throws IOException, TimeoutException {
  10. ConnectionFactory factory = new ConnectionFactory();
  11. factory.setHost("192.168.47.129");
  12. factory.setPort(5672);
  13. factory.setVirtualHost("/sky");
  14. factory.setUsername("admin");
  15. factory.setPassword("admin");
  16. Connection connection = factory.newConnection();
  17. Channel channel = connection.createChannel();
  18. //创建交换机
  19. channel.exchangeDeclare("pubsub", BuiltinExchangeType.FANOUT,true,false,false,null);
  20. /**
  21. * DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean var3, boolean var4, boolean var5, Map<String, Object> var6)
  22. *
  23. * exchange 交换机的名称
  24. * type 交换机类型
  25. * DIRECT("direct"), 定向
  26. * FANOUT("fanout"), 扇形
  27. * TOPIC("topic"), 通配符方式
  28. * HEADERS("headers"); 参数匹配
  29. * 是否持久化
  30. * 是否自动删除
  31. * 内部使用 一般false
  32. * 参数列表
  33. */
  34. //创建队列
  35. channel.queueDeclare("q1",true,false,false,null);
  36. channel.queueDeclare("q2",true,false,false,null);
  37. //绑定交换机和队列
  38. channel.queueBind("q1","pubsub","");
  39. channel.queueBind("q2","pubsub","");
  40. channel.basicPublish("pubsub","",null,"1231".getBytes());
  41. channel.close();
  42. connection.close();
  43. }
  44. }
  1. package com.rabbitmq.consumer;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class Consumer_pubsub01 {
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("192.168.47.129");
  9. factory.setPort(5672);
  10. factory.setVirtualHost("/sky");
  11. factory.setUsername("admin");
  12. factory.setPassword("admin");
  13. Connection connection = factory.newConnection();
  14. Channel channel = connection.createChannel();
  15. channel.queueDeclare("q1",true,false,false,null);
  16. DefaultConsumer consumer = new DefaultConsumer(channel){
  17. @Override
  18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  19. System.out.println(consumerTag);
  20. System.out.println(new String(body));
  21. }
  22. };
  23. channel.basicConsume("q1",true,consumer);
  24. }
  25. }

路由工作模式

channel.exchangeDeclare(“rote”, BuiltinExchangeType.DIRECT,true,false,false,null); type 是direct

  1. package com.rabbitmq.producer;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. public class Producer_roting {
  9. public static void main(String[] args) throws IOException, TimeoutException {
  10. ConnectionFactory factory = new ConnectionFactory();
  11. factory.setHost("192.168.47.129");
  12. factory.setPort(5672);
  13. factory.setVirtualHost("/sky");
  14. factory.setUsername("admin");
  15. factory.setPassword("admin");
  16. Connection connection = factory.newConnection();
  17. Channel channel = connection.createChannel();
  18. //创建交换机
  19. channel.exchangeDeclare("rote", BuiltinExchangeType.DIRECT,true,false,false,null);
  20. /**
  21. * DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean var3, boolean var4, boolean var5, Map<String, Object> var6)
  22. *
  23. * exchange 交换机的名称
  24. * type 交换机类型
  25. * DIRECT("direct"), 定向
  26. * FANOUT("fanout"), 扇形
  27. * TOPIC("topic"), 通配符方式
  28. * HEADERS("headers"); 参数匹配
  29. * 是否持久化
  30. * 是否自动删除
  31. * 内部使用 一般false
  32. * 参数列表
  33. */
  34. //创建队列
  35. channel.queueDeclare("q1",true,false,false,null);
  36. channel.queueDeclare("q2",true,false,false,null);
  37. //绑定交换机和队列
  38. channel.queueBind("q1","rote","error");
  39. channel.queueBind("q2","rote","");
  40. channel.basicPublish("rote","error",null,"q1接收".getBytes());
  41. channel.basicPublish("rote","",null,"q2接收".getBytes());
  42. channel.close();
  43. connection.close();
  44. }
  45. }

通配符工作模式

type 是topic

  1. package com.rabbitmq.producer;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. public class Producer_topic {
  9. public static void main(String[] args) throws IOException, TimeoutException {
  10. ConnectionFactory factory = new ConnectionFactory();
  11. factory.setHost("192.168.47.129");
  12. factory.setPort(5672);
  13. factory.setVirtualHost("/sky");
  14. factory.setUsername("admin");
  15. factory.setPassword("admin");
  16. Connection connection = factory.newConnection();
  17. Channel channel = connection.createChannel();
  18. //创建交换机
  19. channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC,true,false,false,null);
  20. channel.queueDeclare("q1",true,false,false,null);
  21. channel.queueDeclare("q2",true,false,false,null);
  22. //绑定交换机和队列
  23. /**
  24. * * 表示一个单词
  25. * # 表示多个单词
  26. */
  27. channel.queueBind("q1","topic","*.error");
  28. channel.queueBind("q1","topic","order.*");
  29. channel.queueBind("q2","topic","#.#");
  30. channel.basicPublish("topic","order.aa",null,"q1接收".getBytes());
  31. channel.basicPublish("topic","",null,"q2接收".getBytes());
  32. channel.close();
  33. connection.close();
  34. }
  35. }

Spring整合RabbitMQ

生产者配置以及代码

  1. #rabbitmq的配置
  2. rabbitmq.host=192.168.47.129
  3. rabbitmq.port=5672
  4. rabbitmq.username=admin
  5. rabbitmq.password=admin
  6. rabbitmq.virtual-host=/sky #虚拟机名称
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  6. xsi:schemaLocation="
  7. http://www.springframework.org/schema/beans
  8. http://www.springframework.org/schema/beans/spring-beans.xsd
  9. http://www.springframework.org/schema/context
  10. http://www.springframework.org/schema/context/spring-context.xsd
  11. http://www.springframework.org/schema/rabbit
  12. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
  13. ">
  14. <!-- 加载配置文件-->
  15. <context:property-placeholder location="classpath:rabbitmq.properties"/>
  16. <context:annotation-config/>
  17. <!-- 定义连接工厂-->
  18. <rabbit:connection-factory id="connectionFactory"
  19. host="${rabbitmq.host}"
  20. port="${rabbitmq.port}"
  21. username="${rabbitmq.username}"
  22. password="${rabbitmq.password}"
  23. virtual-host="${rabbitmq.virtual-host}"
  24. />
  25. <!-- id是bean的名称 name是队列的名称 auto-declare是自动创建 auto-delete是最后一个客户端断开连接后自动删除队列-->
  26. <!-- 定义交换机-->
  27. <rabbit:admin connection-factory="connectionFactory"/>
  28. <rabbit:queue id="spring_hello" name="spring_hello" auto-declare="true"/>
  29. <rabbit:queue id="spring_work1" name="spring_work1" auto-declare="true"/>
  30. <rabbit:queue id="spring_work2" name="spring_work2" auto-declare="true"/>
  31. <rabbit:queue id="spring_fanout1" name="spring_fanout1" auto-declare="true"/>
  32. <rabbit:queue id="spring_fanout2" name="spring_fanout2" auto-declare="true"/>
  33. <rabbit:fanout-exchange name="spring_exchangefanout" id="spring_exchangefanout" auto-declare="true">
  34. <rabbit:bindings>
  35. <rabbit:binding queue="spring_fanout1"/>
  36. <rabbit:binding queue="spring_fanout2"/>
  37. </rabbit:bindings>
  38. </rabbit:fanout-exchange>
  39. <!-- key是指路由key-->
  40. <rabbit:queue id="spring_route1" name="spring_route1" auto-declare="true"/>
  41. <rabbit:queue id="spring_route2" name="spring_route2" auto-declare="true"/>
  42. <rabbit:direct-exchange name="spring_exchangedirect" id="spring_exchangedirect" auto-declare="true">
  43. <rabbit:bindings>
  44. <rabbit:binding key="error" queue="spring_route1"/>
  45. <rabbit:binding queue="spring_route2"/>
  46. </rabbit:bindings>
  47. </rabbit:direct-exchange>
  48. <rabbit:queue id="spring_topic1" name="spring_topic1" auto-declare="true"/>
  49. <rabbit:queue id="spring_topic2" name="spring_topic2" auto-declare="true"/>
  50. <!-- pattern是指匹配规则-->
  51. <rabbit:topic-exchange name="spring_exchangetopic" id="spring_exchangetopic" auto-declare="true">
  52. <rabbit:bindings>
  53. <rabbit:binding pattern="order.*" queue="spring_topic1"/>
  54. <rabbit:binding pattern="order.#" queue="spring_topic2"/>
  55. </rabbit:bindings>
  56. </rabbit:topic-exchange>
  57. <!-- rabbitTemplate是用来发送数据的-->
  58. <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
  59. </beans>
  1. import org.junit.Test;
  2. import org.junit.runner.RunWith;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.test.context.ContextConfiguration;
  6. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  7. @RunWith(SpringJUnit4ClassRunner.class)
  8. @ContextConfiguration(locations = "classpath:application.xml")
  9. public class Spring_Producer {
  10. @Autowired
  11. private RabbitTemplate rabbitTemplate;
  12. //发送简单工作模式
  13. @Test
  14. public void test01(){
  15. rabbitTemplate.convertAndSend("spring_hello","helloworld");
  16. }
  17. //发送广播
  18. @Test
  19. public void fanout(){
  20. rabbitTemplate.convertAndSend("spring_exchangefanout","","广播发送!!");
  21. }
  22. @Test
  23. public void direct(){
  24. rabbitTemplate.convertAndSend("spring_exchangedirect","error","路由发送!!");
  25. }
  26. @Test
  27. public void topic(){
  28. rabbitTemplate.convertAndSend("spring_exchangetopic","order.a.a","topic发送!!");
  29. }
  30. }

消费者配置以及代码

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  6. xsi:schemaLocation="
  7. http://www.springframework.org/schema/beans
  8. http://www.springframework.org/schema/beans/spring-beans.xsd
  9. http://www.springframework.org/schema/context
  10. http://www.springframework.org/schema/context/spring-context.xsd
  11. http://www.springframework.org/schema/rabbit
  12. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
  13. ">
  14. <!-- 加载配置文件-->
  15. <context:property-placeholder location="classpath:rabbitmq.properties"/>
  16. <context:annotation-config/>
  17. <!-- 定义连接工厂-->
  18. <rabbit:connection-factory id="connectionFactory"
  19. host="${rabbitmq.host}"
  20. port="${rabbitmq.port}"
  21. username="${rabbitmq.username}"
  22. password="${rabbitmq.password}"
  23. virtual-host="${rabbitmq.virtual-host}"
  24. />
  25. <!-- 定义监听器类 -->
  26. <bean id="SpringQueueHello" class="com.springrabbit.HelloListener"/>
  27. <!-- 配置监听器容器 ,每一个监听器对应监听的队列 -->
  28. <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
  29. <rabbit:listener ref="SpringQueueHello" queue-names="spring_hello"/>
  30. </rabbit:listener-container>
  31. </beans>
  1. #监听类需要实现MessageListener接口并且重写onMessage方法
  2. package com.springrabbit;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.core.MessageListener;
  5. public class HelloListener implements MessageListener {
  6. @Override
  7. public void onMessage(Message message) {
  8. System.out.println(new String(message.getBody()));
  9. }
  10. }

Springboot整合RabbitMQ

  1. #首先在配置文件中配置RabbitMQ
  2. spring:
  3. rabbitmq:
  4. host: 192.168.47.129
  5. port: 5672
  6. username: admin
  7. password: admin
  8. virtual-host: /sky

生产者

springboot的生产者是一个配置类,在配置类中我们配置各种类型的交换机以及队列,然后定义绑定关系

  1. package com.sbmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class RabbitMQConfig {
  8. private static final String EXCHANGENAME="boot_exchange";
  9. @Bean("bootExchange")
  10. public Exchange bootExchange(){
  11. return ExchangeBuilder.directExchange(EXCHANGENAME).durable(true).build();
  12. }
  13. @Bean("bootQueue")
  14. public Queue bootQueue(){
  15. return QueueBuilder.durable("bootqueue").build();
  16. }
  17. @Bean
  18. public Binding bind(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
  19. return BindingBuilder.bind(queue).to(exchange).with("boot").noargs();
  20. }
  21. }

消费者

  1. package com.sbmq.config;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class Listener {
  7. @RabbitListener(queues = "bootqueue")
  8. public void listener(Message message){
  9. System.out.println(message);
  10. }
  11. }
  12. 编写一个监听类,使用@RabbitListener注解注明监听的队列,然后方法中接收一个Message类型的参数,里面就是消息

RabbitMQ高级

消息可靠投递

confirm

  1. 1.首先我们需要开启付,默认是关闭的
  2. <rabbit:connection-factory id="connectionFactory"
  3. host="${rabbitmq.host}"
  4. port="${rabbitmq.port}"
  5. username="${rabbitmq.username}"
  6. password="${rabbitmq.password}"
  7. virtual-host="${rabbitmq.virtual-host}"
  8. publisher-confirms="true"
  9. />
  10. 2.设置回调函数
  11. @Test
  12. public void test01(){
  13. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  14. /**
  15. *
  16. * @param correlationData 相关配置信息
  17. * @param b 是否成功
  18. * @param s 失败的原因
  19. */
  20. @Override
  21. public void confirm(CorrelationData correlationData, boolean b, String s) {
  22. System.out.println("huidioa");
  23. }
  24. });
  25. rabbitTemplate.convertAndSend("spring_hello","helloworld");
  26. }

return

  1. @Test
  2. public void test01(){
  3. /**
  4. * 1:开启服务
  5. * 2:设置回调函数
  6. * 3:设置模式
  7. * 默认失败丢弃消息
  8. * 失败返回消息
  9. */
  10. rabbitTemplate.setMandatory(true);
  11. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  12. /**
  13. *
  14. * @param message 消息数据
  15. * @param i 错误码
  16. * @param s 错误信息
  17. * @param s1 交换机
  18. * @param s2 路由key
  19. */
  20. @Override
  21. public void returnedMessage(Message message, int i, String s, String s1, String s2) {
  22. System.out.println("return");
  23. }
  24. });
  25. rabbitTemplate.convertAndSend("spring_hello12","helloworld");
  26. }

ACK 消息可靠性接收

  1. <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true" acknowledge="manual">
  2. <rabbit:listener ref="SpringQueueHello" queue-names="spring_hello"/>
  3. </rabbit:listener-container>
  4. acknowledge="manual"
  5. manual 手动
  6. auto 根据异常
  7. none 自动接收
  1. package com.springrabbit;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.core.MessageListener;
  5. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  6. import java.io.IOException;
  7. public class HelloListener implements ChannelAwareMessageListener {
  8. @Override
  9. public void onMessage(Message message, Channel channel) throws Exception {
  10. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  11. try {
  12. Thread.sleep(1000);
  13. System.out.println(message);
  14. /**
  15. * 当前标签
  16. * true签收所有消息
  17. */
  18. channel.basicAck(deliveryTag,true);
  19. } catch (Exception e) {
  20. channel.basicNack(deliveryTag,true,true);//第三个参数true是返回队列重新发送
  21. }
  22. }
  23. }

消费端限流(削峰填谷)

  1. 只需要设置监听容器新增加一个属性
  2. <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true"
  3. acknowledge="manual" prefetch="1">
  4. <rabbit:listener ref="SpringQueueHello" queue-names="spring_hello"/>
  5. </rabbit:listener-container>
  6. ## 1:需要设置手动签收消息
  7. ## 2:prefetch 里面的数表示一次接收几个消息,签收后才继续接收消息

TTL(Time To Live)存活/过期

  1. ##1:对队列设置过期时间
  2. 使用参数x-message-ttl,单位ms
  3. <rabbit:queue id="spring_topic1" name="spring_topic1" auto-declare="true">
  4. <rabbit:queue-arguments>
  5. <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
  6. </rabbit:queue-arguments>
  7. </rabbit:queue>
  8. ##2:对消息单独设置过期时间
  9. @Test
  10. public void test01(){
  11. MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
  12. @Override
  13. public Message postProcessMessage(Message message) throws AmqpException {
  14. message.getMessageProperties().setExpiration("50000");//过期时间
  15. return message;
  16. }
  17. };
  18. rabbitTemplate.convertAndSend("spring_hello","","helloworld",messagePostProcessor);
  19. }

死信队列

  1. # 生产者配置
  2. <!-- 死信队列
  3. 1:正常的队列和交换机
  4. 2:死信队列和交换机
  5. -->
  6. <!-- 正常队列-->
  7. <rabbit:queue id="dead_queue" name="dead_queue" auto-declare="true">
  8. <rabbit:queue-arguments>
  9. <!-- 绑定死信交换机-->
  10. <entry key="x-dead-letter-exchange" value="deaded_exchange"/>
  11. <!-- 设置发送消息的路由key-->
  12. <entry key="x-dead-letter-routing-key" value="deaded.test"/>
  13. <!-- 设置正常队列消息死亡时间以及长度-->
  14. <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
  15. <entry key="x-max-length" value="10" value-type="java.lang.Integer"/>
  16. </rabbit:queue-arguments>
  17. </rabbit:queue>
  18. <rabbit:topic-exchange name="dead_exchange" id="dead_exchange" auto-declare="true">
  19. <rabbit:bindings>
  20. <rabbit:binding pattern="dead.*" queue="dead_queue"/>
  21. </rabbit:bindings>
  22. </rabbit:topic-exchange>
  23. <!-- 死信队列-->
  24. <rabbit:queue id="deaded_queue" name="deaded_queue" auto-declare="true"/>
  25. <rabbit:topic-exchange name="deaded_exchange">
  26. <rabbit:bindings>
  27. <rabbit:binding pattern="deaded.*" queue="deaded_queue"/>
  28. </rabbit:bindings>
  29. </rabbit:topic-exchange>
  1. # 消费者配置以及代码
  2. <bean id="DeadQueue" class="com.springrabbit.DeadListener"/>
  3. <bean id="DeadedQueue" class="com.springrabbit.DeadedListener"/>
  4. <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true" acknowledge="manual" prefetch="1">
  5. <rabbit:listener ref="SpringQueueHello" queue-names="spring_hello"/>
  6. <rabbit:listener ref="DeadQueue" queue-names="dead_queue"/>
  7. <rabbit:listener ref="DeadedQueue" queue-names="deaded_queue"/>
  8. </rabbit:listener-container>
  9. package com.springrabbit;
  10. import com.rabbitmq.client.Channel;
  11. import org.springframework.amqp.core.Message;
  12. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  13. public class DeadedListener implements ChannelAwareMessageListener {
  14. @Override
  15. public void onMessage(Message message, Channel channel) throws Exception {
  16. //接收死信队列的消息
  17. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  18. System.out.println(new String(message.getBody())+"死信");
  19. channel.basicAck(deliveryTag,true);
  20. }
  21. }
  22. package com.springrabbit;
  23. import com.rabbitmq.client.Channel;
  24. import org.springframework.amqp.core.Message;
  25. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  26. public class DeadListener implements ChannelAwareMessageListener {
  27. @Override
  28. public void onMessage(Message message, Channel channel) throws Exception {
  29. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  30. try {
  31. System.out.println(new String(message.getBody()));
  32. int i = 1/0;
  33. channel.basicAck(deliveryTag,true);
  34. } catch (Exception e) {
  35. channel.basicNack(deliveryTag,true,false);
  36. }
  37. }
  38. }

延时队列

日志与监控

消息追踪

  1. # 开启 ,默认开启/虚拟机
  2. [root@localhost rabbitmq]# rabbitmqctl trace_on
  3. # 关闭
  4. [root@localhost rabbitmq]# rabbitmqctl trace_off

更强大的插件 rabbitmq_tracing

  1. # 开启
  2. rabbitmq-plugins enable rabbitmq_tracing
  3. # 查看所有插件
  4. rabbitmq-plugins list

应用问题

消息补偿

幂等性保障

集群搭建

镜像队列