Artemis采取了不同的方法。它只在内部实现队列,所有消息传送概念都是通过使用消息地址路由到适当的队列来实现的。如:发布订阅(topics)和点对点(queues)之类的消息传递概念是使用不同类型的路由机制实现。广播(Multicast)路由用于实现发布订阅(publish-subscribe)规程,所有订阅者将通过路由将获得自己的内部队列和消息。选播(Anycast)路由用于实现点对点规程,其中每一个地址只有一个队列,所有消费者都订阅它。所有协议都使用寻址和路由方案。例如,您可以将JMS topic作为广播地址,用于发送给消费者。

消息队列

点对点消息(queue):

通常将消息发送到队列,队列将消息持久化后,将消息推送给多个消费者,但只有其中一个能够接收成功。如果生产者在生产消息的时候,并无消费者在线,那么会将消息进行持久化,待消费者上线后,将消息发送给消费者。当消费者接收消息成功后,会隐式地发送一个ack给消息中间件,消息中间件收到ack后将消息从消息队列中标记为已消费并从队列中移除。

发布订阅(topic):

将消息发送到称为Topic的实体上,一个Topic可以有许多消费者(订阅者),每个订阅者都能收到发布者(生产者)的消息。生产订阅消息分为持久性订阅和非持久性订阅。
持久性订阅会将消息进行持久化而保存每一条消息,直到所有订阅者都收到了它们。
非持久订阅仅持续到创建它们的连接的最长生命周期。
消息传递可靠性,消息的可靠性有三种传输保障:

  • At most once,至多一次,消息可能丢失但是不会重复发送;
  • At least once,至少一次,消息不会丢失,但是可能会重复;
  • Exactly once,精确一次,每条消息肯定会被传输一次且仅一次。

事务消息:
事务消息指的是针对事务开始和事务结束期间的全体操作,要不全部成功,要不全部失败。Artemis支持本地的多个消息的发送和确认事务。
消息持久化:
需要持久化的消息存储在磁盘中,不需要持久化的消息存储在内存中。

使用Artemis

首先我们我们下载并安装了broker,我们就会遇到与ActiveMQ第一个不同之处。使用Artemis,您需要显式地创建一个broker实例,而在ActiveMQ上,这个步骤是可选的。这个步骤的意思是将broker的安装和配置分开,这使得以后升级和维护代理更加容易。
所以,在启动Artemis之前,你需要执行这样的操作:

  1. $ bin/artemis create clown /data/clown

不管你在什么操作系统上安装broker二进制文件,broker实例都会位于像“/opt/artemis”目录中。对每个ActiveMQ用户都熟悉这个文件夹的内容:

  • bin – 包含管理broker的shell脚本(start, stop, etc.)
  • data – 每一个broker数据存储 (message store)
  • etc – 包含broker配置文件(it’s what conf directory is in ActiveMQ)
  • log – Artemis logs存储目录, 而不像ActiveMQ将日志保存在data目录中
  • tmp – 临时文件目录

现在让我们更详细地了解一下配置。

  • etc/bootstrap.xml
    • 文件用于设置基本信息,如:主broker配置文件的位置、实用程序(如Web服务器)和JAAS安全性。
  • etc/broker.xml
    • 与ActiveMQ’s中的conf/activemq.xml类似,用于配置broker的大部分信息,如:端口、名称、安全策略等。
  • etc/artemis.profile
    • 文件与ActiveMQ中的bin/env文件类似。您可以在这个文件中配置broker环境变量,主要:SSL、调试等相关的JVM常规参数。
  • etc/logging.properties
    • 两个brokers之间的日志配置没有多大差别,所以任何熟悉Java日志记录系统的人都会配置这个文件。

最后,我们有JAAS配置文件 (login.config, artemis-users.properties and artemis-roles.properties),它们与ActiveMQ中的配置相同。
现在我们可以启动Artemis:

  1. $ bin/artemis run

Spring配置Artemis

maven配置

  1. <!-- https://mvnrepository.com/artifact/org.apache.activemq/artemis-jms-client -->
  2. <dependency>
  3. <groupId>org.apache.activemq</groupId>
  4. <artifactId>artemis-jms-client</artifactId>
  5. <version>2.7.0</version>
  6. </dependency>

