RocketMQ 09 SpringBoot 整合
RocketMQ 09 SpringBoot 整合
pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.6.1</version>
</dependency>
Producer配置
Config配置类
用于在系统启动时初始化producer参数并启动
package com.mashibing.rmq.controller;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
public static final Logger LOGGER = LoggerFactory.getLogger(MQConfig.class);
@Value("${rocketmq.producer.groupName}")
private String groupName;
@Value("${rocketmq.producer.namesrvAddr}")
private String namesrvAddr;
@Bean
public DefaultMQProducer getRocketMQProducer() {
DefaultMQProducer producer;
producer = new DefaultMQProducer(this.groupName);
producer.setNamesrvAddr(this.namesrvAddr);
try {
producer.start();
System.out.println("start....");
LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]", this.groupName,
this.namesrvAddr));
} catch (MQClientException e) {
LOGGER.error(String.format("producer is error {}", e.getMessage(), e));
}
return producer;
}
}
Service消息发送类
package com.mashibing.rmq.service;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MQService {
@Autowired
DefaultMQProducer producer;
public Object sendMsg(String string) {
for (int i = 0; i < 1; i++) {
Message message = new Message("tpk02", "xx".getBytes());
try {
return producer.send(message);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return null;
}
}
配置文件
spring.application.name=mq01
rocketmq.producer.namesrvAddr=192.168.150.131:9876
rocketmq.producer.groupName=${spring.application.name}
server.port=8081
Consumer配置
Config配置类
package com.mashibing.rmq.controller;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
public static final Logger logger = LoggerFactory.getLogger(MQConfig.class);
@Value("${rocketmq.consumer.namesrvAddr}")
private String namesrvAddr;
@Value("${rocketmq.consumer.groupName}")
private String groupName;
@Value("${rocketmq.consumer.topics}")
private String topics;
@Bean
public DefaultMQPushConsumer getRocketMQConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.subscribe(topics, "*");
consumer.registerMessageListener(new MyMessageListener() );
consumer.start();
return consumer;
}
}
消息处理类
package com.mashibing.rmq.controller;
import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class MyMessageListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("来啦!!22!");
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
配置文件
spring.application.name=mq02
rocketmq.producer.namesrvAddr=192.168.150.131:9876
rocketmq.producer.groupName=${spring.application.name}
rocketmq.consumer.topics=tpk02
%23%20RocketMQ%2009%20SpringBoot%20%E6%95%B4%E5%90%88%0A%0A%E7%9B%AE%E5%89%8D%E8%BF%98%E6%B2%A1%E6%9C%89%E5%AE%98%E6%96%B9%E7%9A%84starter%0A%0A%23%23%23%20pom.xml%0A%0A%60%60%60%0A%09%09%3Cdependency%3E%0A%09%09%20%20%20%20%3CgroupId%3Eorg.apache.rocketmq%3C%2FgroupId%3E%0A%09%09%20%20%20%20%3CartifactId%3Erocketmq-common%3C%2FartifactId%3E%0A%09%09%20%20%20%20%3Cversion%3E4.6.1%3C%2Fversion%3E%0A%09%09%3C%2Fdependency%3E%0A%09%09%3C!—%20https%3A%2F%2Fmvnrepository.com%2Fartifact%2Forg.apache.rocketmq%2Frocketmq-client%20—%3E%0A%09%09%3Cdependency%3E%0A%09%09%20%20%20%20%3CgroupId%3Eorg.apache.rocketmq%3C%2FgroupId%3E%0A%09%09%20%20%20%20%3CartifactId%3Erocketmq-client%3C%2FartifactId%3E%0A%09%09%20%20%20%20%3Cversion%3E4.6.1%3C%2Fversion%3E%0A%09%09%3C%2Fdependency%3E%0A%60%60%60%0A%0A%0A%0A%23%23%20Producer%E9%85%8D%E7%BD%AE%0A%0A%23%23%23%20Config%E9%85%8D%E7%BD%AE%E7%B1%BB%0A%0A%E7%94%A8%E4%BA%8E%E5%9C%A8%E7%B3%BB%E7%BB%9F%E5%90%AF%E5%8A%A8%E6%97%B6%E5%88%9D%E5%A7%8B%E5%8C%96producer%E5%8F%82%E6%95%B0%E5%B9%B6%E5%90%AF%E5%8A%A8%0A%0A%60%60%60java%0Apackage%20com.mashibing.rmq.controller%3B%0A%0Aimport%20org.apache.rocketmq.client.exception.MQClientException%3B%0Aimport%20org.apache.rocketmq.client.producer.DefaultMQProducer%3B%0Aimport%20org.slf4j.Logger%3B%0Aimport%20org.slf4j.LoggerFactory%3B%0Aimport%20org.springframework.beans.factory.annotation.Value%3B%0Aimport%20org.springframework.context.annotation.Bean%3B%0Aimport%20org.springframework.context.annotation.Configuration%3B%0A%0A%40Configuration%0Apublic%20class%20MQConfig%20%7B%0A%09public%20static%20final%20Logger%20LOGGER%20%3D%20LoggerFactory.getLogger(MQConfig.class)%3B%0A%0A%09%40Value(%22%24%7Brocketmq.producer.groupName%7D%22)%0A%09private%20String%20groupName%3B%0A%0A%09%40Value(%22%24%7Brocketmq.producer.namesrvAddr%7D%22)%0A%09private%20String%20namesrvAddr%3B%0A%0A%09%40Bean%0A%09public%20DefaultMQProducer%20getRocketMQProducer()%20%7B%0A%0A%09%09DefaultMQProducer%20producer%3B%0A%09%09producer%20%3D%20new%20DefaultMQProducer(this.groupName)%3B%0A%0A%09%09producer.setNamesrvAddr(this.namesrvAddr)%3B%0A%0A%09%09try%20%7B%0A%09%09%09producer.start()%3B%0A%09%09%09System.out.println(%22start….%22)%3B%0A%0A%09%09%09LOGGER.info(String.format(%22producer%20is%20start%20!%20groupName%3A%5B%25s%5D%2CnamesrvAddr%3A%5B%25s%5D%22%2C%20this.groupName%2C%0A%09%09%09%09%09this.namesrvAddr))%3B%0A%09%09%7D%20catch%20(MQClientException%20e)%20%7B%0A%09%09%09LOGGER.error(String.format(%22producer%20is%20error%20%7B%7D%22%2C%20e.getMessage()%2C%20e))%3B%0A%09%09%7D%0A%09%09return%20producer%3B%0A%0A%09%7D%0A%7D%0A%0A%60%60%60%0A%0A%23%23%23%20Service%E6%B6%88%E6%81%AF%E5%8F%91%E9%80%81%E7%B1%BB%0A%0A%60%60%60java%0Apackage%20com.mashibing.rmq.service%3B%0A%0Aimport%20org.apache.rocketmq.client.producer.DefaultMQProducer%3B%0Aimport%20org.apache.rocketmq.common.message.Message%3B%0Aimport%20org.springframework.beans.factory.annotation.Autowired%3B%0Aimport%20org.springframework.stereotype.Service%3B%0A%40Service%0Apublic%20class%20MQService%20%7B%0A%09%40Autowired%0A%09DefaultMQProducer%20producer%3B%0A%09%0A%09%0A%09public%20Object%20sendMsg(String%20string)%20%7B%0A%09%09%0A%09%09for%20(int%20i%20%3D%200%3B%20i%20%3C%201%3B%20i%2B%2B)%20%7B%0A%09%09%09Message%20message%20%3D%20new%20Message(%22tpk02%22%2C%20%22xx%22.getBytes())%3B%0A%09%09%09%0A%09%09%09try%20%7B%0A%09%09%09%09return%20producer.send(message)%3B%0A%09%09%09%7D%20catch%20(Exception%20e)%20%7B%0A%09%09%09%09%2F%2F%20TODO%20Auto-generated%20catch%20block%0A%09%09%09%09e.printStackTrace()%3B%0A%09%09%09%7D%20%0A%09%09%7D%0A%09%09return%20null%3B%0A%09%7D%0A%7D%0A%0A%60%60%60%0A%0A%E9%85%8D%E7%BD%AE%E6%96%87%E4%BB%B6%0A%0A%60%60%60properties%0Aspring.application.name%3Dmq01%0Arocketmq.producer.namesrvAddr%3D192.168.150.131%3A9876%0Arocketmq.producer.groupName%3D%24%7Bspring.application.name%7D%0Aserver.port%3D8081%0A%60%60%60%0A%0A%0A%0A%23%23%20Consumer%E9%85%8D%E7%BD%AE%0A%0A%23%23%23%20Config%E9%85%8D%E7%BD%AE%E7%B1%BB%0A%0A%60%60%60java%0Apackage%20com.mashibing.rmq.controller%3B%0A%0A%0Aimport%20org.apache.rocketmq.client.consumer.DefaultMQPushConsumer%3B%0Aimport%20org.slf4j.Logger%3B%0Aimport%20org.slf4j.LoggerFactory%3B%0Aimport%20org.springframework.beans.factory.annotation.Value%3B%0Aimport%20org.springframework.context.annotation.Bean%3B%0Aimport%20org.springframework.context.annotation.Configuration%3B%0A%0A%40Configuration%0Apublic%20class%20MQConfig%20%7B%0A%09%0A%20%20%20%20%0A%09public%20static%20final%20Logger%20logger%20%3D%20LoggerFactory.getLogger(MQConfig.class)%3B%0A%09%0A%09%40Value(%22%24%7Brocketmq.consumer.namesrvAddr%7D%22)%0A%20%20%20%20private%20String%20namesrvAddr%3B%0A%20%20%20%20%40Value(%22%24%7Brocketmq.consumer.groupName%7D%22)%0A%20%20%20%20private%20String%20groupName%3B%0A%20%20%20%20%40Value(%22%24%7Brocketmq.consumer.topics%7D%22)%0A%20%20%20%20private%20String%20topics%3B%0A%09%0A%20%20%20%20%40Bean%0A%20%20%20%20public%20DefaultMQPushConsumer%20getRocketMQConsumer()%20throws%20Exception%20%7B%0A%20%20%20%20%09%0A%20%20%20%20%20%20%20%20DefaultMQPushConsumer%20consumer%20%3D%20new%20DefaultMQPushConsumer(groupName)%3B%0A%20%20%20%20%20%20%20%20consumer.setNamesrvAddr(namesrvAddr)%3B%0A%20%20%20%20%20%20%20%20consumer.subscribe(topics%2C%20%22*%22)%3B%0A%20%20%20%20%20%20%20%20%0A%20%20%20%20%20%20%20%20consumer.registerMessageListener(new%20MyMessageListener()%20)%3B%0A%20%20%20%20%20%20%20%20consumer.start()%3B%0A%20%20%20%20%20%20%20%20%0A%20%20%20%20%20%20%20%20return%20consumer%3B%0A%20%20%20%20%7D%0A%7D%0A%0A%60%60%60%0A%0A%23%23%23%20%E6%B6%88%E6%81%AF%E5%A4%84%E7%90%86%E7%B1%BB%0A%0A%60%60%60java%0Apackage%20com.mashibing.rmq.controller%3B%0A%0Aimport%20java.util.List%3B%0A%0Aimport%20org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext%3B%0Aimport%20org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus%3B%0Aimport%20org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently%3B%0Aimport%20org.apache.rocketmq.common.message.MessageExt%3B%0A%0Apublic%20class%20MyMessageListener%20implements%20MessageListenerConcurrently%20%7B%0A%09%09%40Override%0A%09%09public%20ConsumeConcurrentlyStatus%20consumeMessage(List%3CMessageExt%3E%20msgs%2C%20ConsumeConcurrentlyContext%20context)%20%7B%0A%09%09%09System.out.println(%22%E6%9D%A5%E5%95%A6%EF%BC%81%EF%BC%8122%EF%BC%81%22)%3B%0A%09%09%09for%20(MessageExt%20msg%20%3A%20msgs)%20%7B%0A%09%09%09%09%0A%09%09%09%09System.out.println(new%20String(msg.getBody()))%3B%3B%0A%09%09%09%7D%0A%09%09%09return%20ConsumeConcurrentlyStatus.CONSUME_SUCCESS%3B%0A%09%09%7D%0A%7D%0A%60%60%60%0A%0A%23%23%23%20%E9%85%8D%E7%BD%AE%E6%96%87%E4%BB%B6%0A%0A%60%60%60%0Aspring.application.name%3Dmq02%0Arocketmq.producer.namesrvAddr%3D192.168.150.131%3A9876%0Arocketmq.producer.groupName%3D%24%7Bspring.application.name%7D%0A%0Arocketmq.consumer.topics%3Dtpk02%0A%0A%60%60%60