listener示例

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.apache.rocketmq.common.message.MessageExt;
  3. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  4. import org.springframework.stereotype.Service;
  5. /**
  6. * 示例消息
  7. *
  8. * @author 微风
  9. * @date 2020-04-10
  10. */
  11. @Service
  12. @Slf4j
  13. @RocketMQMessageListener(topic = "topic-demo",
  14. selectorExpression = "tag-demo",
  15. consumerGroup = "GID-demo_demo-demo",
  16. consumeThreadMax = 10,
  17. consumeTimeout = 10000L)
  18. public class DemoTopicTag2MessageListener extends AbstractMessageListener<MessageExt> {
  19. @Override
  20. public void consumerMessage(MessageExt messageExt) throws ServerException {
  21. if (messageExt == null) {
  22. return;
  23. }
  24. if (messageExt.getTags() == null) {
  25. throw new ServerException(CommonResponseCode.SYS_E_SERVICE_EXCEPTION, "参数缺失");
  26. }
  27. }
  28. }
  1. import com.alibaba.dubbo.config.annotation.Reference;
  2. import com.google.common.cache.CacheBuilder;
  3. import com.google.common.cache.CacheLoader;
  4. import com.google.common.cache.LoadingCache;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.rocketmq.common.message.MessageExt;
  7. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  8. import org.apache.rocketmq.spring.core.RocketMQListener;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.core.env.StandardEnvironment;
  11. import javax.annotation.PostConstruct;
  12. import java.lang.reflect.ParameterizedType;
  13. import java.lang.reflect.Type;
  14. import java.nio.charset.Charset;
  15. import java.util.Objects;
  16. import java.util.concurrent.TimeUnit;
  17. import java.util.concurrent.atomic.AtomicInteger;
  18. /**
  19. * @param <T> 消息体类型
  20. * @description: 基础消息处理类
  21. * @author: 微风
  22. * @create: 2020/04/11 10:32
  23. */
  24. @Slf4j
  25. public abstract class AbstractMessageListener<T> implements RocketMQListener<MessageExt> {
  26. @Autowired
  27. private StandardEnvironment environment;
  28. private String charset = "UTF-8";
  29. private String destination;
  30. private AtomicInteger count = new AtomicInteger(0);
  31. private static final int MAX_CONCURRENT_CONSUME_COUNT = 100;
  32. private static final int CONSUME_TIMES = 10;
  33. private final LoadingCache<String, Integer> localCache = CacheBuilder.newBuilder()
  34. .expireAfterWrite(30, TimeUnit.SECONDS)
  35. .build(new CacheLoader<String, Integer>() {
  36. @Override
  37. public Integer load(String key) {
  38. try {
  39. Object object = redisService.get(key);
  40. if (object == null) {
  41. return MAX_CONCURRENT_CONSUME_COUNT;
  42. }
  43. return Integer.valueOf(object.toString());
  44. } catch (Exception e) {
  45. return MAX_CONCURRENT_CONSUME_COUNT;
  46. }
  47. }
  48. });
  49. @Override
  50. public void onMessage(MessageExt msg) {
  51. if (msg == null) {
  52. return;
  53. }
  54. if (!destination.equals(msg.getTopic() + msg.getTags())) {
  55. log.warn("消息跟listener不匹配");
  56. return;
  57. }
  58. long startTime = System.currentTimeMillis();
  59. String messageKey = msg.getTopic() + ":" + msg.getTags() + ":" + msg.getKeys();
  60. try {
  61. count.incrementAndGet();
  62. log.info("接收消息, messageKey:{} body:{} ", messageKey, new String(msg.getBody(), charset));
  63. if (isDuplicate(msg)) {
  64. log.warn("重复消息 messageKey = {}, id={}", messageKey, msg.getMsgId());
  65. return;
  66. }
  67. // 消费次数以及限流检查
  68. boolean checkMessage = checkMessage(msg, count);
  69. if (!checkMessage) {
  70. throw new ServerException("预检查失败 messageKey:" + messageKey);
  71. }
  72. consumerMessage(doConvertMessage(msg));
  73. long cost = System.currentTimeMillis() - startTime;
  74. if (cost > 1000) {
  75. log.warn("message consume {} cost:{}", messageKey, startTime);
  76. }
  77. if (cost > 3000) {
  78. sendDingDing(messageKey + ",消费超过3秒");
  79. }
  80. } catch (Exception e) {
  81. log.error("消费失败 {}", messageKey, e);
  82. throw new RuntimeException(e.getMessage(), e);
  83. } finally {
  84. count.decrementAndGet();
  85. }
  86. }
  87. /**
  88. * 消费消息
  89. *
  90. * @param msg 消息对象
  91. * @throws ServerException 异常
  92. */
  93. public abstract void consumerMessage(T msg) throws ServerException;
  94. /**
  95. * 消息转换
  96. *
  97. * @param messageExt 原消息体
  98. * @return T 需要接收的消息对象
  99. */
  100. private T doConvertMessage(MessageExt messageExt) {
  101. Type messageType = getMessageType();
  102. if (Objects.equals(messageType, MessageExt.class)) {
  103. return (T) messageExt;
  104. } else {
  105. String str = new String(messageExt.getBody(), Charset.forName(charset));
  106. if (Objects.equals(messageType, String.class)) {
  107. return (T) str;
  108. } else {
  109. try {
  110. return JsonUtils.parseJson(str, messageType);
  111. } catch (Exception e) {
  112. log.error("convert failed. str:{}, msgType:{}", str, messageType, e);
  113. throw new RuntimeException("cannot convert message to " + messageType, e);
  114. }
  115. }
  116. }
  117. }
  118. /**
  119. * 获取需要接收的消息对象类型
  120. *
  121. * @return Type 消息体类型
  122. */
  123. private Type getMessageType() {
  124. Type targetClass = this.getClass().getGenericSuperclass();
  125. if (Objects.isNull(targetClass)) {
  126. return Object.class;
  127. }
  128. Type[] actualTypeArguments = ((ParameterizedType) targetClass).getActualTypeArguments();
  129. if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
  130. return actualTypeArguments[0];
  131. }
  132. return Object.class;
  133. }
  134. /**
  135. * bean初始化调用
  136. */
  137. @PostConstruct
  138. public void init() {
  139. RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
  140. String topic = this.environment.resolvePlaceholders(annotation.topic());
  141. String tag = this.environment.resolvePlaceholders(annotation.selectorExpression());
  142. destination = topic + tag;
  143. }
  144. /**
  145. * 限流检查
  146. * <p>
  147. * 其他listener有自己的消息检查需要可以考虑重写此方法
  148. * </p>
  149. *
  150. * @param msg 消息对象
  151. * @param count 限流数
  152. * @return boolean 是否可以消费
  153. */
  154. public boolean checkMessage(MessageExt msg, AtomicInteger count) {
  155. if (msg == null) {
  156. return true;
  157. }
  158. String messageKey = msg.getTopic() + ":" + msg.getTags() + ":" + msg.getKeys();
  159. try {
  160. // 消息重试次数超过10次报警
  161. checkConsumeTimes(msg);
  162. // 消费限流
  163. int limitCount = localCache.get(RedisKeyEnum.MQ_FLOW_LIMIT_COUNT.getValue() + "_" + messageKey);
  164. if (count.get() > limitCount) {
  165. log.info("messageKey:{} msg count is more than limit count, count:{}", messageKey, count.get());
  166. return false;
  167. }
  168. log.info("msg messageKey:{}, count:{}", messageKey, count.get());
  169. } catch (Exception e) {
  170. log.error("check error destination: {}", messageKey, e);
  171. return false;
  172. }
  173. return true;
  174. }
  175. /**
  176. * 消息消费幂等通用判断逻辑
  177. * <p>
  178. * 有自己的幂等判断需求的重写此实现
  179. * </p>
  180. *
  181. * @param msg 消息体
  182. * @return 是重复消息返回true,否则返回false
  183. */
  184. public boolean isDuplicate(MessageExt msg) {
  185. return false;
  186. }
  187. /**
  188. * 消息重试次数超过10次报警
  189. *
  190. * @param msg 消息对象
  191. */
  192. private void checkConsumeTimes(MessageExt msg) {
  193. if (msg.getReconsumeTimes() >= CONSUME_TIMES) {
  194. try {
  195. String message = msg.getTopic() + ":" + msg.getTags() + "_" + msg.getKeys();
  196. sendDingDing("消息" + message + "已重试" + msg.getReconsumeTimes() + "次");
  197. } catch (Exception e) {
  198. log.error("error to sendDingDingMsg, e:{}", e.getMessage(), e);
  199. }
  200. }
  201. }
  202. /**
  203. * 消息重试次数超过10次报警
  204. *
  205. * @param tip 提示内容
  206. */
  207. public void sendDingDing(String tip) {
  208. //发送提示信息
  209. }
  210. }

1、消息堆积问题

同一个进程中,如果 ONS 客户端和 RocketMQ 原生客户端同时订阅同一个 topic 的消息,会出现消费阻塞(消息堆积)的问题。原因是这里的 subVersion 取的是用户侧的时间戳,进程内同时存在两个类执行同样的代码,会导致服务端的判断出现异常:
image.png
服务端判断逻辑
image.png

2、上报轨迹信息问题

使用开源客户端后,ONS 日志中出现如下错误信息。跟踪错误堆栈发现,问题出在上报消息轨迹信息时:发送轨迹消息所用的 topic 不存在,该 topic 为:
RMQ_SYS_TRACE_TOPIC
image.png

发送轨迹消息时会对 accessChannel 进行判断,只有 accessChannel 为 CLOUD 时才能正确获取到轨迹消息的 topic:
image.png