一、生产者
1.1 yml配置文件
生产者和消费者需要配置相同的 name-server、group 和 customized-trace-topic。
rocketmq:
  name-server: xxx.xx.xxx.xxx:xxxx
  producer:
    group: pybasemetadata-dev
    customized-trace-topic: data-analysis-dev
    send-message-timeout: 5000
    retry-times-when-send-failed: 3
  consumer:
    group: pybasemetadata-dev
    topic: data-analysis-dev
    consume-thread-min: 10
    consume-thread-max: 20
1.2 配置
/**
 * Spring configuration that creates and starts the application's RocketMQ producer.
 *
 * <p>All settings are read from the {@code rocketmq.*} section of the application
 * configuration. The placeholders use the same kebab-case keys that appear in the
 * yml file, which is the canonical form Spring Boot recommends for {@code @Value}.
 */
@Slf4j
@Data
@Configuration
public class MqProducer {

    /** Producer group name ({@code rocketmq.producer.group}). */
    @Value("${rocketmq.producer.group}")
    private String group;

    /** Name-server address list ({@code rocketmq.name-server}). */
    @Value("${rocketmq.name-server}")
    private String nameServer;

    /** Send timeout passed to {@code setSendMsgTimeout} ({@code rocketmq.producer.send-message-timeout}). */
    @Value("${rocketmq.producer.send-message-timeout}")
    private Integer sendMessageTimeout;

    /** Retry count applied to failed sends ({@code rocketmq.producer.retry-times-when-send-failed}). */
    @Value("${rocketmq.producer.retry-times-when-send-failed}")
    private Integer retryTimesWhenSendFailed;

    /**
     * Builds, configures and starts the producer bean.
     *
     * @return the started producer
     * @throws MQClientException if the producer cannot be started
     */
    @Bean
    public DefaultMQProducer defaultProducer() throws MQClientException {
        log.info( "defaultProducer 正在创建---------------------------------------" );
        DefaultMQProducer producer = new DefaultMQProducer( group );
        producer.setNamesrvAddr( nameServer );
        producer.setVipChannelEnabled( false );
        producer.setSendMsgTimeout( sendMessageTimeout );
        // The sending code uses the synchronous send(), so the retry count must be
        // applied to the synchronous setting; the original only set the async one.
        producer.setRetryTimesWhenSendFailed( retryTimesWhenSendFailed );
        // Kept so that any asynchronous sender sees the same retry count as before.
        producer.setRetryTimesWhenSendAsyncFailed( retryTimesWhenSendFailed );
        producer.start();
        log.info( "rocketmq producer server 开启成功----------------------------------" );
        return producer;
    }
}
1.3 Tag常量类(用于指定topic和tag)
用于区分消息类型
/**
 * Central place for the topic name and the tag names used to tell message types apart.
 *
 * <p>The tags are compile-time constants. The topic is configuration-driven: it is
 * {@code null} until {@link #setTOPIC(String)} has been called with the value of
 * {@code rocketmq.consumer.topic}.
 */
@Component
public class MqTag {

    public static final String UP_EXCEL = "up-excel";

    public static final String SAVE_DATA = "save-data";

    public static final String QUALITY_SECOND = "quality_second";

    /** Topic name, assigned through {@link #setTOPIC(String)}. */
    public static String TOPIC;

    /**
     * Copies the configured topic into the static {@link #TOPIC} field.
     *
     * @param topic value resolved from {@code rocketmq.consumer.topic}
     */
    @Value("${rocketmq.consumer.topic}")
    public void setTOPIC(String topic) {
        TOPIC = topic;
    }
}
1.4 消息实体类
/**
 * Message payload for the quality-second tag.
 *
 * <p>Accessors, the builder and both constructors are generated by Lombok.
 */
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class QualitySecondMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Example field: the id being sent (more fields can be added to carry more information). */
    private String id;
}
1.5 发送消息代码
省略其他逻辑代码
// 封装消息实体类QualitySecondMessage qualitySecondMessage = QualitySecondMessage.builder().id(1).build()// 发送消息(指定topic和tag,消费者也可以指定从哪个topic和tag接收消息)Message sendMsg = new Message(MqTag.TOPIC, MqTag.QUALITY_SECOND, JSON.toJSONString(qualitySecondMessage).getBytes());SendResult sendResult = null;try {sendResult = defaultMQProducer.send(sendMsg);log.info("消息发送成功-{}", sendResult);} catch (Exception e) {return ResponseVo.failed("上传Mq消息异常!");}if (!Objects.equals(sendResult.getSendStatus(), SendStatus.SEND_OK)) {return ResponseVo.failed("上传消息异常!");}
二、消费者(包含策略模式 )
2.1 yml配置文件
同生产者
rocketmq:
  name-server: 172.16.230.197:9876;172.16.230.198:9876;172.16.230.199:9876
  producer:
    group: pybasemetadata-dev
    customized-trace-topic: data-analysis-dev
    send-message-timeout: 5000
    retry-times-when-send-failed: 3
  consumer:
    group: pybasemetadata-dev
    topic: data-analysis-dev
    consume-thread-min: 10
    consume-thread-max: 20
