RocketMQ 09 SpringBoot 整合

RocketMQ 09 SpringBoot 整合

目前还没有官方的starter

pom.xml

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-common</artifactId>
  4. <version>4.6.1</version>
  5. </dependency>
  6. <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
  7. <dependency>
  8. <groupId>org.apache.rocketmq</groupId>
  9. <artifactId>rocketmq-client</artifactId>
  10. <version>4.6.1</version>
  11. </dependency>

Producer配置

Config配置类

用于在系统启动时初始化producer参数并启动

  1. package com.mashibing.rmq.controller;
  2. import org.apache.rocketmq.client.exception.MQClientException;
  3. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. @Configuration
  10. public class MQConfig {
  11. public static final Logger LOGGER = LoggerFactory.getLogger(MQConfig.class);
  12. @Value("${rocketmq.producer.groupName}")
  13. private String groupName;
  14. @Value("${rocketmq.producer.namesrvAddr}")
  15. private String namesrvAddr;
  16. @Bean
  17. public DefaultMQProducer getRocketMQProducer() {
  18. DefaultMQProducer producer;
  19. producer = new DefaultMQProducer(this.groupName);
  20. producer.setNamesrvAddr(this.namesrvAddr);
  21. try {
  22. producer.start();
  23. System.out.println("start....");
  24. LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]", this.groupName,
  25. this.namesrvAddr));
  26. } catch (MQClientException e) {
  27. LOGGER.error(String.format("producer is error {}", e.getMessage(), e));
  28. }
  29. return producer;
  30. }
  31. }

Service消息发送类

package com.mashibing.rmq.service;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MQService {
    @Autowired
    DefaultMQProducer producer;


    public Object sendMsg(String string) {

        for (int i = 0; i < 1; i++) {
            Message message = new Message("tpk02", "xx".getBytes());

            try {
                return producer.send(message);
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } 
        }
        return null;
    }
}

配置文件

spring.application.name=mq01
rocketmq.producer.namesrvAddr=192.168.150.131:9876
rocketmq.producer.groupName=${spring.application.name}
server.port=8081

Consumer配置

Config配置类

package com.mashibing.rmq.controller;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {


    public static final Logger logger = LoggerFactory.getLogger(MQConfig.class);

    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.groupName}")
    private String groupName;
    @Value("${rocketmq.consumer.topics}")
    private String topics;

    @Bean
    public DefaultMQPushConsumer getRocketMQConsumer() throws Exception {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.subscribe(topics, "*");

        consumer.registerMessageListener(new MyMessageListener() );
        consumer.start();

        return consumer;
    }
}

消息处理类

