下载部署

下载地址:

[http://activemq.apache.org/components/classic/download/](http://activemq.apache.org/components/classic/download/)

启动

点击install/bin/win64/activemq.bat启动,如下图启动成功。
消息队列ActiveMQ - 图1
监控ActiveMQ的admin应用地址: [http://127.0.0.1:8161/admin/](http://127.0.0.1:8161/admin/)默认账号密码:admin/admin

界面

主页面
消息队列ActiveMQ - 图2
消息队列界面消息队列ActiveMQ - 图3
Name:消息队列的名称。
Number Of Pending Messages:未被消费的消息数目。
Number Of Consumers:消费者的数量。
Messages Enqueued:进入队列的消息 ;进入队列的总消息数目,包括已经被消费的和未被消费的。 这个数量只增不减。
Messages Dequeued:出了队列的消息,可以理解为是被消费掉的消息数量。在Queues里它和进入队列的总数量相等(因为一个消息只会被成功消费一次),如果暂时不等是因为消费者还没来得及消费。

使用

引入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-test</artifactId>
  8. <scope>test</scope>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.apache.activemq</groupId>
  12. <artifactId>activemq-all</artifactId>
  13. <version>5.16.0</version>
  14. </dependency>

点对点(P2P)模型

点对点模型,采用的是队列(Queue)作为消息载体。在该模式中,一条消息只能被一个消费者消费,没有被消费的,只能留在队列中,等待被消费,或者超时。举个例子,如果队列中有10条消息,有两个消费者,就是一个消费者消费5条信息,你一条我一条。以下以代码演示。

点对点-单线程

消息生产者P2pProducter

  1. public class P2pProducter {
  2. private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
  3. private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
  4. private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
  5. public static void main(String[] args) throws JMSException {
  6. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,
  7. PASSWORD,BROKER_URL);
  8. Connection connection = connectionFactory.createConnection();
  9. connection.start();
  10. Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
  11. Destination destination = session.createQueue("queue");
  12. MessageProducer producer = session.createProducer(destination);
  13. producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  14. for (int i=0;i<=5;i++) {
  15. TextMessage textMessage = session.createTextMessage();
  16. textMessage.setText("我是第"+i+"消息");
  17. producer.send(textMessage);
  18. }
  19. if(connection!=null){
  20. connection.close();
  21. }
  22. }
  23. }

消息消费者

  1. public class P2pConsumer {
  2. //连接信息
  3. private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
  4. private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
  5. private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
  6. public static void main(String[] args) throws JMSException {
  7. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,
  8. PASSWORD,BROKER_URL);
  9. Connection connection = connectionFactory.createConnection();
  10. connection.start();
  11. Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
  12. Destination destination = session.createQueue("queue");
  13. MessageConsumer consumer = session.createConsumer(destination);
  14. while (true){
  15. TextMessage message = (TextMessage) consumer.receive();
  16. if (message==null){
  17. break;
  18. }
  19. System.out.println(message.getText());
  20. }
  21. if(connection!=null){
  22. connection.close();
  23. }
  24. }
  25. }

先启动消息生产者,再启动消息消费者
image.png

点对点消息消费步骤

实现步骤
1.建立ConnectionFactory工厂对象,需要填入用户名、密码、连接地址(一般使用默认,如果没有修改的话)
2.通过ConnectionFactory对象创建一个Connection连接,并且调用Connection的start方法开启连接,Connection方法默认是关闭的
3.通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数1是是否启用事物,参数2是签收模式,一般设置为自动签收
4.通过Session对象创建Destination对象,指的是一个客户端用来制定生产消息目标和消费消息来源的对象。在PTP的模式中,Destination被称作队列,在Pub/Sub模式中,Destination被称作主题(Topic)
5.通过Session对象创建消息的发送和接收对象(生产者和消费者)
6.通过MessageProducer的setDeliverMode方法为其设置持久化或者非持久化特性
7.使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据。客户端同理。记得关闭

点对点-多线程

在这里通过多线程生产和消费

