消息生产者

依赖引入

  1. <dependency>
  2. <groupId>com.cjkj</groupId>
  3. <artifactId>cjkj-mq-spring-boot-starter</artifactId>
  4. <version>1.5.0</version>
  5. </dependency>

配置文件

配置文件中配置需要用到的所有MQ信息[队列,交换机,路由key],支持配置多个

  • 配置的时候注意{}对象中的key必须写正确,交换机对应exchange, 路由键对应routingKey, 队列对应queueName

    1. #业务代码, 交换机, 路由键, 消息队列
    2. queues:
    3. messageQueues: [{bizCode: 101, exchange: modules_message_service, routingKey: message.push, queueName: modules.message.push}]

    定义MQ消息信息

  • 定义交换机和对应路由key封装的常量比如:PUSH_MESSAGE("modules_message_service", "message.push");

  • 定义业务对应的队列,这里以激光推送队列为例:在内部类MessageQueueConstants中添加public static final String QUEUE_MESSAGE_PUSH = "modules.message.push";

    1. @Slf4j
    2. public enum EnumMessageTopics {
    3. /**
    4. * 交换机和对应路由key封装的常量
    5. */
    6. PUSH_MESSAGE("modules_message_service", "message.push");
    7. /**
    8. * 交换机
    9. */
    10. private final String systemCode;
    11. /**
    12. * 主题
    13. */
    14. private final String topic;
    15. EnumMessageTopics(final String systemCode, final String topic) {
    16. this.systemCode = systemCode;
    17. this.topic = topic;
    18. }
    19. public String getFullTopicName() {
    20. return systemCode + ":" + topic;
    21. }
    22. public String getSystemCode() {
    23. return systemCode;
    24. }
    25. public String getTopic() {
    26. return topic;
    27. }
    28. /**
    29. * 平台 Queues
    30. */
    31. public static class MessageQueueConstants {
    32. /**
    33. * 激光推送信息队列
    34. */
    35. public static final String QUEUE_MESSAGE_PUSH = "modules.message.push";
    36. }
    37. }

    发送消息

    ```java @Autowired private MessageSender messageSender;

@Override public void pushMessage(PushMessageReq pushMessageReq) { final List pushMessageEntityList = getPushMessage(pushMessageReq); pushMessageEntityList.forEach(pushMessageEntity -> { pushMessageEntity.setId(SnowflakeIdWorker.nextId()); pushMessageDao.insertApikeyMessageInfo(pushMessageEntity); messageSender.sendRabbit(pushMessageEntity, EnumMessageTopics.PUSH_MESSAGE.getSystemCode(), EnumMessageTopics.PUSH_MESSAGE.getTopic()); }); }

  1. <a name="qXSEt"></a>
  2. ## 消费者
  3. <a name="noNmh"></a>
  4. ## 定义回调方法
  5. - 回调方法上添加@RabbitHandler和@RabbitListener注解
  6. - 在@RabbitListener注解中指定队列
  7. ```java
  8. @RabbitHandler
  9. @RabbitListener(queues = EnumMessageTopics.MessageQueueConstants.QUEUE_MESSAGE_PUSH)
  10. private void pushMessageFunc(Message message, Channel channel) {
  11. TEASPOON.execute(() -> {
  12. final DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  13. final String topic = message.getMessageProperties().getReceivedRoutingKey();
  14. String jsonMessage = new String(message.getBody(), StandardCharsets.UTF_8);
  15. log.info(dtf2.format(LocalDateTime.now()) + "收到 Topic: [{}],消息 [{}]", topic, jsonMessage);
  16. final PushMessageEntity pushMessageEntity;
  17. try {
  18. pushMessageEntity = JsonUtils.fromJson(jsonMessage, PushMessageEntity.class);
  19. // 手动ack应答
  20. // 告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了
  21. // 否则消息服务器以为这条消息没处理掉 后续还会在发,true确认所有消费者获得的消息
  22. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  23. log.info("消息消费成功:id:{}", message.getMessageProperties().getDeliveryTag());
  24. if (pushMessageEntity != null) {
  25. this.sendMessage(pushMessageEntity);
  26. }
  27. } catch (Exception e) {
  28. log.error("消息处理失败:id:{}", message.getMessageProperties().getDeliveryTag());
  29. }
  30. });
  31. }