深入理解 JMS,实现 Datahub 版 MQ

背景

项目迁移上阿里专有云的EDAS,过程中因 云上MQ不可用,需要修改为 Datahub。

为了项目本身的代码不用大改,找了解决方案。发现项目中使用的 ActiveMQ 是实现 JMS 规范的,而项目中的调用也是通过 JMS 的相关方法调用,没有调用 ActiveMQ 原生方法,所以选择用实现了 JMS 的Datahub 代替 ActiveMQ 。

在 Github 上,发现没找到(==可能也是我不会搜),所以造个轮子。

介绍

JMS , Java Message Service

Java™ Message Service (JMS) is an API that allows application components based on Java EE to create, send, receive, and read messages. JMS support in Liberty is supplied as a group of related features that support the deployment of JMS resource adapters. JMS can run in a managed mode in which queues, topics, connections, and other resources are created and managed through server configuration. This includes the configuration of JMS connection factories, queues, topics, and activation specifications. Alternatively it can run in unmanaged mode where all resources are manually configured as part of the application. The Liberty embedded JMS messaging provider is managed, and therefore all resources are set up as part of the server.xml configuration.

出处 :https://www.ibm.com/support/knowledgecenter/en/SSGMCP_5.4.0/applications/developing/java/dfhpj2_jms.html

另一篇参考:https://docs.oracle.com/cd/E26576_01/doc.312/e24949/messaging-systems-introduction.htm#GMTOV00115

JMS只有两种方式

一种是点对点的队列,Queue

另一种是发布/订阅的主题,Topic

常见的就是 JmsTemplate ,这就是 jms 的接口,各种 MQ 针对 JMS 提供了Spring Boot的实现,开发者可以直接使用。

原理

jms 的规范都在 javax.jms:javax.jms-api.jar 里面

创建步骤

  1. ConnectionFactory 创建 Connection,Connection 类似于 TCP 长连接,达到复用的效果
  2. Connection 上创建 Session
  3. Session 创建 MessageProducer 和 MessageConsumer,前者管理发送消息的,后者管理接收消息
  4. MessageConsumer 在程序启动时会检查并自动加载,在执行 JmsTemplate.send 会触发并创建
  5. MessageProducer 在执行 JmsTemplate.send 会被触发并创建,并将消息封装成 Message ,传递给 MessageConsumer 处理

Datahub 版 MQ - 图1 而以上相关类,同时还会细分为 Topic 和 Queue 两种接口。

以 ActiveMQ 的 Session 为例,

Datahub 版 MQ - 图2

Session
Datahub 版 MQ - 图3 Datahub 版 MQ - 图4
Datahub 版 MQ - 图5

Session 实现了 Runnable 接口,所以每次创建时,都是新建线程处理的。

源码分析

Spring Boot 的 jar 都有一个特点,就是约定大于配置。

jms 的自动加载类为 org.springframework.jms.annotation.JmsBootstrapConfiguration

分析发现,被@JmsListener标记的方法,将被自动加载

org.springframework.jms.annotation.JmsListenerAnnotationBeanPostProcessor自动加载为 MethodJmsListenerEndpoint 对象。

而在自定义监听,则是在代码中创建 SimpleJmsListenerEndpoint,通过 JmsListenerEndpointRegistry 会将 SimpleJmsListenerEndpoint 注册上,注册方法为registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory, boolean startImmediately)

这一步需要认证阅读 ActiveMQSession

实现

Datahub 只支持 Topic 形式的发布订阅,而对于点对点的队列,可以看作时一种特殊的 Topic 实现,区别在于后者永远有且仅有一个订阅者。

要实现 mq 形式的使用,首先要自定义JmsTemplate 。按照对象的创建和代码的调用 ,可以发现需要逐步实现ConnectionFactory

最后根据自定义的JmsListenerContainerFactory 创建 JmsTemplate即可

  1. @Bean
  2. public JmsTemplate jmsQueueTemplate(ConnectionFactory connectionFactory){
  3. JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
  4. jmsTemplate.setPubSubDomain(false);
  5. return jmsTemplate;
  6. }

理解rocket-jms

在spring中,发送消息使用JmsTemplate,接受消息使用 @JmsListener 或 SimpleJmsListenerEndpoint

项目:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-jms

@JmsListener是怎么生效的

一般使用,都是在方法上添加注解,spring会自动装在

  1. @JmsListener(destination, containerFactory)

destination是topic,containerFactory则负责和mq的连接,由于spring和jms实现了方法到containerFactory的实现,我们需要实现ConnectionFactory以及相关类

不信?看源码验证下。

从启动类 JmsBootstrapConfiguration 看

  1. @Configuration
  2. public class JmsBootstrapConfiguration {
  3. @Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
  4. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  5. public JmsListenerAnnotationBeanPostProcessor jmsListenerAnnotationProcessor() {
  6. return new JmsListenerAnnotationBeanPostProcessor();
  7. }
  8. @Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
  9. public JmsListenerEndpointRegistry defaultJmsListenerEndpointRegistry() {
  10. return new JmsListenerEndpointRegistry();
  11. }
  12. }