package com.mashibing.rmq.controller;
import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class MyMessageListener implements MessageListenerConcurrently {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println("来啦!!22!");
            for (MessageExt msg : msgs) {

                System.out.println(new String(msg.getBody()));;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
}

配置文件

spring.application.name=mq02
rocketmq.producer.namesrvAddr=192.168.150.131:9876
rocketmq.producer.groupName=${spring.application.name}
rocketmq.consumer.topics=tpk02

%23%20RocketMQ%2009%20SpringBoot%20%E6%95%B4%E5%90%88%0A%0A%E7%9B%AE%E5%89%8D%E8%BF%98%E6%B2%A1%E6%9C%89%E5%AE%98%E6%96%B9%E7%9A%84starter%0A%0A%23%23%23%20pom.xml%0A%0A%60%60%60%0A%09%09%3Cdependency%3E%0A%09%09%20%20%20%20%3CgroupId%3Eorg.apache.rocketmq%3C%2FgroupId%3E%0A%09%09%20%20%20%20%3CartifactId%3Erocketmq-common%3C%2FartifactId%3E%0A%09%09%20%20%20%20%3Cversion%3E4.6.1%3C%2Fversion%3E%0A%09%09%3C%2Fdependency%3E%0A%09%09%3C!—%20https%3A%2F%2Fmvnrepository.com%2Fartifact%2Forg.apache.rocketmq%2Frocketmq-client%20—%3E%0A%09%09%3Cdependency%3E%0A%09%09%20%20%20%20%3CgroupId%3Eorg.apache.rocketmq%3C%2FgroupId%3E%0A%09%09%20%20%20%20%3CartifactId%3Erocketmq-client%3C%2FartifactId%3E%0A%09%09%20%20%20%20%3Cversion%3E4.6.1%3C%2Fversion%3E%0A%09%09%3C%2Fdependency%3E%0A%60%60%60%0A%0A%0A%0A%23%23%20Producer%E9%85%8D%E7%BD%AE%0A%0A%23%23%23%20Config%E9%85%8D%E7%BD%AE%E7%B1%BB%0A%0A%E7%94%A8%E4%BA%8E%E5%9C%A8%E7%B3%BB%E7%BB%9F%E5%90%AF%E5%8A%A8%E6%97%B6%E5%88%9D%E5%A7%8B%E5%8C%96producer%E5%8F%82%E6%95%B0%E5%B9%B6%E5%90%AF%E5%8A%A8%0A%0A%60%60%60java%0Apackage%20com.mashibing.rmq.controller%3B%0A%0Aimport%20org.apache.rocketmq.client.exception.MQClientException%3B%0Aimport%20org.apache.rocketmq.client.producer.DefaultMQProducer%3B%0Aimport%20org.slf4j.Logger%3B%0Aimport%20org.slf4j.LoggerFactory%3B%0Aimport%20org.springframework.beans.factory.annotation.Value%3B%0Aimport%20org.springframework.context.annotation.Bean%3B%0Aimport%20org.springframework.context.annotation.Configuration%3B%0A%0A%40Configuration%0Apublic%20class%20MQConfig%20%7B%0A%09public%20static%20final%20Logger%20LOGGER%20%3D%20LoggerFactory.getLogger(MQConfig.class)%3B%0A%0A%09%40Value(%22%24%7Brocketmq.producer.groupName%7D%22)%0A%09private%20String%20groupName%3B%0A%0A%09%40Value(%22%24%7Brocketmq.producer.namesrvAddr%7D%22)%0A%09private%20String%20namesrvAddr%3B%0A%0A%09%40Bean%0A%09public%20DefaultMQProducer%20getRocketMQProducer()%20%7B%0A%0A%09%09DefaultMQProducer%20producer%3B%0A%09%09producer%20%3D%20new%20DefaultMQProducer(this.groupName)%3B%0A%0A%09%09producer.setNamesrvAddr(this.namesrvAddr)%3B%0A%0A%09%09try%20%7B%0A%09%09%09producer.start()%3B%0A%09%09%09System.out.println(%22start….%22)%3B%0A%0A%09%09%09LOGGER.info(String.format(%22producer%20is%20start%20!%20groupName%3A%5B%25s%5D%2CnamesrvAddr%3A%5B%25s%5D%22%2C%20this.groupName%2C%0A%09%09%09%09%09this.namesrvAddr))%3B%0A%09%09%7D%20catch%20(MQClientException%20e)%20%7B%0A%09%09%09LOGGER.error(String.format(%22producer%20is%20error%20%7B%7D%22%2C%20e.getMessage()%2C%20e))%3B%0A%09%09%7D%0A%09%09return%20producer%3B%0A%0A%09%7D%0A%7D%0A%0A%60%60%60%0A%0A%23%23%23%20Service%E6%B6%88%E6%81%AF%E5%8F%91%E9%80%81%E7%B1%BB%0A%0A%60%60%60java%0Apackage%20com.mashibing.rmq.service%3B%0A%0Aimport%20org.apache.rocketmq.client.producer.DefaultMQProducer%3B%0Aimport%20org.apache.rocketmq.common.message.Message%3B%0Aimport%20org.springframework.beans.factory.annotation.Autowired%3B%0Aimport%20org.springframework.stereotype.Service%3B%0A%40Service%0Apublic%20class%20MQService%20%7B%0A%09%40Autowired%0A%09DefaultMQProducer%20producer%3B%0A%09%0A%09%0A%09public%20Object%20sendMsg(String%20string)%20%7B%0A%09%09%0A%09%09for%20(int%20i%20%3D%200%3B%20i%20%3C%201%3B%20i%2B%2B)%20%7B%0A%09%09%09Message%20message%20%3D%20new%20Message(%22tpk02%22%2C%20%22xx%22.getBytes())%3B%0A%09%09%09%0A%09%09%09try%20%7B%0A%09%09%09%09return%20producer.send(message)%3B%0A%09%09%09%7D%20catch%20(Exception%20e)%20%7B%0A%09%09%09%09%2F%2F%20TODO%20Auto-generated%20catch%20block%0A%09%09%09%09e.printStackTrace()%3B%0A%09%09%09%7D%20%0A%09%09%7D%0A%09%09return%20null%3B%0A%09%7D%0A%7D%0A%0A%60%60%60%0A%0A%E9%85%8D%E7%BD%AE%E6%96%87%E4%BB%B6%0A%0A%60%60%60properties%0Aspring.application.name%3Dmq01%0Arocketmq.producer.namesrvAddr%3D192.168.150.131%3A9876%0Arocketmq.producer.groupName%3D%24%7Bspring.application.name%7D%0Aserver.port%3D8081%0A%60%60%60%0A%0A%0A%0A%23%23%20Consumer%E9%85%8D%E7%BD%AE%0A%0A%23%23%23%20Config%E9%85%8D%E7%BD%AE%E7%B1%BB%0A%0A%60%60%60java%0Apackage%20com.mashibing.rmq.controller%3B%0A%0A%0Aimport%20org.apache.rocketmq.client.consumer.DefaultMQPushConsumer%3B%0Aimport%20org.slf4j.Logger%3B%0Aimport%20org.slf4j.LoggerFactory%3B%0Aimport%20org.springframework.beans.factory.annotation.Value%3B%0Aimport%20org.springframework.context.annotation.Bean%3B%0Aimport%20org.springframework.context.annotation.Configuration%3B%0A%0A%40Configuration%0Apublic%20class%20MQConfig%20%7B%0A%09%0A%20%20%20%20%0A%09public%20static%20final%20Logger%20logger%20%3D%20LoggerFactory.getLogger(MQConfig.class)%3B%0A%09%0A%09%40Value(%22%24%7Brocketmq.consumer.namesrvAddr%7D%22)%0A%20%20%20%20private%20String%20namesrvAddr%3B%0A%20%20%20%20%40Value(%22%24%7Brocketmq.consumer.groupName%7D%22)%0A%20%20%20%20private%20String%20groupName%3B%0A%20%20%20%20%40Value(%22%24%7Brocketmq.consumer.topics%7D%22)%0A%20%20%20%20private%20String%20topics%3B%0A%09%0A%20%20%20%20%40Bean%0A%20%20%20%20public%20DefaultMQPushConsumer%20getRocketMQConsumer()%20throws%20Exception%20%7B%0A%20%20%20%20%09%0A%20%20%20%20%20%20%20%20DefaultMQPushConsumer%20consumer%20%3D%20new%20DefaultMQPushConsumer(groupName)%3B%0A%20%20%20%20%20%20%20%20consumer.setNamesrvAddr(namesrvAddr)%3B%0A%20%20%20%20%20%20%20%20consumer.subscribe(topics%2C%20%22*%22)%3B%0A%20%20%20%20%20%20%20%20%0A%20%20%20%20%20%20%20%20consumer.registerMessageListener(new%20MyMessageListener()%20)%3B%0A%20%20%20%20%20%20%20%20consumer.start()%3B%0A%20%20%20%20%20%20%20%20%0A%20%20%20%20%20%20%20%20return%20consumer%3B%0A%20%20%20%20%7D%0A%7D%0A%0A%60%60%60%0A%0A%23%23%23%20%E6%B6%88%E6%81%AF%E5%A4%84%E7%90%86%E7%B1%BB%0A%0A%60%60%60java%0Apackage%20com.mashibing.rmq.controller%3B%0A%0Aimport%20java.util.List%3B%0A%0Aimport%20org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext%3B%0Aimport%20org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus%3B%0Aimport%20org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently%3B%0Aimport%20org.apache.rocketmq.common.message.MessageExt%3B%0A%0Apublic%20class%20MyMessageListener%20implements%20MessageListenerConcurrently%20%7B%0A%09%09%40Override%0A%09%09public%20ConsumeConcurrentlyStatus%20consumeMessage(List%3CMessageExt%3E%20msgs%2C%20ConsumeConcurrentlyContext%20context)%20%7B%0A%09%09%09System.out.println(%22%E6%9D%A5%E5%95%A6%EF%BC%81%EF%BC%8122%EF%BC%81%22)%3B%0A%09%09%09for%20(MessageExt%20msg%20%3A%20msgs)%20%7B%0A%09%09%09%09%0A%09%09%09%09System.out.println(new%20String(msg.getBody()))%3B%3B%0A%09%09%09%7D%0A%09%09%09return%20ConsumeConcurrentlyStatus.CONSUME_SUCCESS%3B%0A%09%09%7D%0A%7D%0A%60%60%60%0A%0A%23%23%23%20%E9%85%8D%E7%BD%AE%E6%96%87%E4%BB%B6%0A%0A%60%60%60%0Aspring.application.name%3Dmq02%0Arocketmq.producer.namesrvAddr%3D192.168.150.131%3A9876%0Arocketmq.producer.groupName%3D%24%7Bspring.application.name%7D%0A%0Arocketmq.consumer.topics%3Dtpk02%0A%0A%60%60%60