2.2 配置
@Data@Slf4j@Configurationpublic class MqConsumer {@Value("${rocketmq.consumer.group}")private String group;@Value("${rocketmq.consumer.topic}")private String topic;@Value("${rocketmq.nameserver}")private String nameServer;@Value("${rocketmq.consumer.consumethreadmin}")private Integer consumeThreadMin;@Value("${rocketmq.consumer.consumethreadmax}")private Integer consumeThreadMax;private Integer consumeMessageBatchMaxSize;@Value("${spring.profiles.active}")private String dev;@Autowiredprivate MqConsumeListener consumeMsgListenerProcessor;@Beanpublic DefaultMQPushConsumer defaultConsumer() {log.info( "defaultConsumer 正在创建---------------------------------------" );DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( group );consumer.setNamesrvAddr( nameServer );// 设置监听consumer.registerMessageListener( consumeMsgListenerProcessor );consumer.setConsumeThreadMin( consumeThreadMin );consumer.setConsumeThreadMax( consumeThreadMax );consumer.setMessageModel( MessageModel.CLUSTERING );consumer.setMaxReconsumeTimes( 10 );// 设置consumer消费 位点consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET );try {// 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,consumer.subscribe( topic, "*" );consumer.start();log.info( "consumer 创建成功 groupName={}, topics={}, namesrvAddr={}", group, topic, nameServer );} catch (MQClientException e) {log.error( "consumer 创建失败!" );}return consumer;}}
2.3 Tag常量类(用于指定topic和tag)
同生产者
/**
 * Topic name and tag names used to route messages by type.
 *
 * <p>The tags are fixed constants. The topic comes from configuration and stays
 * {@code null} until {@link #setTOPIC(String)} has been called with the value of
 * {@code rocketmq.consumer.topic}.
 */
@Component
public class MqTag {

    public static final String UP_EXCEL = "up-excel";

    public static final String SAVE_DATA = "save-data";

    public static final String QUALITY_SECOND = "quality_second";

    /** Topic name, assigned through {@link #setTOPIC(String)}. */
    public static String TOPIC;

    /**
     * Stores the configured topic in the static {@link #TOPIC} field.
     *
     * @param topic value resolved from {@code rocketmq.consumer.topic}
     */
    @Value("${rocketmq.consumer.topic}")
    public void setTOPIC(String topic) {
        TOPIC = topic;
    }
}
2.4 消息实体类
同生产者
/**
 * Message payload for the quality-second tag.
 *
 * <p>Accessors, the builder and both constructors are generated by Lombok.
 */
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class QualitySecondMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Example field: the id being sent (more fields can be added to carry more information). */
    private String id;
}
2.5 策略Service
/**
 * Lookup table from a message tag to the strategy that handles it.
 */
@Component
public class MqTagService {

    /** Strategies keyed by the value each one returns from {@code tag()}. */
    private final HashMap<String, MqTagStrategy> tagStrategyHashMap = new HashMap<>();

    /**
     * Indexes every supplied strategy by its tag. When two strategies report the
     * same tag, the one later in the list replaces the earlier one.
     *
     * @param tagStrategyList the strategies to register
     */
    public MqTagService(List<MqTagStrategy> tagStrategyList) {
        tagStrategyList.forEach( candidate -> tagStrategyHashMap.put( candidate.tag(), candidate ) );
    }

    /**
     * Finds the strategy registered for a tag.
     *
     * @param tag the message tag
     * @return the matching strategy, or {@code null} when none is registered
     */
    public MqTagStrategy getTagStrategy(String tag) {
        MqTagStrategy match = tagStrategyHashMap.get( tag );
        return match;
    }
}
2.6 消费者监听器
@Component
@Slf4j
public class MqConsumeListener implements MessageListenerConcurrently {
@Autowired
private MqTagService mqTagService;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (CollectionUtils.isEmpty( msgList )) {
log.info( "MQ接收消息为空,直接返回成功" );
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt messageExt = msgList.get( 0 );
try {
String tags = messageExt.getTags();
messageExt.getBody();
String body = new String( messageExt.getBody(), StandardCharsets.UTF_8 );
// 根据tags 编写策略模式
MqTagStrategy tagStrategy = mqTagService.getTagStrategy( tags );
if (tagStrategy != null) {
return tagStrategy.consume( body );
}
} catch (Exception e) {
e.printStackTrace();
log.error( "获取MQ消息内容异常{}", e.getCause().getMessage() );
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
2.7 接收消息,消费消息代码(实现策略接口)
/**
 * Strategy that handles messages tagged {@code MqTag.QUALITY_SECOND}.
 */
@Slf4j
@Component
public class QualitySecondTagStrategy implements MqTagStrategy {

    @Autowired
    private QualitySecondMapper secondMapper;

    /**
     * @return the tag this strategy is registered under
     */
    @Override
    public String tag() {
        return MqTag.QUALITY_SECOND;
    }

    /**
     * Parses the message body into a {@link QualitySecondMessage} and validates it.
     *
     * <p>Bodies that are null, do not start with <code>{</code>, or carry no id are
     * logged and acknowledged without further processing.
     *
     * @param message the message body as text
     * @return {@code CONSUME_SUCCESS} in every case handled here
     */
    @Override
    public ConsumeConcurrentlyStatus consume(String message) {
        log.info("接收到二层质控执行消息:{}", message);
        // Anything that does not look like a JSON object is acknowledged and skipped.
        if (message == null || !message.startsWith("{")) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // Map the JSON body onto the message entity.
        QualitySecondMessage secondMessage = JSONObject.parseObject(message, QualitySecondMessage.class);
        // The entity declares only the id field, so that is what gets read here; the
        // original called getData(), which QualitySecondMessage does not have.
        if (secondMessage == null || secondMessage.getId() == null) {
            log.warn("二层质控消息缺少id,已忽略:{}", message);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // NOTE(review): the original performed no processing after parsing either;
        // the real handling keyed by secondMessage.getId() belongs here.
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
