listener示例
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.springframework.stereotype.Service;/*** 示例消息** @author 微风* @date 2020-04-10*/@Service@Slf4j@RocketMQMessageListener(topic = "topic-demo",selectorExpression = "tag-demo",consumerGroup = "GID-demo_demo-demo",consumeThreadMax = 10,consumeTimeout = 10000L)public class DemoTopicTag2MessageListener extends AbstractMessageListener<MessageExt> {@Overridepublic void consumerMessage(MessageExt messageExt) throws ServerException {if (messageExt == null) {return;}if (messageExt.getTags() == null) {throw new ServerException(CommonResponseCode.SYS_E_SERVICE_EXCEPTION, "参数缺失");}}}
import com.alibaba.dubbo.config.annotation.Reference;import com.google.common.cache.CacheBuilder;import com.google.common.cache.CacheLoader;import com.google.common.cache.LoadingCache;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.core.env.StandardEnvironment;import javax.annotation.PostConstruct;import java.lang.reflect.ParameterizedType;import java.lang.reflect.Type;import java.nio.charset.Charset;import java.util.Objects;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;/*** @param <T> 消息体类型* @description: 基础消息处理类* @author: 微风* @create: 2020/04/11 10:32*/@Slf4jpublic abstract class AbstractMessageListener<T> implements RocketMQListener<MessageExt> {@Autowiredprivate StandardEnvironment environment;private String charset = "UTF-8";private String destination;private AtomicInteger count = new AtomicInteger(0);private static final int MAX_CONCURRENT_CONSUME_COUNT = 100;private static final int CONSUME_TIMES = 10;private final LoadingCache<String, Integer> localCache = CacheBuilder.newBuilder().expireAfterWrite(30, TimeUnit.SECONDS).build(new CacheLoader<String, Integer>() {@Overridepublic Integer load(String key) {try {Object object = redisService.get(key);if (object == null) {return MAX_CONCURRENT_CONSUME_COUNT;}return Integer.valueOf(object.toString());} catch (Exception e) {return MAX_CONCURRENT_CONSUME_COUNT;}}});@Overridepublic void onMessage(MessageExt msg) {if (msg == null) {return;}if (!destination.equals(msg.getTopic() + msg.getTags())) {log.warn("消息跟listener不匹配");return;}long startTime = System.currentTimeMillis();String messageKey = msg.getTopic() + ":" + msg.getTags() + ":" + msg.getKeys();try {count.incrementAndGet();log.info("接收消息, messageKey:{} body:{} ", messageKey, new String(msg.getBody(), charset));if (isDuplicate(msg)) {log.warn("重复消息 messageKey = {}, id={}", messageKey, msg.getMsgId());return;}// 消费次数以及限流检查boolean checkMessage = checkMessage(msg, count);if (!checkMessage) {throw new ServerException("预检查失败 messageKey:" + messageKey);}consumerMessage(doConvertMessage(msg));long cost = System.currentTimeMillis() - startTime;if (cost > 1000) {log.warn("message consume {} cost:{}", messageKey, startTime);}if (cost > 3000) {sendDingDing(messageKey + ",消费超过3秒");}} catch (Exception e) {log.error("消费失败 {}", messageKey, e);throw new RuntimeException(e.getMessage(), e);} finally {count.decrementAndGet();}}/*** 消费消息** @param msg 消息对象* @throws ServerException 异常*/public abstract void consumerMessage(T msg) throws ServerException;/*** 消息转换** @param messageExt 原消息体* @return T 需要接收的消息对象*/private T doConvertMessage(MessageExt messageExt) {Type messageType = getMessageType();if (Objects.equals(messageType, MessageExt.class)) {return (T) messageExt;} else {String str = new String(messageExt.getBody(), Charset.forName(charset));if (Objects.equals(messageType, String.class)) {return (T) str;} else {try {return JsonUtils.parseJson(str, messageType);} catch (Exception e) {log.error("convert failed. str:{}, msgType:{}", str, messageType, e);throw new RuntimeException("cannot convert message to " + messageType, e);}}}}/*** 获取需要接收的消息对象类型** @return Type 消息体类型*/private Type getMessageType() {Type targetClass = this.getClass().getGenericSuperclass();if (Objects.isNull(targetClass)) {return Object.class;}Type[] actualTypeArguments = ((ParameterizedType) targetClass).getActualTypeArguments();if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {return actualTypeArguments[0];}return Object.class;}/*** bean初始化调用*/@PostConstructpublic void init() {RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);String topic = this.environment.resolvePlaceholders(annotation.topic());String tag = this.environment.resolvePlaceholders(annotation.selectorExpression());destination = topic + tag;}/*** 限流检查* <p>* 其他listener有自己的消息检查需要可以考虑重写此方法* </p>** @param msg 消息对象* @param count 限流数* @return boolean 是否可以消费*/public boolean checkMessage(MessageExt msg, AtomicInteger count) {if (msg == null) {return true;}String messageKey = msg.getTopic() + ":" + msg.getTags() + ":" + msg.getKeys();try {// 消息重试次数超过10次报警checkConsumeTimes(msg);// 消费限流int limitCount = localCache.get(RedisKeyEnum.MQ_FLOW_LIMIT_COUNT.getValue() + "_" + messageKey);if (count.get() > limitCount) {log.info("messageKey:{} msg count is more than limit count, count:{}", messageKey, count.get());return false;}log.info("msg messageKey:{}, count:{}", messageKey, count.get());} catch (Exception e) {log.error("check error destination: {}", messageKey, e);return false;}return true;}/*** 消息消费幂等通用判断逻辑* <p>* 有自己的幂等判断需求的重写此实现* </p>** @param msg 消息体* @return 是重复消息返回true,否则返回false*/public boolean isDuplicate(MessageExt msg) {return false;}/*** 消息重试次数超过10次报警** @param msg 消息对象*/private void checkConsumeTimes(MessageExt msg) {if (msg.getReconsumeTimes() >= CONSUME_TIMES) {try {String message = msg.getTopic() + ":" + msg.getTags() + "_" + msg.getKeys();sendDingDing("消息" + message + "已重试" + msg.getReconsumeTimes() + "次");} catch (Exception e) {log.error("error to sendDingDingMsg, e:{}", e.getMessage(), e);}}}/*** 消息重试次数超过10次报警** @param tip 提示内容*/public void sendDingDing(String tip) {//发送提示信息}}
1、消息堆积问题
同一个进程中 ons和rocketmq原生客户端同时订阅一个topic消息的话,会出现阻塞问题,原因是因为这里subversion是用户的时间戳,这样同时存在两个类执行同样的代码,会导致服务端判断出现异常
服务端判断逻辑
2、上报轨迹信息问题
使用开源客户端后ons日志出现这种错误信息,跟踪错误堆栈发现,是在上报消息轨迹信息时,发送消息的topic不存在引起
RMQ_SYS_TRACE_TOPIC
发送轨迹消息时会有一个accessChannel的判断,只有是CLOUD方式才能正确获取topic
