一、生产者

1.1 yml配置文件

生产者和消费者需要在同一个name-server,group,customized-trace-topic

  1. rocketmq:
  2. name-server: xxx.xx.xxx.xxx:xxxx
  3. producer:
  4. group: pybasemetadata-dev
  5. customized-trace-topic: data-analysis-dev
  6. send-message-timeout: 5000
  7. retry-times-when-send-failed: 3
  8. consumer:
  9. group: pybasemetadata-dev
  10. topic: data-analysis-dev
  11. consume-thread-min: 10
  12. consume-thread-max: 20

1.2 配置

  1. @Slf4j
  2. @Data
  3. @Configuration
  4. public class MqProducer {
  5. @Value( "${rocketmq.producer.group}" )
  6. private String group;
  7. @Value( "${rocketmq.nameserver}" )
  8. private String nameServer;
  9. @Value( "${rocketmq.producer.sendmessagetimeout}" )
  10. private Integer sendMessageTimeout;
  11. @Value( "${rocketmq.producer.retrytimeswhensendfailed}" )
  12. private Integer retryTimesWhenSendFailed;
  13. @Bean
  14. public DefaultMQProducer defaultProducer() throws MQClientException {
  15. log.info( "defaultProducer 正在创建---------------------------------------" );
  16. DefaultMQProducer producer = new DefaultMQProducer( group );
  17. producer.setNamesrvAddr( nameServer );
  18. producer.setVipChannelEnabled( false );
  19. producer.setSendMsgTimeout( sendMessageTimeout );
  20. producer.setRetryTimesWhenSendAsyncFailed( retryTimesWhenSendFailed );
  21. producer.start();
  22. log.info( "rocketmq producer server 开启成功----------------------------------" );
  23. return producer;
  24. }
  25. }

1.3 Tag常量类(用于指定topic和tag)

用于区分消息类型

  1. @Component
  2. public class MqTag {
  3. public static String TOPIC;
  4. public static final String UP_EXCEL = "up-excel";
  5. public static final String SAVE_DATA = "save-data";
  6. public static final String QUALITY_SECOND = "quality_second";
  7. @Value("${rocketmq.consumer.topic}")
  8. public void setTOPIC(String TOPIC) {
  9. MqTag.TOPIC = TOPIC;
  10. }
  11. }

1.4 消息实体类

  1. @Data
  2. @Builder
  3. @NoArgsConstructor
  4. @AllArgsConstructor
  5. public class QualitySecondMessage implements Serializable {
  6. private static final long serialVersionUID = 1L;
  7. /**
  8. * 举例,发送id(可发送更多信息)
  9. */
  10. private String id;
  11. }

1.5 发送消息代码

省略其他逻辑代码

  1. // 封装消息实体类
  2. QualitySecondMessage qualitySecondMessage = QualitySecondMessage.builder().id(1).build()
  3. // 发送消息(指定topic和tag,消费者也可以指定从哪个topic和tag接收消息)
  4. Message sendMsg = new Message(MqTag.TOPIC, MqTag.QUALITY_SECOND, JSON.toJSONString(qualitySecondMessage).getBytes());
  5. SendResult sendResult = null;
  6. try {
  7. sendResult = defaultMQProducer.send(sendMsg);
  8. log.info("消息发送成功-{}", sendResult);
  9. } catch (Exception e) {
  10. return ResponseVo.failed("上传Mq消息异常!");
  11. }
  12. if (!Objects.equals(sendResult.getSendStatus(), SendStatus.SEND_OK)) {
  13. return ResponseVo.failed("上传消息异常!");
  14. }

二、消费者(包含策略模式 )

2.1 yml配置文件

同生产者

  1. rocketmq:
  2. name-server: 172.16.230.197:9876;172.16.230.198:9876;172.16.230.199:9876
  3. producer:
  4. group: pybasemetadata-dev
  5. customized-trace-topic: data-analysis-dev
  6. send-message-timeout: 5000
  7. retry-times-when-send-failed: 3
  8. consumer:
  9. group: pybasemetadata-dev
  10. topic: data-analysis-dev
  11. consume-thread-min: 10
  12. consume-thread-max: 20

