listener示例
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Service;
/**
* 示例消息
*
* @author 微风
* @date 2020-04-10
*/
@Service
@Slf4j
@RocketMQMessageListener(topic = "topic-demo",
selectorExpression = "tag-demo",
consumerGroup = "GID-demo_demo-demo",
consumeThreadMax = 10,
consumeTimeout = 10000L)
public class DemoTopicTag2MessageListener extends AbstractMessageListener<MessageExt> {
@Override
public void consumerMessage(MessageExt messageExt) throws ServerException {
if (messageExt == null) {
return;
}
if (messageExt.getTags() == null) {
throw new ServerException(CommonResponseCode.SYS_E_SERVICE_EXCEPTION, "参数缺失");
}
}
}
import com.alibaba.dubbo.config.annotation.Reference;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.StandardEnvironment;
import javax.annotation.PostConstruct;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @param <T> 消息体类型
* @description: 基础消息处理类
* @author: 微风
* @create: 2020/04/11 10:32
*/
@Slf4j
public abstract class AbstractMessageListener<T> implements RocketMQListener<MessageExt> {
@Autowired
private StandardEnvironment environment;
private String charset = "UTF-8";
private String destination;
private AtomicInteger count = new AtomicInteger(0);
private static final int MAX_CONCURRENT_CONSUME_COUNT = 100;
private static final int CONSUME_TIMES = 10;
private final LoadingCache<String, Integer> localCache = CacheBuilder.newBuilder()
.expireAfterWrite(30, TimeUnit.SECONDS)
.build(new CacheLoader<String, Integer>() {
@Override
public Integer load(String key) {
try {
Object object = redisService.get(key);
if (object == null) {
return MAX_CONCURRENT_CONSUME_COUNT;
}
return Integer.valueOf(object.toString());
} catch (Exception e) {
return MAX_CONCURRENT_CONSUME_COUNT;
}
}
});
@Override
public void onMessage(MessageExt msg) {
if (msg == null) {
return;
}
if (!destination.equals(msg.getTopic() + msg.getTags())) {
log.warn("消息跟listener不匹配");
return;
}
long startTime = System.currentTimeMillis();
String messageKey = msg.getTopic() + ":" + msg.getTags() + ":" + msg.getKeys();
try {
count.incrementAndGet();
log.info("接收消息, messageKey:{} body:{} ", messageKey, new String(msg.getBody(), charset));
if (isDuplicate(msg)) {
log.warn("重复消息 messageKey = {}, id={}", messageKey, msg.getMsgId());
return;
}
// 消费次数以及限流检查
boolean checkMessage = checkMessage(msg, count);
if (!checkMessage) {
throw new ServerException("预检查失败 messageKey:" + messageKey);
}
consumerMessage(doConvertMessage(msg));
long cost = System.currentTimeMillis() - startTime;
if (cost > 1000) {
log.warn("message consume {} cost:{}", messageKey, startTime);
}
if (cost > 3000) {
sendDingDing(messageKey + ",消费超过3秒");
}
} catch (Exception e) {
log.error("消费失败 {}", messageKey, e);
throw new RuntimeException(e.getMessage(), e);
} finally {
count.decrementAndGet();
}
}
/**
* 消费消息
*
* @param msg 消息对象
* @throws ServerException 异常
*/
public abstract void consumerMessage(T msg) throws ServerException;
/**
* 消息转换
*
* @param messageExt 原消息体
* @return T 需要接收的消息对象
*/
private T doConvertMessage(MessageExt messageExt) {
Type messageType = getMessageType();
if (Objects.equals(messageType, MessageExt.class)) {
return (T) messageExt;
} else {
String str = new String(messageExt.getBody(), Charset.forName(charset));
if (Objects.equals(messageType, String.class)) {
return (T) str;
} else {
try {
return JsonUtils.parseJson(str, messageType);
} catch (Exception e) {
log.error("convert failed. str:{}, msgType:{}", str, messageType, e);
throw new RuntimeException("cannot convert message to " + messageType, e);
}
}
}
}
/**
* 获取需要接收的消息对象类型
*
* @return Type 消息体类型
*/
private Type getMessageType() {
Type targetClass = this.getClass().getGenericSuperclass();
if (Objects.isNull(targetClass)) {
return Object.class;
}
Type[] actualTypeArguments = ((ParameterizedType) targetClass).getActualTypeArguments();
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
return actualTypeArguments[0];
}
return Object.class;
}
/**
* bean初始化调用
*/
@PostConstruct
public void init() {
RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
String topic = this.environment.resolvePlaceholders(annotation.topic());
String tag = this.environment.resolvePlaceholders(annotation.selectorExpression());
destination = topic + tag;
}
/**
* 限流检查
* <p>
* 其他listener有自己的消息检查需要可以考虑重写此方法
* </p>
*
* @param msg 消息对象
* @param count 限流数
* @return boolean 是否可以消费
*/
public boolean checkMessage(MessageExt msg, AtomicInteger count) {
if (msg == null) {
return true;
}
String messageKey = msg.getTopic() + ":" + msg.getTags() + ":" + msg.getKeys();
try {
// 消息重试次数超过10次报警
checkConsumeTimes(msg);
// 消费限流
int limitCount = localCache.get(RedisKeyEnum.MQ_FLOW_LIMIT_COUNT.getValue() + "_" + messageKey);
if (count.get() > limitCount) {
log.info("messageKey:{} msg count is more than limit count, count:{}", messageKey, count.get());
return false;
}
log.info("msg messageKey:{}, count:{}", messageKey, count.get());
} catch (Exception e) {
log.error("check error destination: {}", messageKey, e);
return false;
}
return true;
}
/**
* 消息消费幂等通用判断逻辑
* <p>
* 有自己的幂等判断需求的重写此实现
* </p>
*
* @param msg 消息体
* @return 是重复消息返回true,否则返回false
*/
public boolean isDuplicate(MessageExt msg) {
return false;
}
/**
* 消息重试次数超过10次报警
*
* @param msg 消息对象
*/
private void checkConsumeTimes(MessageExt msg) {
if (msg.getReconsumeTimes() >= CONSUME_TIMES) {
try {
String message = msg.getTopic() + ":" + msg.getTags() + "_" + msg.getKeys();
sendDingDing("消息" + message + "已重试" + msg.getReconsumeTimes() + "次");
} catch (Exception e) {
log.error("error to sendDingDingMsg, e:{}", e.getMessage(), e);
}
}
}
/**
* 消息重试次数超过10次报警
*
* @param tip 提示内容
*/
public void sendDingDing(String tip) {
//发送提示信息
}
}
1、消息堆积问题
同一个进程中 ons和rocketmq原生客户端同时订阅一个topic消息的话,会出现阻塞问题,原因是因为这里subversion是用户的时间戳,这样同时存在两个类执行同样的代码,会导致服务端判断出现异常
服务端判断逻辑
2、上报轨迹信息问题
使用开源客户端后ons日志出现这种错误信息,跟踪错误堆栈发现,是在上报消息轨迹信息时,发送消息的topic不存在引起
RMQ_SYS_TRACE_TOPIC
发送轨迹消息时会有一个accessChannel的判断,只有是CLOUD方式才能正确获取topic