一、生产者
1.1 yml配置文件
生产者和消费者需要连接同一个 name-server,并且 group、customized-trace-topic 等配置保持一致
# RocketMQ settings for the producer example.
# Nesting is significant: the Java configuration classes read these values by
# their full path (for example rocketmq.producer.group, rocketmq.consumer.topic).
rocketmq:
  name-server: xxx.xx.xxx.xxx:xxxx
  producer:
    group: pybasemetadata-dev
    customized-trace-topic: data-analysis-dev
    send-message-timeout: 5000
    retry-times-when-send-failed: 3
  consumer:
    group: pybasemetadata-dev
    # Also injected into MqTag.TOPIC, which the sending code uses as the target topic.
    topic: data-analysis-dev
    consume-thread-min: 10
    consume-thread-max: 20
1.2 配置
/**
 * Spring configuration that creates, configures and starts the RocketMQ producer bean.
 *
 * <p>Placeholders use the kebab-case form of the keys exactly as they appear in the
 * YAML file ({@code name-server}, {@code send-message-timeout}, ...), which is the
 * form Spring Boot recommends for {@code @Value}.
 */
@Slf4j
@Data
@Configuration
public class MqProducer {

    /** Producer group, bound to {@code rocketmq.producer.group}. */
    @Value("${rocketmq.producer.group}")
    private String group;

    /** Name server address list, bound to {@code rocketmq.name-server}. */
    @Value("${rocketmq.name-server}")
    private String nameServer;

    /** Send timeout, bound to {@code rocketmq.producer.send-message-timeout}. */
    @Value("${rocketmq.producer.send-message-timeout}")
    private Integer sendMessageTimeout;

    /** Retry count, bound to {@code rocketmq.producer.retry-times-when-send-failed}. */
    @Value("${rocketmq.producer.retry-times-when-send-failed}")
    private Integer retryTimesWhenSendFailed;

    /**
     * Builds the producer from the injected settings and starts it.
     *
     * @return the started producer
     * @throws MQClientException if the producer cannot be started
     */
    @Bean
    public DefaultMQProducer defaultProducer() throws MQClientException {
        log.info("defaultProducer 正在创建---------------------------------------");
        DefaultMQProducer producer = new DefaultMQProducer(group);
        producer.setNamesrvAddr(nameServer);
        producer.setVipChannelEnabled(false);
        producer.setSendMsgTimeout(sendMessageTimeout);
        // The configured value is named "retry-times-when-send-failed" and the sending
        // code uses the synchronous send(), so it must be applied to the synchronous
        // retry setting; previously only the async setting received it.
        producer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);
        // Kept so that any asynchronous sender still sees the same retry count as before.
        producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);
        producer.start();
        log.info("rocketmq producer server 开启成功----------------------------------");
        return producer;
    }
}
1.3 Tag常量类(用于指定topic和tag)
用于区分消息类型
/**
 * Holder for the topic name and the tag constants used when sending and
 * dispatching messages.
 *
 * <p>The tags are compile-time constants. The topic is a static field that is
 * assigned through the {@code @Value}-annotated setter below, so it is
 * {@code null} until that setter has been called.
 */
@Component
public class MqTag {

    public static final String UP_EXCEL = "up-excel";

    public static final String SAVE_DATA = "save-data";

    public static final String QUALITY_SECOND = "quality_second";

    /** Topic name, taken from {@code rocketmq.consumer.topic}. */
    public static String TOPIC;

    /**
     * Copies the configured topic into the static {@link #TOPIC} field.
     *
     * @param topicName value of {@code rocketmq.consumer.topic}
     */
    @Value("${rocketmq.consumer.topic}")
    public void setTOPIC(String topicName) {
        TOPIC = topicName;
    }
}
1.4 消息实体类
/**
 * Message payload that is serialized to JSON and sent through the queue.
 *
 * <p>Accessors, builder and constructors are generated by the Lombok annotations.
 */
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class QualitySecondMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Example field: the id being sent. More fields can be added to carry more information. */
    private String id;
}
1.5 发送消息代码
省略其他逻辑代码
// Build the message payload. The id field is declared as String, so a String is passed.
QualitySecondMessage qualitySecondMessage = QualitySecondMessage.builder().id("1").build();
// Create the message for a specific topic and tag (the consumer can select on both).
// The body is encoded explicitly as UTF-8 because the consumer decodes it as UTF-8;
// relying on the platform default charset could corrupt non-ASCII content.
Message sendMsg = new Message(MqTag.TOPIC, MqTag.QUALITY_SECOND,
        JSON.toJSONString(qualitySecondMessage).getBytes(java.nio.charset.StandardCharsets.UTF_8));
