下载部署
下载地址:
[http://activemq.apache.org/components/classic/download/](http://activemq.apache.org/components/classic/download/)
启动
点击install/bin/win64/activemq.bat启动,如下图启动成功。
监控ActiveMQ的admin应用地址: [http://127.0.0.1:8161/admin/](http://127.0.0.1:8161/admin/)默认账号密码:admin/admin
界面
主页面
消息队列界面
Name:消息队列的名称。
Number Of Pending Messages:未被消费的消息数目。
Number Of Consumers:消费者的数量。
Messages Enqueued:进入队列的消息 ;进入队列的总消息数目,包括已经被消费的和未被消费的。 这个数量只增不减。
Messages Dequeued:出了队列的消息,可以理解为是被消费掉的消息数量。在Queues里它和进入队列的总数量相等(因为一个消息只会被成功消费一次),如果暂时不等是因为消费者还没来得及消费。
使用
引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.16.0</version></dependency>
点对点(P2P)模型
点对点模型,采用的是队列(Queue)作为消息载体。在该模式中,一条消息只能被一个消费者消费,没有被消费的,只能留在队列中,等待被消费,或者超时。举个例子,如果队列中有10条消息,有两个消费者,就是一个消费者消费5条信息,你一条我一条。以下以代码演示。
点对点-单线程
消息生产者P2pProducter
public class P2pProducter {private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;public static void main(String[] args) throws JMSException {ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKER_URL);Connection connection = connectionFactory.createConnection();connection.start();Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue("queue");MessageProducer producer = session.createProducer(destination);producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);for (int i=0;i<=5;i++) {TextMessage textMessage = session.createTextMessage();textMessage.setText("我是第"+i+"消息");producer.send(textMessage);}if(connection!=null){connection.close();}}}
消息消费者
public class P2pConsumer {//连接信息private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;public static void main(String[] args) throws JMSException {ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKER_URL);Connection connection = connectionFactory.createConnection();connection.start();Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue("queue");MessageConsumer consumer = session.createConsumer(destination);while (true){TextMessage message = (TextMessage) consumer.receive();if (message==null){break;}System.out.println(message.getText());}if(connection!=null){connection.close();}}}
点对点消息消费步骤
实现步骤
1.建立ConnectionFactory工厂对象,需要填入用户名、密码、连接地址(一般使用默认,如果没有修改的话)
2.通过ConnectionFactory对象创建一个Connection连接,并且调用Connection的start方法开启连接,Connection方法默认是关闭的
3.通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数1是是否启用事物,参数2是签收模式,一般设置为自动签收
4.通过Session对象创建Destination对象,指的是一个客户端用来制定生产消息目标和消费消息来源的对象。在PTP的模式中,Destination被称作队列,在Pub/Sub模式中,Destination被称作主题(Topic)
5.通过Session对象创建消息的发送和接收对象(生产者和消费者)
6.通过MessageProducer的setDeliverMode方法为其设置持久化或者非持久化特性
7.使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据。客户端同理。记得关闭
点对点-多线程
在这里通过多线程生产和消费
生产者Producter
package cn.javabb.mq.activemq.p2pMult;import lombok.extern.slf4j.Slf4j;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;import java.util.concurrent.atomic.AtomicInteger;/*** @desc:* @author: javabb (javabob(a)163.com)* @create: 2021/06/22 20:22*/@Slf4jpublic class Producter {//连接信息private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;//提供原子操作的Integer 提供安全操作+- 适合并发系统AtomicInteger count = new AtomicInteger(0);// 链接工厂ConnectionFactory connectionFactory;// 链接对象Connection connection;// 事务管理Session session;// 创建线程局部变量 只能被当前线程访问,其他线程无法访问和修改ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();public void init() {try {System.out.println(USERNAME + "," + PASSWORD + "," + BROKER_URL);// 创建链接工厂connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);// 从工厂中创建一个链接connection = connectionFactory.createConnection();connection.start();// 创建一个事务session = connection.createSession(true, Session.SESSION_TRANSACTED);} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}public void sendMessage(String disname) {try {Queue queue = session.createQueue(disname);MessageProducer messageProducer = null;if (threadLocal.get() != null) {System.out.println("当前threadLocal不为空");messageProducer = threadLocal.get();} else {messageProducer = session.createProducer(queue);threadLocal.set(messageProducer);}while (true) {Thread.sleep(1000);int num = count.getAndIncrement();//创建一条消息TextMessage msg = session.createTextMessage(Thread.currentThread().getName() + "producter:我正在生产东西!,count:" + num);System.out.println(Thread.currentThread().getName() + "producter:我正在生产东西!,count:" + num);messageProducer.send(msg);session.commit();}} catch (JMSException | InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}
消费者
package cn.javabb.mq.queue;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;import java.util.concurrent.atomic.AtomicInteger;/*** @desc:* @author: javabb (javabob(a)163.com)* @create: 2020/09/15 18:31*/public class Comsumer {//连接信息private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;//提供原子操作的Integer 提供安全操作+- 适合并发系统AtomicInteger count = new AtomicInteger(0);// 链接工厂ConnectionFactory connectionFactory;// 链接对象Connection connection;// 事务管理Session session;// 创建线程局部变量 只能被当前线程访问,其他线程无法访问和修改ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();public void init() {try {// 创建链接工厂connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);// 从工厂中创建一个链接connection = connectionFactory.createConnection();connection.start();// 创建一个事务session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}public void getMessage(String disname) {try {Queue queue = session.createQueue(disname);MessageConsumer consumer = null;if (threadLocal.get() != null) {consumer = threadLocal.get();} else {consumer = session.createConsumer(queue);threadLocal.set(consumer);}while (true) {Thread.sleep(1000);TextMessage msg = (TextMessage) consumer.receive();if (msg != null) {msg.acknowledge();System.out.println(Thread.currentThread().getName() + ": Consumer:我是消费者,我正在消费Msg:" + msg.getText() + "--->" + count.getAndIncrement());} else {break;}}} catch (JMSException | InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}
启动生产者生产消息
package cn.javabb.mq.activemq.p2pMult;/*** @desc:* @author: javabb (javabob(a)163.com)* @create: 2021/06/22 20:24*/public class TestMultProducter {public static void main(String[] args) {Producter producter = new Producter();producter.init();TestMultProducter testProductor = new TestMultProducter();try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}//new Thread()new Thread(testProductor.new ProductorMq(producter)).start();new Thread(testProductor.new ProductorMq(producter)).start();new Thread(testProductor.new ProductorMq(producter)).start();}class ProductorMq implements Runnable {Producter producter;public ProductorMq(Producter producter) {this.producter = producter;}@Overridepublic void run() {while (true) {try {producter.sendMessage("javabb-demo-activemq");Thread.sleep(10000);} catch (Exception e) {e.printStackTrace();}}}}}
消费者消费
package cn.javabb.mq.activemq.p2pMult;/*** @desc:* @author: javabb (javabob(a)163.com)* @create: 2021/06/22 20:25*/public class TestMultConsumer {public static void main(String[] args) {Consumer consumer = new Consumer();consumer.init();TestMultConsumer testConsumer = new TestMultConsumer();new Thread(testConsumer.new ConsumerMq(consumer)).start();new Thread(testConsumer.new ConsumerMq(consumer)).start();new Thread(testConsumer.new ConsumerMq(consumer)).start();}class ConsumerMq implements Runnable {Consumer consumer;public ConsumerMq(Consumer consumer) {this.consumer = consumer;}@Overridepublic void run() {while (true) {try {consumer.getMessage("javabb-demo-activemq");Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}}}}}
同时启动生产者消费者,每个生产者和消费都同时启动了3个进程,同时生产和消费。

发布订阅(P2S)模型
发布/订阅模型采用的是主题(Topic)作为消息通讯载体。该模式类似微信公众号的模式。发布者发布一条信息,然后将该信息传递给所有的订阅者。注意:订阅者想要接收到该信息,必须在该信息发布之前订阅。
发布者P2sPublish
package cn.javabb.mq.activemq.p2s;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;import java.io.IOException;/*** @desc:* @author: javabb (javabob(a)163.com)* @create: 2021/06/22 20:35*/public class P2sPublish {//连接信息private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;public static void main(String[] args) throws JMSException, IOException {// 创建一个ConnectionFactory对象连接MQ服务器ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKER_URL);// 创建一个连接对象Connection connection;connection = connectionFactory.createConnection();// 开启连接connection.start();// 使用Connection对象创建一个Session对象Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 创建一个Destination对象。topic对象Topic topic = session.createTopic("test-topic");// 使用Session对象创建一个消费者对象。MessageConsumer consumer = session.createConsumer(topic);// 接收消息consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {// 打印结果TextMessage textMessage = (TextMessage) message;String text;try {text = textMessage.getText();System.out.println("这是接收到的消息:" + text);} catch (JMSException e) {e.printStackTrace();}}});System.out.println("topic消费者启动。。。。");// 等待接收消息System.in.read();// 关闭资源consumer.close();session.close();connection.close();}}
订阅者P2sSubscription
package cn.javabb.mq.activemq.p2s;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** @desc:* @author: javabb (javabob(a)163.com)* @create: 2021/06/22 20:36*/public class P2sSubscription {//连接信息private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;public static void main(String[] args) throws JMSException {// 1、创建一个ConnectionFactory对象连接MQ服务器ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKER_URL);// 2、使用工厂对象创建一个Connection对象。Connection connection = connectionFactory.createConnection();// 3、开启连接,调用Connection对象的start方法。connection.start();// 4、创建一个Session对象。// 第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。// 第二个参数:应答模式。自动应答或者手动应答。一般自动应答。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用topicTopic topic = session.createTopic("test-topic");// 6、使用Session对象创建一个Producer对象。MessageProducer producer = session.createProducer(topic);// 7、创建一个Message对象,可以使用TextMessage。for (int i = 0; i < 50; i++) {TextMessage textMessage = session.createTextMessage("第" + i + "一个ActiveMQ队列目的地的消息");// 8、发送消息producer.send(textMessage);}// 9、关闭资源producer.close();session.close();connection.close();}}
发布订阅模型,订阅者要提前订阅,所以先运行订阅者。
两种模式对比
1)由以上,我们可以总结出ActiveMQ的实现步骤:
- 建立ConnectionFactory工厂对象,需要填入用户名、密码、连接地址
- 通过ConnectionFactory对象创建一个Connection连接
- 通过Connection对象创建Session会话
- 通过Session对象创建Destination对象;在P2P的模式中,Destination被称作队列(Queue),在Pub/Sub模式中,Destination被称作主题(Topic)
- 通过Session对象创建消息的发送和接收对象
- 发送消息
- 关闭资源
2)可以看出,P2P模式和Pub/Sub模式,在实现上的区别是通过Session创建的Destination对象不一样,在P2P的模式中,Destination被称作队列(Queue),在Pub/Sub模式中,Destination被称作主题(Topic)
springboot集成
通过使用Spring的JMSTemplate来操作消息消费。来看下实际应用中怎么用的mq。
引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-client</artifactId><version>5.16.0</version></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.16.0</version></dependency>
配置ActiveMQConfig
package cn.javabb.mq.activemq.boot.config;import lombok.extern.slf4j.Slf4j;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.RedeliveryPolicy;import org.apache.activemq.command.ActiveMQQueue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.jms.config.DefaultJmsListenerContainerFactory;import org.springframework.jms.core.JmsTemplate;/*** @desc:* @author: javabb (javabob(a)163.com)* @create: 2021/06/22 21:16*/@Configuration@Slf4jpublic class ActiveMqConfig {//连接信息 可以自定义配置private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;@Beanpublic ActiveMQQueue queueRequest(){return new ActiveMQQueue("QUENE_REQUEST");}/*** @Description: 消息重发机制*/@Beanpublic RedeliveryPolicy redeliveryPolicy(){RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();redeliveryPolicy.setUseExponentialBackOff(true);redeliveryPolicy.setMaximumRedeliveries(10); //重发次数,默认为6次//重发时间间隔,默认为1秒redeliveryPolicy.setInitialRedeliveryDelay(1);//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是valueredeliveryPolicy.setBackOffMultiplier(5);//是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效redeliveryPolicy.setMaximumRedeliveryDelay(-1);return redeliveryPolicy;}@Beanpublic ActiveMQConnectionFactory activeMQConnectionFactory (RedeliveryPolicy redeliveryPolicy){ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD, BROKER_URL);activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);activeMQConnectionFactory.setTrustAllPackages(true);return activeMQConnectionFactory;}@Beanpublic JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory){JmsTemplate jmsTemplate=new JmsTemplate();jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(activeMQConnectionFactory);jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式return jmsTemplate;}//定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂@Bean(name = "jmsQueueListener")public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(activeMQConnectionFactory);//设置连接数factory.setConcurrency("1-3");//重连间隔时间factory.setRecoveryInterval(1000L);factory.setSessionAcknowledgeMode(4);return factory;}}
消息生产者
@Componentpublic class RequestQueueSender {@Autowiredprivate JmsTemplate jmsTemplate;public void sendMessage(Destination destination, final Serializable msg) {jmsTemplate.send(destination, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {return session.createObjectMessage(msg);}});}}
消息消费者
@Component@Slf4jpublic class RequestConsumer {//这里先不用线程池@JmsListener(destination= "QUENE_REQUEST", containerFactory = "jmsQueueListener")public void receiveRequest(ObjectMessage objectMessage, Session session) throws JMSException {try {Serializable object = objectMessage.getObject();// log.info("=====================");// log.info(object.toString());processResult(object);// log.info("=====================");objectMessage.acknowledge();} catch (Exception e) {log.error("消费消息失败",e.getMessage());session.recover();}}/*** 消息处理* @param obj*/private void processResult(Object obj) {System.out.println("结果处理:"+Convert.toStr(obj));}}
启动类
@SpringBootApplicationpublic class ActiveMqApplication {public static void main(String[] args) {SpringApplication.run(ActiveMqApplication.class, args);}}
测试消息过来发送到消息队列
@RunWith(SpringRunner.class)@SpringBootTestpublic class TestRequest {@AutowiredDestination queueRequest;@AutowiredRequestQueueSender queueSender;/*** 发送消息*/@Testpublic void producer() {for(int i=0 ;i<50;i++){queueSender.sendMessage(queueRequest,"发送消息i="+i);}}}
先启动应用,再在测试类中通过向消息队列中发送消息,可以看到返回的消息处理消息。
