Spring JmsTemplate

源码分析

send 发送消息

  1. @Override
  2. public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {
  3. // 执行.
  4. execute(session -> {
  5. Destination destination = resolveDestinationName(session, destinationName);
  6. doSend(session, destination, messageCreator);
  7. return null;
  8. }, false);
  9. }
  1. @Nullable
  2. public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException {
  3. Assert.notNull(action, "Callback object must not be null");
  4. Connection conToClose = null;
  5. Session sessionToClose = null;
  6. try {
  7. Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
  8. obtainConnectionFactory(), this.transactionalResourceFactory, startConnection);
  9. if (sessionToUse == null) {
  10. // 创建链接
  11. conToClose = createConnection();
  12. // 创建session
  13. sessionToClose = createSession(conToClose);
  14. if (startConnection) {
  15. conToClose.start();
  16. }
  17. sessionToUse = sessionToClose;
  18. }
  19. if (logger.isDebugEnabled()) {
  20. logger.debug("Executing callback on JMS Session: " + sessionToUse);
  21. }
  22. /**
  23. * sessionCallback 执行
  24. * {@link JmsTemplate#doSend(Session, javax.jms.Destination, org.springframework.jms.core.MessageCreator)}
  25. */
  26. return action.doInJms(sessionToUse);
  27. } catch (JMSException ex) {
  28. throw convertJmsAccessException(ex);
  29. } finally {
  30. // 资源释放
  31. JmsUtils.closeSession(sessionToClose);
  32. ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);
  33. }
  34. }
  • 最后action.doInJms(sessionToUse)的操作
  1. Destination destination = resolveDestinationName(session, destinationName);
  2. doSend(session, destination, messageCreator);
  3. return null;
  • doSend真正做的发送方法
  1. protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
  2. throws JMSException {
  3. Assert.notNull(messageCreator, "MessageCreator must not be null");
  4. // 创建消息生产者
  5. MessageProducer producer = createProducer(session, destination);
  6. try {
  7. // 创建消息
  8. Message message = messageCreator.createMessage(session);
  9. if (logger.isDebugEnabled()) {
  10. logger.debug("Sending created message: " + message);
  11. }
  12. // 发送
  13. doSend(producer, message);
  14. // Check commit - avoid commit call within a JTA transaction.
  15. if (session.getTransacted() && isSessionLocallyTransacted(session)) {
  16. // Transacted session created by this template -> commit.
  17. JmsUtils.commitIfNecessary(session);
  18. }
  19. } finally {
  20. // 关闭消息生产者
  21. JmsUtils.closeMessageProducer(producer);
  22. }
  23. }
  1. createProducer中通过javax.jms.Session.createProducer创建MessageProducer,第三方消息中间件独立实现
  2. createMessage
  1. @Override
  2. public javax.jms.Message createMessage(Session session) throws JMSException {
  3. try {
  4. // 消息转换
  5. return this.messageConverter.toMessage(this.message, session);
  6. } catch (Exception ex) {
  7. throw new MessageConversionException("Could not convert '" + this.message + "'", ex);
  8. }
  9. }
  • 消息转换后续在更新
  1. doSend 这里也是第三方消息中间件实现
  1. protected void doSend(MessageProducer producer, Message message) throws JMSException {
  2. if (this.deliveryDelay >= 0) {
  3. producer.setDeliveryDelay(this.deliveryDelay);
  4. }
  5. if (isExplicitQosEnabled()) {
  6. // 发送消息,第三方消息中间件实现
  7. producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
  8. } else {
  9. producer.send(message);
  10. }
  11. }
  1. closeMessageProducer 这个方法特别,直接关闭
  1. public static void closeMessageProducer(@Nullable MessageProducer producer) {
  2. if (producer != null) {
  3. try {
  4. producer.close();
  5. } catch (JMSException ex) {
  6. logger.trace("Could not close JMS MessageProducer", ex);
  7. } catch (Throwable ex) {
  8. // We don't trust the JMS provider: It might throw RuntimeException or Error.
  9. logger.trace("Unexpected exception on closing JMS MessageProducer", ex);
  10. }
  11. }
  12. }

receive 接收消息

  1. @Override
  2. @Nullable
  3. public Message receive(String destinationName) throws JmsException {
  4. return receiveSelected(destinationName, null);
  5. }
  6. @Override
  7. @Nullable
  8. public Message receiveSelected(final String destinationName, @Nullable final String messageSelector) throws JmsException {
  9. return execute(session -> {
  10. Destination destination = resolveDestinationName(session, destinationName);
  11. return doReceive(session, destination, messageSelector);
  12. }, true);
  13. }
  14. @Nullable
  15. protected Message doReceive(Session session, Destination destination, @Nullable String messageSelector)
  16. throws JMSException {
  17. return doReceive(session, createConsumer(session, destination, messageSelector));
  18. }
  19. @Nullable
  20. protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException {
  21. try {
  22. // Use transaction timeout (if available).
  23. long timeout = getReceiveTimeout();
  24. // 链接工厂
  25. ConnectionFactory connectionFactory = getConnectionFactory();
  26. // JMS 资源信息
  27. JmsResourceHolder resourceHolder = null;
  28. if (connectionFactory != null) {
  29. // 从连接对象中获取JMS 资源信息
  30. resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory);
  31. }
  32. if (resourceHolder != null && resourceHolder.hasTimeout()) {
  33. // 超时时间
  34. timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis());
  35. }
  36. // 具体的消息
  37. Message message = receiveFromConsumer(consumer, timeout);
  38. if (session.getTransacted()) {
  39. // 事务性操作
  40. // Commit necessary - but avoid commit call within a JTA transaction.
  41. if (isSessionLocallyTransacted(session)) {
  42. // Transacted session created by this template -> commit.
  43. JmsUtils.commitIfNecessary(session);
  44. }
  45. } else if (isClientAcknowledge(session)) {
  46. // Manually acknowledge message, if any.
  47. if (message != null) {
  48. message.acknowledge();
  49. }
  50. }
  51. return message;
  52. } finally {
  53. JmsUtils.closeMessageConsumer(consumer);
  54. }
  55. }