Producer
提前配置
Topic
/**
 * Spring service that owns a single {@link DefaultMQProducer} and ties its
 * lifecycle to the bean: the producer is started in {@link #init()} and shut
 * down in {@link #destroy()}.
 *
 * <p>The class name (including its spelling) is kept as-is so existing
 * references keep working.
 */
@Service
public class RoccketMqTest {

    /** Producer group handed to the {@link DefaultMQProducer} constructor. */
    private static final String PRODUCER_GROUP = "Tmc_Topic_Group";

    /** Name server address string: two {@code host:port} entries joined by ';'. */
    private static final String NAMESRV_ADDR = "ns1:9876;ns2:9876";

    final Logger log = LoggerFactory.getLogger(this.getClass());

    private final DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);

    // ---- lifecycle ----

    /**
     * Points the producer at the name servers and starts it.
     *
     * @throws MQClientException if the producer fails to start
     */
    @PostConstruct
    public void init() throws MQClientException {
        producer.setNamesrvAddr(NAMESRV_ADDR);
        producer.start();
    }

    /** Shuts the producer down when the bean is destroyed. */
    @PreDestroy
    public void destroy() {
        producer.shutdown();
    }

    // ---- accessors ----

    /**
     * Exposes the underlying producer so callers can send messages with it.
     *
     * @return the producer instance owned by this service
     */
    public DefaultMQProducer getProducer() {
        return producer;
    }
}
Consumer
配置
消费组
/**
 * Spring service that owns a {@link DefaultMQPushConsumer} subscribed to the
 * {@code fullinfo} topic (all tags) under the {@code Fullinfo_Consumer_Group}
 * consumer group. Each received message is printed to standard output as
 * {@code topic \t body \t tags}.
 *
 * <p>The consumer is configured and started in {@link #init()} and shut down
 * in {@link #destroy()}.
 */
@Service
public class RocketMqConsumer {

    protected DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

    /**
     * Configures the name servers, consumer group, subscription, batch size
     * and message listener, then starts the consumer.
     *
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    @PostConstruct
    public void init() throws MQClientException {
        consumer.setNamesrvAddr("ns1:9876;ns2:9876");
        consumer.setConsumerGroup("Fullinfo_Consumer_Group");
        consumer.subscribe("fullinfo", "*");
        // Ask for up to 128 messages per listener callback.
        consumer.setConsumeMessageBatchMaxSize(128);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        printMessage(msg);
                    }
                } catch (Exception e) {
                    // Previously the exception was dropped silently, so a batch that kept
                    // failing was redelivered with no trace of the cause. Report it before
                    // asking the broker to redeliver.
                    System.err.println("consume failed, batch of " + msgs.size()
                            + " message(s) will be reconsumed later: " + e);
                    e.printStackTrace(System.err);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

    /** Prints a shutdown marker and shuts the consumer down when the bean is destroyed. */
    @PreDestroy
    public void destroy() {
        System.out.println("===shutdown===");
        consumer.shutdown();
    }

    /**
     * Writes one message to standard output as {@code topic \t body \t tags},
     * decoding the body as UTF-8.
     *
     * @param msg the message to print
     */
    private static void printMessage(MessageExt msg) {
        final String topic = msg.getTopic();
        final byte[] body = msg.getBody();
        System.out.println(topic + "\t" +
                new String(body, StandardCharsets.UTF_8) +
                "\t" + msg.getTags());
    }
}
针对 Consumer 做切面，进行日志记录
/**
 * Base class for concurrent message listeners that wraps every consume call
 * between {@code LoggerContext.init()} and {@code LoggerContext.remove()}.
 *
 * <p>Subclasses implement {@link #doConsumeMessage(List, ConsumeConcurrentlyContext)};
 * the context is removed even when that method throws.
 */
public abstract class AbstractTraceMessageListenerConcurrently implements MessageListenerConcurrently {

    /**
     * Initializes the logger context, delegates to
     * {@link #doConsumeMessage(List, ConsumeConcurrentlyContext)}, and always
     * removes the logger context afterwards.
     *
     * @param msgs    the messages delivered to this listener
     * @param context the consume context passed in by the caller
     * @return the status produced by {@code doConsumeMessage}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        LoggerContext.init();
        final ConsumeConcurrentlyStatus status;
        try {
            status = doConsumeMessage(msgs, context);
        } finally {
            // Runs on both normal return and exception.
            LoggerContext.remove();
        }
        return status;
    }

    /**
     * Consumes the given messages.
     *
     * @param msgs    the messages to consume
     * @param context the consume context
     * @return {@link ConsumeConcurrentlyStatus}
     */
    public abstract ConsumeConcurrentlyStatus doConsumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context);
}