SendResult sendResult;
try {
    sendResult = defaultMQProducer.send(sendMsg);
} catch (Exception e) {
    if (e instanceof InterruptedException) {
        // Restore the interrupt flag so callers further up can observe it.
        Thread.currentThread().interrupt();
    }
    // Log the failure with its stack trace instead of discarding it.
    log.error("消息发送异常, topic={}, tag={}", MqTag.TOPIC, MqTag.QUALITY_SECOND, e);
    return ResponseVo.failed("上传Mq消息异常!");
}
if (sendResult == null || !Objects.equals(sendResult.getSendStatus(), SendStatus.SEND_OK)) {
    log.warn("消息发送未成功-{}", sendResult);
    return ResponseVo.failed("上传消息异常!");
}
// Only report success once the send status has been checked.
log.info("消息发送成功-{}", sendResult);
二、消费者(包含策略模式)
2.1 yml配置文件
同生产者
# RocketMQ settings for the consumer example.
# Nesting is significant: the Java configuration classes read these values by
# their full path (for example rocketmq.consumer.group, rocketmq.consumer.topic).
rocketmq:
  name-server: 172.16.230.197:9876;172.16.230.198:9876;172.16.230.199:9876
  producer:
    group: pybasemetadata-dev
    customized-trace-topic: data-analysis-dev
    send-message-timeout: 5000
    retry-times-when-send-failed: 3
  consumer:
    group: pybasemetadata-dev
    # Topic the consumer subscribes to; also injected into MqTag.TOPIC.
    topic: data-analysis-dev
    consume-thread-min: 10
    consume-thread-max: 20
2.2 配置
@Data
@Slf4j
@Configuration
public class MqConsumer {
@Value("${rocketmq.consumer.group}")
private String group;
@Value("${rocketmq.consumer.topic}")
private String topic;
@Value("${rocketmq.nameserver}")
private String nameServer;
@Value("${rocketmq.consumer.consumethreadmin}")
private Integer consumeThreadMin;
@Value("${rocketmq.consumer.consumethreadmax}")
private Integer consumeThreadMax;
private Integer consumeMessageBatchMaxSize;
@Value("${spring.profiles.active}")
private String dev;
@Autowired
private MqConsumeListener consumeMsgListenerProcessor;
@Bean
public DefaultMQPushConsumer defaultConsumer() {
log.info( "defaultConsumer 正在创建---------------------------------------" );
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( group );
consumer.setNamesrvAddr( nameServer );
// 设置监听
consumer.registerMessageListener( consumeMsgListenerProcessor );
consumer.setConsumeThreadMin( consumeThreadMin );
consumer.setConsumeThreadMax( consumeThreadMax );
consumer.setMessageModel( MessageModel.CLUSTERING );
consumer.setMaxReconsumeTimes( 10 );
// 设置consumer消费 位点
consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET );
try {
// 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,
consumer.subscribe( topic, "*" );
consumer.start();
log.info( "consumer 创建成功 groupName={}, topics={}, namesrvAddr={}", group, topic, nameServer );
} catch (MQClientException e) {
log.error( "consumer 创建失败!" );
}
return consumer;
}
}
2.3 Tag常量类(用于指定topic和tag)
同生产者
/**
 * Holder for the topic name and the tag constants used when sending and
 * dispatching messages.
 *
 * <p>The tags are compile-time constants. The topic is a static field that is
 * assigned through the {@code @Value}-annotated setter below, so it is
 * {@code null} until that setter has been called.
 */
@Component
public class MqTag {

    public static final String UP_EXCEL = "up-excel";

    public static final String SAVE_DATA = "save-data";

    public static final String QUALITY_SECOND = "quality_second";