2.2 配置

  1. @Data
  2. @Slf4j
  3. @Configuration
  4. public class MqConsumer {
  5. @Value("${rocketmq.consumer.group}")
  6. private String group;
  7. @Value("${rocketmq.consumer.topic}")
  8. private String topic;
  9. @Value("${rocketmq.nameserver}")
  10. private String nameServer;
  11. @Value("${rocketmq.consumer.consumethreadmin}")
  12. private Integer consumeThreadMin;
  13. @Value("${rocketmq.consumer.consumethreadmax}")
  14. private Integer consumeThreadMax;
  15. private Integer consumeMessageBatchMaxSize;
  16. @Value("${spring.profiles.active}")
  17. private String dev;
  18. @Autowired
  19. private MqConsumeListener consumeMsgListenerProcessor;
  20. @Bean
  21. public DefaultMQPushConsumer defaultConsumer() {
  22. log.info( "defaultConsumer 正在创建---------------------------------------" );
  23. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( group );
  24. consumer.setNamesrvAddr( nameServer );
  25. // 设置监听
  26. consumer.registerMessageListener( consumeMsgListenerProcessor );
  27. consumer.setConsumeThreadMin( consumeThreadMin );
  28. consumer.setConsumeThreadMax( consumeThreadMax );
  29. consumer.setMessageModel( MessageModel.CLUSTERING );
  30. consumer.setMaxReconsumeTimes( 10 );
  31. // 设置consumer消费 位点
  32. consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET );
  33. try {
  34. // 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,
  35. consumer.subscribe( topic, "*" );
  36. consumer.start();
  37. log.info( "consumer 创建成功 groupName={}, topics={}, namesrvAddr={}", group, topic, nameServer );
  38. } catch (MQClientException e) {
  39. log.error( "consumer 创建失败!" );
  40. }
  41. return consumer;
  42. }
  43. }

2.3 Tag常量类(用于指定topic和tag)

同生产者

  1. @Component
  2. public class MqTag {
  3. public static String TOPIC;
  4. public static final String UP_EXCEL = "up-excel";
  5. public static final String SAVE_DATA = "save-data";
  6. public static final String QUALITY_SECOND = "quality_second";
  7. @Value("${rocketmq.consumer.topic}")
  8. public void setTOPIC(String TOPIC) {
  9. MqTag.TOPIC = TOPIC;
  10. }
  11. }

2.4 消息实体类

同生产者

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class QualitySecondMessage implements Serializable {
    private static final long serialVersionUID = 1L;
    /**
     * 举例,发送id(可发送更多信息)
     */
    private String id;
}

2.5 策略Service

@Component
public class MqTagService {

    private HashMap<String, MqTagStrategy> tagStrategyHashMap = new HashMap<>();

    public MqTagService(List<MqTagStrategy> tagStrategyList) {
        for (MqTagStrategy strategy : tagStrategyList) {
            this.tagStrategyHashMap.put( strategy.tag(), strategy );
        }
    }

    public MqTagStrategy getTagStrategy(String tag) {
        return tagStrategyHashMap.get( tag );
    }
}

2.6 消费者监听器

@Component
@Slf4j
public class MqConsumeListener implements MessageListenerConcurrently {

    @Autowired
    private MqTagService mqTagService;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty( msgList )) {
            log.info( "MQ接收消息为空,直接返回成功" );
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        MessageExt messageExt = msgList.get( 0 );
        try {
            String tags = messageExt.getTags();
            messageExt.getBody();
            String body = new String( messageExt.getBody(), StandardCharsets.UTF_8 );
            // 根据tags  编写策略模式
            MqTagStrategy tagStrategy = mqTagService.getTagStrategy( tags );
            if (tagStrategy != null) {
                return tagStrategy.consume( body );
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error( "获取MQ消息内容异常{}", e.getCause().getMessage() );
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

2.7 接受消息,消费消息代码(实现策略接口)

@Slf4j
@Component
public class QualitySecondTagStrategy implements MqTagStrategy {

    @Autowired
    private QualitySecondMapper secondMapper;

    @Override
    public String tag() {
        return MqTag.QUALITY_SECOND;
    }

    @Override
    public ConsumeConcurrentlyStatus consume(String message) {
        log.info("接收到二层质控执行消息:{}", message);
        if (!message.startsWith("{")) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // 将获取的消息进行封装
        QualitySecondMessage secondMessage = JSONObject.parseObject(message, QualitySecondMessage.class);
        HashMap<String, Object> objData = secondMessage.getData();
        // 封装为K,V都为String的map,方便取值
        HashMap<String, String> data = new HashMap<>(32);
        for (Map.Entry<String, Object> entry : objData.entrySet()) {
            data.put(entry.getKey(), entry.getValue() + "");
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}