1.Maven依赖

  <!--rabbitmq-->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>

2.RabbitConfig

  1. package com.xlwy.fkudp.rabbitmq;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.core.*;
  5. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  6. import org.springframework.amqp.rabbit.connection.CorrelationData;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.beans.factory.annotation.Qualifier;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.context.annotation.Configuration;
  12. /**
  13. * ClassName: mq配置 仅支持基于 topic 模式
  14. * @author WHN
  15. * @date 2020/11/9 16:14
  16. */
  17. @Configuration
  18. public class RabbitConfig {
  19. private final Logger log = LoggerFactory.getLogger(this.getClass());
  20. /**
  21. * 注入rabbitmq连接池
  22. * @author WHN
  23. * @date 2020/11/9 16:14
  24. */
  25. @Autowired
  26. private CachingConnectionFactory connectionFactory;
  27. /**
  28. * 交换机信息
  29. */
  30. public static final String EXCHANGE_TOPIC_PERSON = "exchange_topic_person";
  31. /**
  32. * 队列1
  33. */
  34. public static final String QUEUE_ELECTRIC = "QUEUE_ELECTRIC";
  35. /**
  36. * 队列秘钥规则
  37. */
  38. public static final String ROUTINGKEY_ELECTRIC = "electric.#";
  39. /**
  40. * @Function 声明交换机
  41. */
  42. @Bean(EXCHANGE_TOPIC_PERSON)
  43. public Exchange exchangeTopicInform() {
  44. return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_PERSON).durable(true).build();
  45. }
  46. /**
  47. * 获取队列 1个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。
  48. */
  49. @Bean(QUEUE_ELECTRIC)
  50. public Queue queueElectric() {
  51. return new Queue(QUEUE_ELECTRIC, true);
  52. }
  53. /**
  54. * @Function 交换机与队列绑定
  55. */
  56. @Bean
  57. public Binding queueElectricBinding(@Qualifier(QUEUE_ELECTRIC) Queue queue,
  58. @Qualifier(EXCHANGE_TOPIC_PERSON) Exchange exchange) {
  59. return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.ROUTINGKEY_ELECTRIC).noargs();
  60. }
  61. @Bean
  62. public RabbitTemplate rabbitTemplate() {
  63. connectionFactory.setPublisherConfirms(true);
  64. connectionFactory.setPublisherReturns(true);
  65. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  66. rabbitTemplate.setMandatory(true);
  67. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  68. @Override
  69. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  70. if (ack) {
  71. log.info("消息推送到交换器成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
  72. } else {
  73. log.info("消息推送到交换器失败:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
  74. }
  75. }
  76. });
  77. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  78. @Override
  79. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  80. log.info("消息推送到队列丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
  81. }
  82. });
  83. return rabbitTemplate;
  84. }
  85. }

3.MqProduct生产者

  1. package com.xlwy.fkudp.rabbitmq;
  2. import cn.hutool.json.JSONObject;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * 生产者
  8. * @author WHN
  9. * @date 2020/11/9 16:16
  10. */
  11. @Component
  12. public class MqProduct {
  13. @Autowired
  14. private RabbitTemplate rabbitTemplate;
  15. /**
  16. * 发送队列1设备上发数据
  17. * @author WHN
  18. * @date 2020/11/9 16:42
  19. */
  20. public void sendElectric(String type, String electricType, JSONObject json) {
  21. JSONObject jsonObject = new JSONObject();
  22. jsonObject.putOpt("type", type);
  23. jsonObject.putOpt("electricType", electricType);
  24. jsonObject.putOpt("data", json);
  25. this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPIC_PERSON,"electric.pushMsg", jsonObject.toJSONString(0));
  26. }
  27. }

4.MqListener监听消息类

  package com.xlwy.fkudp.rabbitmq;

  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  import org.springframework.amqp.rabbit.annotation.RabbitListener;
  import org.springframework.stereotype.Component;
  5. /**
  6. * mq监听
  7. * @author WHN
  8. * @date 2020/11/9 16:16
  9. */
  10. @Component
  11. public class MqListener {
  12. /**
  13. * 队列1的数据
  14. */
  15. @RabbitListener(queues = RabbitConfig.QUEUE_ELECTRIC)
  16. @RabbitHandler
  17. public void process(String message) {
  18. System.out.println("RabbitMq 监听获取数据 : " + message);
  19. }
  20. }

5.向订阅通道中推送消息

  1. public class AnalyzeFubangService implements Analyze{
  2. @Autowired
  3. private MqProduct mqProduct;
  4. public void analyzeTheHeader(String hex) {
  5. mqProduct.sendElectric("02", "队列1模拟量变动",JSONUtil.parseObj(hex, false, true));
  6. }
  7. }

6.application.yml中配置

  spring:
    rabbitmq:
      host: 服务器ip
      port: 服务器端口
      username: 用户名
      password: 密码
      template:
        mandatory: true