    /** Topic name, taken from {@code rocketmq.consumer.topic}. */
    public static String TOPIC;

    /**
     * Copies the configured topic into the static {@link #TOPIC} field.
     *
     * @param topicName value of {@code rocketmq.consumer.topic}
     */
    @Value("${rocketmq.consumer.topic}")
    public void setTOPIC(String topicName) {
        TOPIC = topicName;
    }
}
2.4 消息实体类
同生产者
/**
 * Message payload that is parsed from the JSON body received from the queue.
 *
 * <p>Accessors, builder and constructors are generated by the Lombok annotations.
 */
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class QualitySecondMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Example field: the id being sent. More fields can be added to carry more information. */
    private String id;
}
2.5 策略Service
/**
 * Registry that maps a message tag to the strategy that handles it.
 */
@Component
public class MqTagService {

    /** Strategies keyed by the value each one returns from {@code tag()}. */
    private HashMap<String, MqTagStrategy> tagStrategyHashMap = new HashMap<>();

    /**
     * Indexes every supplied strategy under its tag. When two strategies report
     * the same tag, the one later in the list replaces the earlier one.
     *
     * @param tagStrategyList strategies to register
     */
    public MqTagService(List<MqTagStrategy> tagStrategyList) {
        int total = tagStrategyList.size();
        for (int i = 0; i < total; i++) {
            MqTagStrategy candidate = tagStrategyList.get(i);
            tagStrategyHashMap.put(candidate.tag(), candidate);
        }
    }

    /**
     * Looks up the strategy registered for a tag.
     *
     * @param tag message tag
     * @return the matching strategy, or {@code null} when none is registered
     */
    public MqTagStrategy getTagStrategy(String tag) {
        MqTagStrategy match = tagStrategyHashMap.get(tag);
        return match;
    }
}
2.6 消费者监听器
@Component
@Slf4j
public class MqConsumeListener implements MessageListenerConcurrently {
@Autowired
private MqTagService mqTagService;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (CollectionUtils.isEmpty( msgList )) {
log.info( "MQ接收消息为空,直接返回成功" );
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt messageExt = msgList.get( 0 );
try {
String tags = messageExt.getTags();
messageExt.getBody();
String body = new String( messageExt.getBody(), StandardCharsets.UTF_8 );
// 根据tags 编写策略模式
MqTagStrategy tagStrategy = mqTagService.getTagStrategy( tags );
if (tagStrategy != null) {
return tagStrategy.consume( body );
}
} catch (Exception e) {
e.printStackTrace();
log.error( "获取MQ消息内容异常{}", e.getCause().getMessage() );
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
2.7 接收消息,消费消息代码(实现策略接口)
/**
 * Strategy that handles messages tagged {@link MqTag#QUALITY_SECOND}.
 */
@Slf4j
@Component
public class QualitySecondTagStrategy implements MqTagStrategy {

    /** Mapper injected for the business handling; not referenced in the code shown here. */
    @Autowired
    private QualitySecondMapper secondMapper;

    /**
     * @return the tag this strategy is registered under
     */
    @Override
    public String tag() {
        return MqTag.QUALITY_SECOND;
    }

    /**
     * Parses the JSON message body and reads the id it carries.
     *
     * <p>Bodies that are {@code null}, do not start with <code>{</code>, or carry no id
     * are acknowledged without further processing. A parse failure raised by
     * {@code JSONObject.parseObject} is not caught here and propagates to the caller.
     *
     * @param message message body as text
     * @return {@code CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consume(String message) {
        log.info("接收到二层质控执行消息:{}", message);
        if (message == null || !message.startsWith("{")) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // Map the JSON body onto the message entity.
        QualitySecondMessage secondMessage = JSONObject.parseObject(message, QualitySecondMessage.class);
        // QualitySecondMessage declares only an id field, so the id is read directly;
        // the previous getData() call referred to a property the entity does not have.
        if (secondMessage == null || secondMessage.getId() == null) {
            log.warn("二层质控消息缺少id,忽略该消息:{}", message);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        log.info("二层质控消息id:{}", secondMessage.getId());
        // Business handling for the received id belongs here (omitted in this example).
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}