Producer

  • 提前配置 Topic

    1. @Service
    2. public class RoccketMqTest {
    3. final Logger log = LoggerFactory.getLogger(this.getClass());
    4. private final DefaultMQProducer producer =
    5. new DefaultMQProducer("Tmc_Topic_Group");
    6. @PostConstruct
    7. public void init() throws MQClientException {
    8. producer.setNamesrvAddr("ns1:9876;ns2:9876");
    9. producer.start();
    10. }
    11. public DefaultMQProducer getProducer() {
    12. return producer;
    13. }
    14. @PreDestroy
    15. public void destroy() {
    16. producer.shutdown();
    17. }
    18. }

Consumer

  • 配置消费组

    1. @Service
    2. public class RocketMqConsumer {
    3. protected DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
    4. @PostConstruct
    5. public void init() throws MQClientException {
    6. consumer.setNamesrvAddr("ns1:9876;ns2:9876");
    7. consumer.setConsumerGroup("Fullinfo_Consumer_Group");
    8. consumer.subscribe("fullinfo", "*");
    9. consumer.setConsumeMessageBatchMaxSize(128);
    10. consumer.registerMessageListener(new MessageListenerConcurrently() {
    11. @Override
    12. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    13. try {
    14. for (MessageExt msg : msgs) {
    15. final String topic = msg.getTopic();
    16. final byte[] body = msg.getBody();
    17. System.out.println(topic + "\t" +
    18. new String(body, StandardCharsets.UTF_8) +
    19. "\t" + msg.getTags());
    20. }
    21. } catch (Exception e) {
    22. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    23. }
    24. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    25. }
    26. });
    27. consumer.start();
    28. }
    29. @PreDestroy
    30. public void destroy() {
    31. System.out.println("===shutdown===");
    32. consumer.shutdown();
    33. }
    34. }

    针对 Consumer 做切面，进行日志记录：在消费前初始化日志上下文，消费结束后（无论成功或异常）清理日志上下文

    1. public abstract class AbstractTraceMessageListenerConcurrently implements MessageListenerConcurrently {
    2. @Override
    3. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    4. LoggerContext.init();
    5. try {
    6. return doConsumeMessage(msgs, context);
    7. } finally {
    8. LoggerContext.remove();
    9. }
    10. }
    11. /**
    12. * 消费信息
    13. *
    14. * @param msgs msgs
    15. * @param context 上下文
    16. * @return {@link ConsumeConcurrentlyStatus}
    17. */
    18. public abstract ConsumeConcurrentlyStatus doConsumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context);
    19. }