生产者Producter

  1. package cn.javabb.mq.activemq.p2pMult;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.activemq.ActiveMQConnection;
  4. import org.apache.activemq.ActiveMQConnectionFactory;
  5. import javax.jms.*;
  6. import java.util.concurrent.atomic.AtomicInteger;
  7. /**
  8. * @desc:
  9. * @author: javabb (javabob(a)163.com)
  10. * @create: 2021/06/22 20:22
  11. */
  12. @Slf4j
  13. public class Producter {
  14. //连接信息
  15. private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
  16. private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
  17. private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
  18. //提供原子操作的Integer 提供安全操作+- 适合并发系统
  19. AtomicInteger count = new AtomicInteger(0);
  20. // 链接工厂
  21. ConnectionFactory connectionFactory;
  22. // 链接对象
  23. Connection connection;
  24. // 事务管理
  25. Session session;
  26. // 创建线程局部变量 只能被当前线程访问,其他线程无法访问和修改
  27. ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();
  28. public void init() {
  29. try {
  30. System.out.println(USERNAME + "," + PASSWORD + "," + BROKER_URL);
  31. // 创建链接工厂
  32. connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
  33. // 从工厂中创建一个链接
  34. connection = connectionFactory.createConnection();
  35. connection.start();
  36. // 创建一个事务
  37. session = connection.createSession(true, Session.SESSION_TRANSACTED);
  38. } catch (JMSException e) {
  39. // TODO Auto-generated catch block
  40. e.printStackTrace();
  41. }
  42. }
  43. public void sendMessage(String disname) {
  44. try {
  45. Queue queue = session.createQueue(disname);
  46. MessageProducer messageProducer = null;
  47. if (threadLocal.get() != null) {
  48. System.out.println("当前threadLocal不为空");
  49. messageProducer = threadLocal.get();
  50. } else {
  51. messageProducer = session.createProducer(queue);
  52. threadLocal.set(messageProducer);
  53. }
  54. while (true) {
  55. Thread.sleep(1000);
  56. int num = count.getAndIncrement();
  57. //创建一条消息
  58. TextMessage msg = session.createTextMessage(Thread.currentThread().getName() + "producter:我正在生产东西!,count:" + num);
  59. System.out.println(Thread.currentThread().getName() + "producter:我正在生产东西!,count:" + num);
  60. messageProducer.send(msg);
  61. session.commit();
  62. }
  63. } catch (JMSException | InterruptedException e) {
  64. // TODO Auto-generated catch block
  65. e.printStackTrace();
  66. }
  67. }
  68. }

消费者

  1. package cn.javabb.mq.queue;
  2. import org.apache.activemq.ActiveMQConnection;
  3. import org.apache.activemq.ActiveMQConnectionFactory;
  4. import javax.jms.*;
  5. import java.util.concurrent.atomic.AtomicInteger;
  6. /**
  7. * @desc:
  8. * @author: javabb (javabob(a)163.com)
  9. * @create: 2020/09/15 18:31
  10. */
  11. public class Comsumer {
  12. //连接信息
  13. private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
  14. private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
  15. private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
  16. //提供原子操作的Integer 提供安全操作+- 适合并发系统
  17. AtomicInteger count = new AtomicInteger(0);
  18. // 链接工厂
  19. ConnectionFactory connectionFactory;
  20. // 链接对象
  21. Connection connection;
  22. // 事务管理
  23. Session session;
  24. // 创建线程局部变量 只能被当前线程访问,其他线程无法访问和修改
  25. ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
  26. public void init() {
  27. try {
  28. // 创建链接工厂
  29. connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
  30. // 从工厂中创建一个链接
  31. connection = connectionFactory.createConnection();
  32. connection.start();
  33. // 创建一个事务
  34. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  35. } catch (JMSException e) {
  36. // TODO Auto-generated catch block
  37. e.printStackTrace();
  38. }
  39. }
  40. public void getMessage(String disname) {
  41. try {
  42. Queue queue = session.createQueue(disname);
  43. MessageConsumer consumer = null;
  44. if (threadLocal.get() != null) {
  45. consumer = threadLocal.get();
  46. } else {
  47. consumer = session.createConsumer(queue);
  48. threadLocal.set(consumer);
  49. }
  50. while (true) {
  51. Thread.sleep(1000);
  52. TextMessage msg = (TextMessage) consumer.receive();
  53. if (msg != null) {
  54. msg.acknowledge();
  55. System.out.println(Thread.currentThread().getName() + ": Consumer:我是消费者,我正在消费Msg:" + msg.getText() + "--->" + count.getAndIncrement());
  56. } else {
  57. break;
  58. }
  59. }
  60. } catch (JMSException | InterruptedException e) {
  61. // TODO Auto-generated catch block
  62. e.printStackTrace();
  63. }
  64. }
  65. }