Java配置

  1. private ActiveMQConnectionFactory factory;
  2. private Connection connection;
  3. @Autowired
  4. private ActiveMQQueueListener listener;
  5. public ActiveMQConfig() {
  6. factory = new ActiveMQConnectionFactory(tcp_uri);
  7. factory.setCacheDestinations(true);
  8. try {
  9. connection = factory.createConnection();
  10. connection.start();
  11. } catch (JMSException e) {
  12. e.printStackTrace();
  13. }
  14. }

首先我们需要一个ActiveMQ的连接器工厂类,然后还需要连接到ActiveMQ。

  1. @Bean(destroyMethod = "close")
  2. public Session activeMQSession() throws JMSException {
  3. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  4. return session;
  5. }
  6. /**
  7. * Queue生产者
  8. *
  9. * @param session
  10. * @return
  11. * @throws JMSException
  12. */
  13. @Bean
  14. public MessageProducer messageProducer(Session session) throws JMSException {
  15. if (mqType == 0) {
  16. return session.createProducer(session.createQueue(QUEUE_DEFAULT));
  17. }
  18. return session.createProducer(session.createTopic(TOPIC_DEFAULT));
  19. }

这里我们需要创建一个生产者(MessageProducer),mqType是一个int对象,如:private int mqType = 1;//0:queue|1:topic|2:mqtt
这里为了做实验,便于切换,但是在实际应用场合中,我们应该是既使用Queue也会使用Topic,我将会在最后贴出ActiveMQService服务类,此类实现了对Artemis的基本操作。

  1. /**
  2. * Queue消费者
  3. *
  4. * @param session
  5. * @return
  6. * @throws JMSException
  7. */
  8. @Bean
  9. public MessageConsumer messageConsumer(Session session) throws JMSException {
  10. MessageConsumer consumer = null;
  11. if (mqType == 0) {
  12. consumer = session.createConsumer(session.createQueue(QUEUE_DEFAULT));
  13. } else {
  14. consumer = session.createConsumer(session.createTopic(TOPIC_DEFAULT));
  15. }
  16. consumer.setMessageListener(listener);
  17. return consumer;
  18. }

