1.引入依赖
<dependency><groupId>com.xy</groupId><artifactId>xy-core-mq-rocket-boot-starter-monitor</artifactId><version>${xy-core-mq-rocket-boot-starter-version}</version></dependency>
2.添加配置项
rocketmq.enable=truerocketmq.name-server=${server-addr}:9876rocketmq.access-channel=LOCALrocketmq.producer.group=company-demo-producer#rocketmq.producer.access-key=#rocketmq.producer.secret-key=#rocketmq.producer.enable-msg-trace=true#rocketmq.producer.customized-trace-topic=#rocketmq.producer.max-message-size=4194394rocketmq.producer.compress-message-body-threshold=4096rocketmq.producer.retry-next-server=falserocketmq.producer.retry-times-when-send-failed=2rocketmq.producer.retry-times-when-send-async-failed=2rocketmq.producer.send-message-timeout=3000#是否开启mq消息监听,如果开启,则正常消费消息,如果关系,服务当前节点则不在消费任何消息#只在启动时生效,服务运行期间修改无效rocketmq.consumer.enable=true#rocketmq.consumer.group=${spring.application.name}-consumer
3.定义消息消费者
消费组命名格式:consumerGroup = “topic-applicationName”
即:消费主题-${spring.application.name}�
3.1对象方式接收
@Slf4j@Component@ConditionalOnProperty(prefix = "rocketmq.consumer", value = "enable", matchIfMissing = true)@RocketMQMessageListener(consumerGroup = "company-demo-consumer", topic = "demoInfoTopic", selectorExpression = "*", consumeMode = ConsumeMode.ORDERLY)public class RocketMessageListener implements RocketMQListener<DemoInfoMQ> {/*** 请不要捕获异常信息,否则无法进行消息重新推送** @param message*/@Overridepublic void onMessage(DemoInfoMQ message) {String jsonString = JSON.toJSONString(message);System.out.println(jsonString);log.info("-------------33333333----DemoInfoMQ:{}-------------------", JSON.toJSONString(message));}}
泛型直接使用对象接收。此过程中,已将消息生产者链路traceId写到当前的日志上下文中。
3.2MessageExt对象接收
@Slf4j@Component@ConditionalOnProperty(prefix = "rocketmq.consumer", value = "enable", matchIfMissing = true)@RocketMQMessageListener(consumerGroup = "company-demo-consumer11", topic = "demoInfoTopic", selectorExpression = "*", consumeMode = ConsumeMode.ORDERLY)public class RocketMessageExtListener implements RocketMQListener<MessageExt> {/*** 请不要捕获异常信息,否则无法进行消息重新推送** @param message*/@Overridepublic void onMessage(MessageExt message) {byte[] body = message.getBody();String s = new String(body);DemoInfoMQ demoInfoMQ = JSON.parseObject(s, DemoInfoMQ.class);String traceId = message.getProperty(CommonConstants.TRACE_ID);MDC.put(CommonConstants.TRACE_ID, traceId);log.info("traceId---------22222-----------{}", JSON.toJSONString(demoInfoMQ));}}
此方法可以灵活获取请求头信息。此过程中,已将消息生产者链路traceId写到当前的日志上下文中。
4.消息发送
topic命名格式:应用名-事件名
即:${spring.application.name}�-xxx
4.1普通消息
public void sentMessage() {DemoInfoMQ demoInfoMQ = new DemoInfoMQ();demoInfoMQ.setInfoId("10010101");demoInfoMQ.setType("order_cc");SendResult sendMessage = rocketMQTemplate.syncSend("demoInfoTopic", demoInfoMQ);}
普通消息直接发送消息对象到指定topic即可。
为保证日志链路的完整性,框架会获取当前线程的日志上下文中的traceId,并设置到header中。若当前没有获取到traceId,会默认自动生成一个uuid的traceId并设置到head中。
4.2设置请求头head的消息
public void send() {DemoInfoMQ demoInfoMQ = new DemoInfoMQ();demoInfoMQ.setInfoId("22222");demoInfoMQ.setType("order_dd");Message<DemoInfoMQ> mqMessage = MessageBuilder.withPayload(demoInfoMQ).setHeader(MessageConst.PROPERTY_KEYS, "20191018").setHeader("TAGS", "191018").setHeader(MessageConst.PROPERTY_BUYER_ID, "20191018a").setHeader(HeadConstants.USER_ID, UserUtils.getUserInfo().getUserId()).setHeader(HeadConstants.MERCHANT_CODE, UserUtils.getUserInfo().getMerchantCode()).setHeader("MQ", "user_mq").build();log.info("send----------11111----------{}", JSON.toJSONString(demoInfoMQ));SendResult sendResult = rocketMQTemplate.syncSend("demoInfoTopic", mqMessage);}
只是将消息对象用Message做了一层包装,这样可以通过设置请求头信息来做一些业务上需要的判断。同样,框架也自动添加了日志上下文中traceId到header中。
4.3事务消息
@Slf4j@Component@RocketMQTransactionListenerpublic class RocketTransactionListener implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {initTraceId(message);log.info("【执行本地事务】start execute local tx start");try {//如果有多个topic的事物消息,可以在header中设置指定参数来区别不同的业务逻辑String type = (String) message.getHeaders().get("type");//do your biz logicObject payload = message.getPayload();//1.强转为发送对象//1.2检查对象中的id是否已经提交//1.2.1提交 commit//1.2.2未提交 rollbackreturn RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {initTraceId(message);log.info("【回查本地执行结果】 start check local tx start");// 回查本地事务执行结果,保证数据最终落盘return RocketMQLocalTransactionState.COMMIT;}private void initTraceId(Message message) {String traceId = (String) message.getHeaders().get(CommonConstants.TRACE_ID);if (StringUtils.isEmpty(traceId)) {traceId = UUID.fastUUID().toString();}MDC.put(CommonConstants.TRACE_ID, traceId);}}
事务消息,框架暂时未能实现从请求header自动获取并设置日志上下文中的traceId,需要代码显示添加initTraceId(Message message)方法。
