1.引入依赖
<dependency>
<groupId>com.xy</groupId>
<artifactId>xy-core-mq-rocket-boot-starter-monitor</artifactId>
<version>${xy-core-mq-rocket-boot-starter-version}</version>
</dependency>
2.添加配置项
rocketmq.enable=true
rocketmq.name-server=${server-addr}:9876
rocketmq.access-channel=LOCAL
rocketmq.producer.group=company-demo-producer
#rocketmq.producer.access-key=
#rocketmq.producer.secret-key=
#rocketmq.producer.enable-msg-trace=true
#rocketmq.producer.customized-trace-topic=
#rocketmq.producer.max-message-size=4194394
rocketmq.producer.compress-message-body-threshold=4096
rocketmq.producer.retry-next-server=false
rocketmq.producer.retry-times-when-send-failed=2
rocketmq.producer.retry-times-when-send-async-failed=2
rocketmq.producer.send-message-timeout=3000
#是否开启mq消息监听,如果开启,则正常消费消息,如果关系,服务当前节点则不在消费任何消息
#只在启动时生效,服务运行期间修改无效
rocketmq.consumer.enable=true
#rocketmq.consumer.group=${spring.application.name}-consumer
3.定义消息消费者
消费组命名格式:consumerGroup = “topic-applicationName”
即:消费主题-${spring.application.name}�
3.1对象方式接收
@Slf4j
@Component
@ConditionalOnProperty(prefix = "rocketmq.consumer", value = "enable", matchIfMissing = true)
@RocketMQMessageListener(consumerGroup = "company-demo-consumer", topic = "demoInfoTopic", selectorExpression = "*", consumeMode = ConsumeMode.ORDERLY)
public class RocketMessageListener implements RocketMQListener<DemoInfoMQ> {
/**
* 请不要捕获异常信息,否则无法进行消息重新推送
*
* @param message
*/
@Override
public void onMessage(DemoInfoMQ message) {
String jsonString = JSON.toJSONString(message);
System.out.println(jsonString);
log.info("-------------33333333----DemoInfoMQ:{}-------------------", JSON.toJSONString(message));
}
}
泛型直接使用对象接收。此过程中,已将消息生产者链路traceId写到当前的日志上下文中。
3.2MessageExt对象接收
@Slf4j
@Component
@ConditionalOnProperty(prefix = "rocketmq.consumer", value = "enable", matchIfMissing = true)
@RocketMQMessageListener(consumerGroup = "company-demo-consumer11", topic = "demoInfoTopic", selectorExpression = "*", consumeMode = ConsumeMode.ORDERLY)
public class RocketMessageExtListener implements RocketMQListener<MessageExt> {
/**
* 请不要捕获异常信息,否则无法进行消息重新推送
*
* @param message
*/
@Override
public void onMessage(MessageExt message) {
byte[] body = message.getBody();
String s = new String(body);
DemoInfoMQ demoInfoMQ = JSON.parseObject(s, DemoInfoMQ.class);
String traceId = message.getProperty(CommonConstants.TRACE_ID);
MDC.put(CommonConstants.TRACE_ID, traceId);
log.info("traceId---------22222-----------{}", JSON.toJSONString(demoInfoMQ));
}
}
此方法可以灵活获取请求头信息。此过程中,已将消息生产者链路traceId写到当前的日志上下文中。
4.消息发送
topic命名格式:应用名-事件名
即:${spring.application.name}�-xxx
4.1普通消息
public void sentMessage() {
DemoInfoMQ demoInfoMQ = new DemoInfoMQ();
demoInfoMQ.setInfoId("10010101");
demoInfoMQ.setType("order_cc");
SendResult sendMessage = rocketMQTemplate.syncSend("demoInfoTopic", demoInfoMQ);
}
普通消息直接发送消息对象到指定topic即可。
为保证日志链路的完整性,框架会获取当前线程的日志上下文中的traceId,并设置到header中。若当前没有获取到traceId,会默认自动生成一个uuid的traceId并设置到head中。
4.2设置请求头head的消息
public void send() {
DemoInfoMQ demoInfoMQ = new DemoInfoMQ();
demoInfoMQ.setInfoId("22222");
demoInfoMQ.setType("order_dd");
Message<DemoInfoMQ> mqMessage = MessageBuilder.withPayload(demoInfoMQ)
.setHeader(MessageConst.PROPERTY_KEYS, "20191018")
.setHeader("TAGS", "191018")
.setHeader(MessageConst.PROPERTY_BUYER_ID, "20191018a")
.setHeader(HeadConstants.USER_ID, UserUtils.getUserInfo().getUserId())
.setHeader(HeadConstants.MERCHANT_CODE, UserUtils.getUserInfo().getMerchantCode())
.setHeader("MQ", "user_mq")
.build();
log.info("send----------11111----------{}", JSON.toJSONString(demoInfoMQ));
SendResult sendResult = rocketMQTemplate.syncSend("demoInfoTopic", mqMessage);
}
只是将消息对象用Message做了一层包装,这样可以通过设置请求头信息来做一些业务上需要的判断。同样,框架也自动添加了日志上下文中traceId到header中。
4.3事务消息
@Slf4j
@Component
@RocketMQTransactionListener
public class RocketTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
initTraceId(message);
log.info("【执行本地事务】start execute local tx start");
try {
//如果有多个topic的事物消息,可以在header中设置指定参数来区别不同的业务逻辑
String type = (String) message.getHeaders().get("type");
//do your biz logic
Object payload = message.getPayload();
//1.强转为发送对象
//1.2检查对象中的id是否已经提交
//1.2.1提交 commit
//1.2.2未提交 rollback
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
initTraceId(message);
log.info("【回查本地执行结果】 start check local tx start");
// 回查本地事务执行结果,保证数据最终落盘
return RocketMQLocalTransactionState.COMMIT;
}
private void initTraceId(Message message) {
String traceId = (String) message.getHeaders().get(CommonConstants.TRACE_ID);
if (StringUtils.isEmpty(traceId)) {
traceId = UUID.fastUUID().toString();
}
MDC.put(CommonConstants.TRACE_ID, traceId);
}
}
事务消息,框架暂时未能实现从请求header自动获取并设置日志上下文中的traceId,需要代码显示添加initTraceId(Message message)方法。