最后我们只需要创建一个消费者(MessageConsumer),配置文件非常简单,现在可以用来测试。

  1. @RestController
  2. @RequestMapping(value = "/test")
  3. public class TestActiveMQController {
  4. @Autowired
  5. private IActiveMQProducerService producerService;
  6. @RequestMapping(value = "/activemq/queue")
  7. public String testQueue(HttpServletRequest request) {
  8. String str = "test queue " + DateTimeUtils.getTimeInt();
  9. producerService.send(str);
  10. return str;
  11. }
  12. @RequestMapping(value = "/activemq/mqtt")
  13. public String testMqtt(HttpServletRequest request) {
  14. String str = "test mqtt " + DateTimeUtils.getTimeInt();
  15. producerService.push(str);
  16. return str;
  17. }
  18. }
  19. //生产者服务类
  20. @Service
  21. public class ActiveMQProducerServiceImpl extends CompactService implements IActiveMQProducerService {
  22. @Autowired
  23. private MessageProducer producer;
  24. @Autowired
  25. private Session session;
  26. @Autowired
  27. private BlockingConnection blockingConnection;
  28. public ActiveMQProducerServiceImpl() {
  29. }
  30. /**
  31. * Queue & Topic send
  32. *
  33. * @param message
  34. */
  35. @Override
  36. public void send(String message) {
  37. try {
  38. producer.send(session.createTextMessage(message));
  39. } catch (JMSException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. /**
  44. * MQTT push
  45. *
  46. * @param message
  47. */
  48. @Override
  49. public void push(String message) {
  50. try {
  51. blockingConnection.publish(ActiveMQConfig.MQTT_DEFAULT, message.getBytes(), QoS.AT_LEAST_ONCE, false);
  52. } catch (Exception e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. }

为了测试,随便整合了一下,但是后来觉得,其实没有必要使用config的方式进行整合,以为Apache ActiveMQ Artemis是一个非常大的消息队列,应该以服务层的概念来整合进去,所以后来写了一个Artemis服务对象类:

整合到服务层

接口IActiveMQService

  1. import javax.jms.*;
  2. public interface IActiveMQService {
  3. Session createSession() throws JMSException;
  4. Queue createQueue(Session session, String name) throws JMSException;
  5. Topic createTopic(Session session, String name) throws JMSException;
  6. MessageProducer getMessageProducer(Queue queue) throws JMSException;
  7. MessageProducer getMessageProducer(Topic topic) throws JMSException;
  8. MessageProducer createMessageProducer(Destination destination, String name) throws JMSException;
  9. MessageConsumer getMessageConsumer(Session session, Queue queue) throws JMSException;
  10. MessageConsumer getMessageConsumer(Session session, Topic topic) throws JMSException;
  11. MessageConsumer createMessageConsumer(Session session, Destination destination, String name) throws JMSException;
  12. boolean send(String name, String type, String message);
  13. boolean send(String name, String type, String message, CompletionListener listener);
  14. boolean send(MessageProducer producer, String message);
  15. boolean send(MessageProducer producer, String message, CompletionListener listener);
  16. void addListener(String name, String type, MessageListener listener);
  17. }

实现ActiveMQService:

  1. import com.lanxinbase.system.basic.CompactService;
  2. import com.lanxinbase.system.service.resource.IActiveMQService;
  3. import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
  4. import org.springframework.beans.factory.DisposableBean;
  5. import org.springframework.beans.factory.InitializingBean;
  6. import org.springframework.stereotype.Service;
  7. import javax.jms.*;
  8. import java.util.Map;
  9. import java.util.concurrent.ConcurrentHashMap;
  10. @Service
  11. public class ActiveMQService extends CompactService implements IActiveMQService, InitializingBean, DisposableBean {
  12. public static final String TYPE_QUEUE = "queue";
  13. public static final String TYPE_TOPIC = "topic";
  14. private static final String TOPIC_DEFAULT = "topic/default/";
  15. private static final String QUEUE_DEFAULT = "queue/default/";
  16. private static final String tcp_uri = "tcp://0.0.0.0:61616";
  17. private ActiveMQConnectionFactory factory;
  18. private Connection connection;
  19. private Session session;
  20. private Map<String, MessageProducer> producerMap = new ConcurrentHashMap<>();
  21. public ActiveMQService() {
  22. }
  23. /**
  24. * 首次启动运行
  25. *
  26. * @throws Exception
  27. */
  28. @Override
  29. public void afterPropertiesSet() throws Exception {
  30. factory = new ActiveMQConnectionFactory(tcp_uri);
  31. factory.setCacheDestinations(true);
  32. connection = factory.createConnection();
  33. connection.start();
  34. createSession();
  35. }
  36. /**
  37. * 私有的session,主要用于生产者
  38. *
  39. * @return
  40. * @throws JMSException
  41. */
  42. private Session getSession() throws JMSException {
  43. if (session == null) {
  44. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  45. }
  46. return session;
  47. }
  48. /**
  49. * 创建一个新的session,主要应用于消费者
  50. *
  51. * @return
  52. * @throws JMSException
  53. */
  54. @Override
  55. public Session createSession() throws JMSException {
  56. return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  57. }
  58. /**
  59. * 创建一个Queue
  60. *
  61. * @param s session
  62. * @param name 如:test
  63. * @return
  64. * @throws JMSException
  65. */
  66. @Override
  67. public Queue createQueue(Session s, String name) throws JMSException {
  68. if (name == null) {
  69. name = "";
  70. }
  71. return s.createQueue(QUEUE_DEFAULT + name);
  72. }
  73. /**
  74. * 创建一个Topic
  75. *
  76. * @param s session
  77. * @param name 如:test
  78. * @return
  79. * @throws JMSException
  80. */
  81. @Override
  82. public Topic createTopic(Session s, String name) throws JMSException {
  83. if (name == null) {
  84. name = "";
  85. }
  86. return s.createTopic(TOPIC_DEFAULT + name);
  87. }
  88. /**
  89. * 获取一个生产者
  90. *
  91. * @param queue
  92. * @return
  93. * @throws JMSException
  94. */
  95. @Override
  96. public MessageProducer getMessageProducer(Queue queue) throws JMSException {
  97. return createMessageProducer(queue, queue.getQueueName());
  98. }
  99. /**
  100. * 获取一个生产者
  101. *
  102. * @param topic
  103. * @return
  104. * @throws JMSException
  105. */
  106. @Override
  107. public MessageProducer getMessageProducer(Topic topic) throws JMSException {
  108. return createMessageProducer(topic, topic.getTopicName());
  109. }
  110. /**
  111. * 创建一个生产者
  112. *
  113. * @param destination
  114. * @param name 缓存key,同时等于queue&topic的name
  115. * @return
  116. * @throws JMSException
  117. */
  118. @Override
  119. public MessageProducer createMessageProducer(Destination destination, String name) throws JMSException {
  120. logger(name);
  121. MessageProducer producer = producerMap.get(name);
  122. if (producer == null) {
  123. producer = session.createProducer(destination);
  124. producerMap.put(name, producer);
  125. }
  126. return producer;
  127. }
  128. /**
  129. * 获取一个消费者
  130. *
  131. * @param s 可以通过createSession创建
  132. * @param queue
  133. * @return
  134. * @throws JMSException
  135. */
  136. @Override
  137. public MessageConsumer getMessageConsumer(Session s, Queue queue) throws JMSException {
  138. return createMessageConsumer(s, queue, queue.getQueueName());
  139. }
  140. /**
  141. * 获取一个消费者
  142. *
  143. * @param s 可以通过createSession创建
  144. * @param topic
  145. * @return
  146. * @throws JMSException
  147. */
  148. @Override
  149. public MessageConsumer getMessageConsumer(Session s, Topic topic) throws JMSException {
  150. return createMessageConsumer(s, topic, topic.getTopicName());
  151. }
  152. /**
  153. * 创建一个消费者
  154. *
  155. * @param s session(本来是把session做成单例,但是消费者应该是动态的,不同于生产者,所以这里需要随时创建一个session)
  156. * @param destination
  157. * @param name 废弃
  158. * @return
  159. * @throws JMSException
  160. */
  161. @Override
  162. public MessageConsumer createMessageConsumer(Session s, Destination destination, String name) throws JMSException {
  163. // MessageConsumer consumer = consumerMap.get(name);
  164. MessageConsumer consumer = null;
  165. // if (consumer == null) {
  166. // consumer = session.createConsumer(destination);
  167. // consumerMap.put(name, consumer);
  168. // }
  169. consumer = s.createConsumer(destination);
  170. return consumer;
  171. }
  172. /**
  173. * 生产一条消息
  174. *
  175. * @param name Queue|Topic的name
  176. * @param type 类型:Queue|Topic
  177. * @param message 字符串消息,通常应该是JSON字符串
  178. * @return
  179. */
  180. @Override
  181. public boolean send(String name, String type, String message) {
  182. return this.send(name, type, message, null);
  183. }
  184. @Override
  185. public boolean send(String name, String type, String message, CompletionListener listener) {
  186. MessageProducer producer;
  187. try {
  188. if (TYPE_QUEUE.equals(type)) {
  189. producer = getMessageProducer(createQueue(getSession(), name));
  190. } else {
  191. producer = getMessageProducer(createTopic(getSession(), name));
  192. }
  193. if (listener == null) {
  194. return this.send(producer, message);
  195. } else {
  196. return this.send(producer, message, listener);
  197. }
  198. } catch (JMSException e) {
  199. e.printStackTrace();
  200. }
  201. return false;
  202. }
  203. /**
  204. * 发送消息
  205. *
  206. * @param producer 生产者
  207. * @param message 消息字符串
  208. * @return
  209. */
  210. @Override
  211. public boolean send(MessageProducer producer, String message) {
  212. try {
  213. producer.send(session.createTextMessage(message));
  214. return true;
  215. } catch (JMSException e) {
  216. e.printStackTrace();
  217. }
  218. return false;
  219. }
  220. @Override
  221. public boolean send(MessageProducer producer, String message, CompletionListener listener) {
  222. try {
  223. producer.send(session.createTextMessage(message), listener);
  224. return true;
  225. } catch (JMSException e) {
  226. e.printStackTrace();
  227. }
  228. return false;
  229. }
  230. /**
  231. * 消费者监听
  232. *
  233. * @param name Queue|Topic的name
  234. * @param type 类型:Queue|Topic
  235. * @param listener 监听回调
  236. */
  237. @Override
  238. public void addListener(String name, String type, MessageListener listener) {
  239. MessageConsumer consumer = null;
  240. try {
  241. Session session = createSession();
  242. if (TYPE_QUEUE.equals(type)) {
  243. consumer = getMessageConsumer(session, createQueue(session, name));
  244. } else {
  245. consumer = getMessageConsumer(session, createTopic(session, name));
  246. }
  247. this.addListener(consumer, listener);
  248. } catch (JMSException e) {
  249. e.printStackTrace();
  250. }
  251. }
  252. private void addListener(MessageConsumer consumer, MessageListener listener) {
  253. try {
  254. consumer.setMessageListener(listener);
  255. } catch (JMSException e) {
  256. e.printStackTrace();
  257. }
  258. }
  259. /**
  260. * 程序退出时需要关闭或停止连接
  261. *
  262. * @throws Exception
  263. */
  264. @Override
  265. public void destroy() throws Exception {
  266. if (session != null) {
  267. session.close();
  268. }
  269. connection.stop();
  270. connection.close();
  271. factory.close();
  272. }
  273. }

由于每个方法都加了注释,所以这里就不更多的废话了。但是在public MessageConsumer createMessageConsumer()方法中我使用的是创建了一个全新的Session来创建消费者,为什么要这样子做呢?
在生产环境中应该跟createMessageProducer()方法中的session一样使用单列的session,这里需要注意就是这个session我并没有研究过有没有过期时间,在生产环境中,需要实时传输消息,理论上是不会过期。
之所以这样做,我认为在用户登陆APP上线之后会执行一个会话(创建一个新的session),当APP下线之后也应该移除这个Session,示例代码只是为了做测试,所以没有过多的实现,在实际应用中应该根据自身的业务需求来改进逻辑代码。

最后测试TestActiveMQController:

  1. @RestController
  2. @RequestMapping(value = "/test")
  3. public class TestActiveMQController {
  4. private static final Logger logger = Logger.getLogger("TestActiveMQController>");
  5. @Resource
  6. private IActiveMQService activeMQService;
  7. /**
  8. * /test/activemq/pub/add?name=testQueue&type=queue
  9. * /test/activemq/pub/add?name=testQueue1&type=queue
  10. * <p>
  11. * /test/activemq/pub/add?name=testTopic&type=topic
  12. * /test/activemq/pub/add?name=testTopic1&type=topic
  13. *
  14. * @param request
  15. * @return
  16. */
  17. @RequestMapping(value = "/activemq/pub/add")
  18. public ResultResp<Void> pubAdd(HttpServletRequest request) {
  19. ResultResp<Void> resp = new ResultResp<>();
  20. String name = request.getParameter("name");
  21. String type = request.getParameter("type");
  22. String msg = "test mqtt " + DateTimeUtils.getTimeInt();
  23. activeMQService.send(name, type, msg);
  24. resp.setInfo(msg);
  25. return resp;
  26. }
  27. /**
  28. * /test/activemq/sub/add?id=100&name=testQueue&type=queue
  29. * /test/activemq/sub/add?id=101&name=testQueue1&type=queue
  30. * <p>
  31. * /test/activemq/sub/add?id=100&name=testTopic&type=topic
  32. * /test/activemq/sub/add?id=101&name=testTopic&type=topic
  33. *
  34. * @param request
  35. * @return
  36. */
  37. @RequestMapping(value = "/activemq/sub/add")
  38. public ResultResp<Void> subAdd(HttpServletRequest request) {
  39. ResultResp<Void> resp = new ResultResp<>();
  40. String id = request.getParameter("id");
  41. String name = request.getParameter("name");
  42. String type = request.getParameter("type");
  43. activeMQService.addListener(name, type, (m) -> {
  44. TextMessage textMessage = (TextMessage) m;
  45. try {
  46. logger.info(id + " : " + textMessage.getText());
  47. } catch (JMSException e) {
  48. e.printStackTrace();
  49. }
  50. });
  51. return resp;
  52. }
  53. }

SpringBoot配置Artemis

POM文件

  1. // 引入activemq
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-activemq</artifactId>
  5. </dependency>
  6. // 引入artemis
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-artemis</artifactId>
  10. </dependency>

配置文件

配置activemq

  1. spring:
  2. activemq:
  3. broker-url: tcp://${ACTIVEMQ_HOST, localhost}:61616 # activemq连接地址
  4. user: ${ACTIVEMQ_USER, admin} # 用户名
  5. password: ${ACTIVEMQ_PASSWORD, admin} # 密码
  6. send-timeout: # 发送超时时间
  7. pool:
  8. enabled: false # 是否创建 JmsPoolConnectionFactory 连接池
  9. idle-timeout: 30s # 空闲连接超时时间
  10. max-connections: 50 # 连接池中最大连接数
  11. max-sessions-per-connection: 100 # 每个连接最大会话

配置artemis

  1. spring:
  2. artemis:
  3. mode: native
  4. host: ${ARTEMIS_HOST, localhost} # artermis连接地址
  5. port: ${ARTEMIS_PORT, 9876} # artermis连接端口
  6. user: ${ARTEMIS_USER, admin} # 用户名
  7. password: ${ARTEMIS_PASSWORD, admin} # 密码
  8. send-timeout: # 发送超时时间
  9. pool:
  10. enabled: false # 是否创建 JmsPoolConnectionFactory 连接池
  11. idle-timeout: 30s # 空闲连接超时时间
  12. max-connections: 50 # 连接池中最大连接数
  13. max-sessions-per-connection: 100 # 每个连接最大会话

Spring JMS 中默使用 CachingConnectionFactory 创建连接池,如果指定 JmsPoolConnectionFactory 连接池,测在 spring.jms.* 中配置连接池属性。

  1. spring:
  2. jms:
  3. session-cache-size: 5
  4. # 是否允许发布订阅
  5. pub-sub-domain: true

关于 Activemq (Artemis) 和 Apring Jms 更多配置,可参考:Spring Boot Integration Properties

使用

JmsTemplate 是 Spring JMS 中用来发送消息的类。做完以上配置,在 Spring Boot 启动入口加上 @EnableJms 注解,项目在启动时会自动装配,在项目中直接注入 JmsTemplate 对象。

  1. @SpringBootApplication
  2. @EnableJms
  3. public class Application {
  4. @Autowired
  5. private JmsTemplate jmsTemplate;
  6. public static void main(String[] args) {
  7. SpringApplication.run(Application.class);
  8. }
  9. }

发送文本消息示例:

  1. public void send(String msg) {
  2. log.info("发送消息:{}", msg);
  3. jmsTemplate.send(DEST_NAME, new MessageCreator() {
  4. @Override
  5. public Message createMessage(Session session) throws JMSException {
  6. // 也可以创建对象 session.createObjectMessage()
  7. TextMessage textMessage = session.createTextMessage();
  8. textMessage.setText(msg);
  9. return textMessage;
  10. }
  11. });
  12. }

接收使用 @JmsListener 注解:

  1. /**
  2. * 监听消息
  3. * @param content
  4. */
  5. @JmsListener(destination = DEST_NAME, concurrency = "消费线程数")
  6. public void recive(String content) {
  7. log.info("收到消息:{}", content);
  8. }

发送消息和接收消息的 DEST_NAME 要保持一至。 concurrency 属性非必须,用来设置消费消息的程序数。关于 ActiveMQ 的详细介绍会在专门的章节讲解。

Producer

  1. @Component
  2. public class Producer {
  3. @Autowired
  4. private JmsMessagingTemplate jmsMessagingTemplate;
  5. public void send(Destination destination, final String message) {
  6. jmsMessagingTemplate.convertAndSend(destination, message + "from queue");
  7. }
  8. @JmsListener(destination="out.queue")
  9. public void consumerMessage(String text){
  10. System.out.println("从out.queue队列收到的回复信息为:"+text);
  11. }
  12. }

Consumer

  1. @Component
  2. public class Consumer {
  3. @JmsListener(destination = "mytest.queue")
  4. @SendTo("out.queue")
  5. public String receiveQueue(String text) {
  6. System.out.println("Consumer收到的信息为:"+text);
  7. return "return message "+text;
  8. }
  9. }

Rest使用

  1. @ResponseBody
  2. @RequestMapping(value = "/mqtest", method = RequestMethod.GET)
  3. public Object mqtest() {
  4. Destination destination = new ActiveMQQueue("mytest.queue");
  5. producer.send(destination, "I am YeJiaWei");
  6. return "OK";
  7. }