启动生产者生产消息

  1. package cn.javabb.mq.activemq.p2pMult;
  2. /**
  3. * @desc:
  4. * @author: javabb (javabob(a)163.com)
  5. * @create: 2021/06/22 20:24
  6. */
  7. public class TestMultProducter {
  8. public static void main(String[] args) {
  9. Producter producter = new Producter();
  10. producter.init();
  11. TestMultProducter testProductor = new TestMultProducter();
  12. try {
  13. Thread.sleep(1000);
  14. } catch (Exception e) {
  15. e.printStackTrace();
  16. }
  17. //new Thread()
  18. new Thread(testProductor.new ProductorMq(producter)).start();
  19. new Thread(testProductor.new ProductorMq(producter)).start();
  20. new Thread(testProductor.new ProductorMq(producter)).start();
  21. }
  22. class ProductorMq implements Runnable {
  23. Producter producter;
  24. public ProductorMq(Producter producter) {
  25. this.producter = producter;
  26. }
  27. @Override
  28. public void run() {
  29. while (true) {
  30. try {
  31. producter.sendMessage("javabb-demo-activemq");
  32. Thread.sleep(10000);
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. }
  38. }
  39. }

消费者消费

  1. package cn.javabb.mq.activemq.p2pMult;
  2. /**
  3. * @desc:
  4. * @author: javabb (javabob(a)163.com)
  5. * @create: 2021/06/22 20:25
  6. */
  7. public class TestMultConsumer {
  8. public static void main(String[] args) {
  9. Consumer consumer = new Consumer();
  10. consumer.init();
  11. TestMultConsumer testConsumer = new TestMultConsumer();
  12. new Thread(testConsumer.new ConsumerMq(consumer)).start();
  13. new Thread(testConsumer.new ConsumerMq(consumer)).start();
  14. new Thread(testConsumer.new ConsumerMq(consumer)).start();
  15. }
  16. class ConsumerMq implements Runnable {
  17. Consumer consumer;
  18. public ConsumerMq(Consumer consumer) {
  19. this.consumer = consumer;
  20. }
  21. @Override
  22. public void run() {
  23. while (true) {
  24. try {
  25. consumer.getMessage("javabb-demo-activemq");
  26. Thread.sleep(10000);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }
  32. }
  33. }

同时启动生产者消费者,每个生产者和消费都同时启动了3个进程,同时生产和消费。

image.png

发布订阅(P2S)模型

发布/订阅模型采用的是主题(Topic)作为消息通讯载体。该模式类似微信公众号的模式。发布者发布一条信息,然后将该信息传递给所有的订阅者。注意:订阅者想要接收到该信息,必须在该信息发布之前订阅。
发布者P2sPublish

  1. package cn.javabb.mq.activemq.p2s;
  2. import org.apache.activemq.ActiveMQConnection;
  3. import org.apache.activemq.ActiveMQConnectionFactory;
  4. import javax.jms.*;
  5. import java.io.IOException;
  6. /**
  7. * @desc:
  8. * @author: javabb (javabob(a)163.com)
  9. * @create: 2021/06/22 20:35
  10. */
  11. public class P2sPublish {
  12. //连接信息
  13. private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
  14. private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
  15. private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
  16. public static void main(String[] args) throws JMSException, IOException {
  17. // 创建一个ConnectionFactory对象连接MQ服务器
  18. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,
  19. PASSWORD,BROKER_URL);
  20. // 创建一个连接对象
  21. Connection connection;
  22. connection = connectionFactory.createConnection();
  23. // 开启连接
  24. connection.start();
  25. // 使用Connection对象创建一个Session对象
  26. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  27. // 创建一个Destination对象。topic对象
  28. Topic topic = session.createTopic("test-topic");
  29. // 使用Session对象创建一个消费者对象。
  30. MessageConsumer consumer = session.createConsumer(topic);
  31. // 接收消息
  32. consumer.setMessageListener(new MessageListener() {
  33. @Override
  34. public void onMessage(Message message) {
  35. // 打印结果
  36. TextMessage textMessage = (TextMessage) message;
  37. String text;
  38. try {
  39. text = textMessage.getText();
  40. System.out.println("这是接收到的消息:" + text);
  41. } catch (JMSException e) {
  42. e.printStackTrace();
  43. }
  44. }
  45. });
  46. System.out.println("topic消费者启动。。。。");
  47. // 等待接收消息
  48. System.in.read();
  49. // 关闭资源
  50. consumer.close();
  51. session.close();
  52. connection.close();
  53. }
  54. }

订阅者P2sSubscription

  1. package cn.javabb.mq.activemq.p2s;
  2. import org.apache.activemq.ActiveMQConnection;
  3. import org.apache.activemq.ActiveMQConnectionFactory;
  4. import javax.jms.*;
  5. /**
  6. * @desc:
  7. * @author: javabb (javabob(a)163.com)
  8. * @create: 2021/06/22 20:36
  9. */
  10. public class P2sSubscription {
  11. //连接信息
  12. private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
  13. private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
  14. private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
  15. public static void main(String[] args) throws JMSException {
  16. // 1、创建一个ConnectionFactory对象连接MQ服务器
  17. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,
  18. PASSWORD,BROKER_URL);
  19. // 2、使用工厂对象创建一个Connection对象。
  20. Connection connection = connectionFactory.createConnection();
  21. // 3、开启连接,调用Connection对象的start方法。
  22. connection.start();
  23. // 4、创建一个Session对象。
  24. // 第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
  25. // 第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
  26. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  27. // 5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用topic
  28. Topic topic = session.createTopic("test-topic");
  29. // 6、使用Session对象创建一个Producer对象。
  30. MessageProducer producer = session.createProducer(topic);
  31. // 7、创建一个Message对象,可以使用TextMessage。
  32. for (int i = 0; i < 50; i++) {
  33. TextMessage textMessage = session.createTextMessage("第" + i + "一个ActiveMQ队列目的地的消息");
  34. // 8、发送消息
  35. producer.send(textMessage);
  36. }
  37. // 9、关闭资源
  38. producer.close();
  39. session.close();
  40. connection.close();
  41. }
  42. }

发布订阅模型,订阅者要提前订阅,所以先运行订阅者。

image.png

两种模式对比

1)由以上,我们可以总结出ActiveMQ的实现步骤:

  • 建立ConnectionFactory工厂对象,需要填入用户名、密码、连接地址
  • 通过ConnectionFactory对象创建一个Connection连接
  • 通过Connection对象创建Session会话
  • 通过Session对象创建Destination对象;在P2P的模式中,Destination被称作队列(Queue),在Pub/Sub模式中,Destination被称作主题(Topic)
  • 通过Session对象创建消息的发送和接收对象
  • 发送消息
  • 关闭资源

2)可以看出,P2P模式和Pub/Sub模式,在实现上的区别是通过Session创建的Destination对象不一样,在P2P的模式中,Destination被称作队列(Queue),在Pub/Sub模式中,Destination被称作主题(Topic)