具体的入口方法为BeanPostProcessor的接口

  1. @Override
  2. public Object postProcessAfterInitialization(final Object bean, String beanName) throws BeansException {
  3. if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
  4. final Set<Method> annotatedMethods = new LinkedHashSet<Method>(1);
  5. Class<?> targetClass = AopUtils.getTargetClass(bean);
  6. ReflectionUtils.doWithMethods(targetClass, new ReflectionUtils.MethodCallback() {
  7. @Override
  8. public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
  9. JmsListener jmsListener = AnnotationUtils.getAnnotation(method, JmsListener.class);
  10. if (jmsListener != null) {
  11. processJmsListener(jmsListener, method, bean);
  12. annotatedMethods.add(method);
  13. }
  14. }
  15. });
  16. if (annotatedMethods.isEmpty()) {
  17. this.nonAnnotatedClasses.add(bean.getClass());
  18. if (logger.isDebugEnabled()) {
  19. logger.debug("No @JmsListener annotations found on bean class: " + bean.getClass());
  20. }
  21. }
  22. else {
  23. // Non-empty set of methods
  24. if (logger.isDebugEnabled()) {
  25. logger.debug(annotatedMethods.size() + " @JmsListener methods processed on bean '" + beanName +
  26. "': " + annotatedMethods);
  27. }
  28. }
  29. }
  30. return bean;
  31. }

JmsListenerAnnotationBeanPostProcessor.processJmsListener负责具体的创建,将消费方法转化为MethodJmsListenerEndpoint,和具体的mq实现ConnectionFactory关联起来

  1. protected void processJmsListener(JmsListener jmsListener, Method method, Object bean) {
  2. if (AopUtils.isJdkDynamicProxy(bean)) {
  3. try {
  4. // Found a @JmsListener method on the target class for this JDK proxy ->
  5. // is it also present on the proxy itself?
  6. method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
  7. }
  8. catch (SecurityException ex) {
  9. ReflectionUtils.handleReflectionException(ex);
  10. }
  11. catch (NoSuchMethodException ex) {
  12. throw new IllegalStateException(String.format(
  13. "@JmsListener method '%s' found on bean target class '%s', " +
  14. "but not found in any interface(s) for bean JDK proxy. Either " +
  15. "pull the method up to an interface or switch to subclass (CGLIB) " +
  16. "proxies by setting proxy-target-class/proxyTargetClass " +
  17. "attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName()));
  18. }
  19. }
  20. MethodJmsListenerEndpoint endpoint = new MethodJmsListenerEndpoint();
  21. endpoint.setBean(bean);
  22. endpoint.setMethod(method);
  23. endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
  24. endpoint.setId(getEndpointId(jmsListener));
  25. endpoint.setDestination(resolve(jmsListener.destination()));
  26. if (StringUtils.hasText(jmsListener.selector())) {
  27. endpoint.setSelector(resolve(jmsListener.selector()));
  28. }
  29. if (StringUtils.hasText(jmsListener.subscription())) {
  30. endpoint.setSubscription(resolve(jmsListener.subscription()));
  31. }
  32. if (StringUtils.hasText(jmsListener.concurrency())) {
  33. endpoint.setConcurrency(resolve(jmsListener.concurrency()));
  34. }
  35. JmsListenerContainerFactory<?> factory = null;
  36. String containerFactoryBeanName = resolve(jmsListener.containerFactory());
  37. if (StringUtils.hasText(containerFactoryBeanName)) {
  38. Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
  39. try {
  40. factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
  41. }
  42. catch (NoSuchBeanDefinitionException ex) {
  43. throw new BeanInitializationException("Could not register jms listener endpoint on [" +
  44. method + "], no " + JmsListenerContainerFactory.class.getSimpleName() + " with id '" +
  45. containerFactoryBeanName + "' was found in the application context", ex);
  46. }
  47. }
  48. this.registrar.registerEndpoint(endpoint, factory);
  49. }

rocketmq的实现

一对一:ConnectionFactory—>Connection—>Session

Datahub 版 MQ - 图6 值得关注的点:

使用单例,比如JmsListenerContainerFactory 使用单个对象作为锁

  1. private final Object connectionMonitor = new Object();
  2. protected void initConnection() throws JMSException {
  3. synchronized (this.connectionMonitor) {
  4. if (this.connection != null) {
  5. closeConnection(this.connection);
  6. }
  7. this.connection = doCreateConnection();
  8. logger.debug("Established shared JMS Connection: {}", this.connection);
  9. }
  10. }

使用CopyOnWriteArrayList管理不同destination的消费者

  1. public class JmsBaseSession implements Session {
  2. protected CommonContext context;
  3. protected JmsBaseConnection connection;
  4. protected CopyOnWriteArrayList<JmsBaseMessageConsumer> consumerList =
  5. new CopyOnWriteArrayList<JmsBaseMessageConsumer>();
  6. }

单个消费者RMQPushConsumerExt 支持监听同一个topic的多个方法,MessageListener 实现 MessageListenerConcurrently 并发消费

  1. public class RMQPushConsumerExt {
  2. private final MQPushConsumer consumer;
  3. private final ConcurrentHashMap<String/* Topic */, javax.jms.MessageListener> subscribeTable = new ConcurrentHashMap<String, javax.jms.MessageListener>();
  4. }

PS:rocketmq的事务在jms不支持