Producer
提前配置
Topic@Servicepublic class RoccketMqTest {final Logger log = LoggerFactory.getLogger(this.getClass());private final DefaultMQProducer producer =new DefaultMQProducer("Tmc_Topic_Group");@PostConstructpublic void init() throws MQClientException {producer.setNamesrvAddr("ns1:9876;ns2:9876");producer.start();}public DefaultMQProducer getProducer() {return producer;}@PreDestroypublic void destroy() {producer.shutdown();}}
Consumer
配置
消费组@Servicepublic class RocketMqConsumer {protected DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();@PostConstructpublic void init() throws MQClientException {consumer.setNamesrvAddr("ns1:9876;ns2:9876");consumer.setConsumerGroup("Fullinfo_Consumer_Group");consumer.subscribe("fullinfo", "*");consumer.setConsumeMessageBatchMaxSize(128);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {for (MessageExt msg : msgs) {final String topic = msg.getTopic();final byte[] body = msg.getBody();System.out.println(topic + "\t" +new String(body, StandardCharsets.UTF_8) +"\t" + msg.getTags());}} catch (Exception e) {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}@PreDestroypublic void destroy() {System.out.println("===shutdown===");consumer.shutdown();}}
针对 Consumer 做切面，进行日志记录
public abstract class AbstractTraceMessageListenerConcurrently implements MessageListenerConcurrently {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {LoggerContext.init();try {return doConsumeMessage(msgs, context);} finally {LoggerContext.remove();}}/*** 消费信息** @param msgs msgs* @param context 上下文* @return {@link ConsumeConcurrentlyStatus}*/public abstract ConsumeConcurrentlyStatus doConsumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context);}