springboot集成

通过使用Spring的JMSTemplate来操作消息消费。来看下实际应用中怎么用的mq。

引入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-activemq</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.apache.activemq</groupId>
  7. <artifactId>activemq-client</artifactId>
  8. <version>5.16.0</version>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.apache.activemq</groupId>
  12. <artifactId>activemq-pool</artifactId>
  13. <version>5.16.0</version>
  14. </dependency>

配置ActiveMQConfig

  1. package cn.javabb.mq.activemq.boot.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.activemq.ActiveMQConnection;
  4. import org.apache.activemq.ActiveMQConnectionFactory;
  5. import org.apache.activemq.RedeliveryPolicy;
  6. import org.apache.activemq.command.ActiveMQQueue;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
  10. import org.springframework.jms.core.JmsTemplate;
  11. /**
  12. * @desc:
  13. * @author: javabb (javabob(a)163.com)
  14. * @create: 2021/06/22 21:16
  15. */
  16. @Configuration
  17. @Slf4j
  18. public class ActiveMqConfig {
  19. //连接信息 可以自定义配置
  20. private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
  21. private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
  22. private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
  23. @Bean
  24. public ActiveMQQueue queueRequest(){
  25. return new ActiveMQQueue("QUENE_REQUEST");
  26. }
  27. /**
  28. * @Description: 消息重发机制
  29. */
  30. @Bean
  31. public RedeliveryPolicy redeliveryPolicy(){
  32. RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
  33. redeliveryPolicy.setUseExponentialBackOff(true);
  34. redeliveryPolicy.setMaximumRedeliveries(10); //重发次数,默认为6次
  35. //重发时间间隔,默认为1秒
  36. redeliveryPolicy.setInitialRedeliveryDelay(1);
  37. //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
  38. redeliveryPolicy.setBackOffMultiplier(5);
  39. //是否避免消息碰撞
  40. redeliveryPolicy.setUseCollisionAvoidance(false);
  41. //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
  42. redeliveryPolicy.setMaximumRedeliveryDelay(-1);
  43. return redeliveryPolicy;
  44. }
  45. @Bean
  46. public ActiveMQConnectionFactory activeMQConnectionFactory (RedeliveryPolicy redeliveryPolicy){
  47. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD, BROKER_URL);
  48. activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
  49. activeMQConnectionFactory.setTrustAllPackages(true);
  50. return activeMQConnectionFactory;
  51. }
  52. @Bean
  53. public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory){
  54. JmsTemplate jmsTemplate=new JmsTemplate();
  55. jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化
  56. jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
  57. jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式
  58. return jmsTemplate;
  59. }
  60. //定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
  61. @Bean(name = "jmsQueueListener")
  62. public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
  63. DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
  64. factory.setConnectionFactory(activeMQConnectionFactory);
  65. //设置连接数
  66. factory.setConcurrency("1-3");
  67. //重连间隔时间
  68. factory.setRecoveryInterval(1000L);
  69. factory.setSessionAcknowledgeMode(4);
  70. return factory;
  71. }
  72. }

消息生产者

  1. @Component
  2. public class RequestQueueSender {
  3. @Autowired
  4. private JmsTemplate jmsTemplate;
  5. public void sendMessage(Destination destination, final Serializable msg) {
  6. jmsTemplate.send(destination, new MessageCreator() {
  7. @Override
  8. public Message createMessage(Session session) throws JMSException {
  9. return session.createObjectMessage(msg);
  10. }
  11. });
  12. }
  13. }

消息消费者

  1. @Component
  2. @Slf4j
  3. public class RequestConsumer {
  4. //这里先不用线程池
  5. @JmsListener(destination= "QUENE_REQUEST", containerFactory = "jmsQueueListener")
  6. public void receiveRequest(ObjectMessage objectMessage, Session session) throws JMSException {
  7. try {
  8. Serializable object = objectMessage.getObject();
  9. // log.info("=====================");
  10. // log.info(object.toString());
  11. processResult(object);
  12. // log.info("=====================");
  13. objectMessage.acknowledge();
  14. } catch (Exception e) {
  15. log.error("消费消息失败",e.getMessage());
  16. session.recover();
  17. }
  18. }
  19. /**
  20. * 消息处理
  21. * @param obj
  22. */
  23. private void processResult(Object obj) {
  24. System.out.println("结果处理:"+Convert.toStr(obj));
  25. }
  26. }

启动类

  1. @SpringBootApplication
  2. public class ActiveMqApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(ActiveMqApplication.class, args);
  5. }
  6. }

测试消息过来发送到消息队列

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class TestRequest {
  4. @Autowired
  5. Destination queueRequest;
  6. @Autowired
  7. RequestQueueSender queueSender;
  8. /**
  9. * 发送消息
  10. */
  11. @Test
  12. public void producer() {
  13. for(int i=0 ;i<50;i++){
  14. queueSender.sendMessage(queueRequest,"发送消息i="+i);
  15. }
  16. }
  17. }

先启动应用,再在测试类中通过向消息队列中发送消息,可以看到返回的消息处理消息。
image.png