下载部署
下载地址:
[http://activemq.apache.org/components/classic/download/](http://activemq.apache.org/components/classic/download/)
启动
点击install/bin/win64/activemq.bat
启动,如下图启动成功。
监控ActiveMQ的admin应用地址: [http://127.0.0.1:8161/admin/](http://127.0.0.1:8161/admin/)
默认账号密码:admin/admin
界面
主页面
消息队列界面
Name:消息队列的名称。
Number Of Pending Messages:未被消费的消息数目。
Number Of Consumers:消费者的数量。
Messages Enqueued:进入队列的消息 ;进入队列的总消息数目,包括已经被消费的和未被消费的。 这个数量只增不减。
Messages Dequeued:出了队列的消息,可以理解为是被消费掉的消息数量。在Queues里它和进入队列的总数量相等(因为一个消息只会被成功消费一次),如果暂时不等是因为消费者还没来得及消费。
使用
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.16.0</version>
</dependency>
点对点(P2P)模型
点对点模型,采用的是队列(Queue)作为消息载体。在该模式中,一条消息只能被一个消费者消费,没有被消费的,只能留在队列中,等待被消费,或者超时。举个例子,如果队列中有10条消息,有两个消费者,就是一个消费者消费5条信息,你一条我一条。以下以代码演示。
点对点-单线程
消息生产者P2pProducter
public class P2pProducter {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,
PASSWORD,BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("queue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i=0;i<=5;i++) {
TextMessage textMessage = session.createTextMessage();
textMessage.setText("我是第"+i+"消息");
producer.send(textMessage);
}
if(connection!=null){
connection.close();
}
}
}
消息消费者
public class P2pConsumer {
//连接信息
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,
PASSWORD,BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("queue");
MessageConsumer consumer = session.createConsumer(destination);
while (true){
TextMessage message = (TextMessage) consumer.receive();
if (message==null){
break;
}
System.out.println(message.getText());
}
if(connection!=null){
connection.close();
}
}
}
点对点消息消费步骤
实现步骤
1.建立ConnectionFactory工厂对象,需要填入用户名、密码、连接地址(一般使用默认,如果没有修改的话)
2.通过ConnectionFactory对象创建一个Connection连接,并且调用Connection的start方法开启连接,Connection方法默认是关闭的
3.通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数1是是否启用事物,参数2是签收模式,一般设置为自动签收
4.通过Session对象创建Destination对象,指的是一个客户端用来制定生产消息目标和消费消息来源的对象。在PTP的模式中,Destination被称作队列,在Pub/Sub模式中,Destination被称作主题(Topic)
5.通过Session对象创建消息的发送和接收对象(生产者和消费者)
6.通过MessageProducer的setDeliverMode方法为其设置持久化或者非持久化特性
7.使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据。客户端同理。记得关闭
点对点-多线程
在这里通过多线程生产和消费
生产者Producter
package cn.javabb.mq.activemq.p2pMult;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @desc:
* @author: javabb (javabob(a)163.com)
* @create: 2021/06/22 20:22
*/
@Slf4j
public class Producter {
//连接信息
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
//提供原子操作的Integer 提供安全操作+- 适合并发系统
AtomicInteger count = new AtomicInteger(0);
// 链接工厂
ConnectionFactory connectionFactory;
// 链接对象
Connection connection;
// 事务管理
Session session;
// 创建线程局部变量 只能被当前线程访问,其他线程无法访问和修改
ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();
public void init() {
try {
System.out.println(USERNAME + "," + PASSWORD + "," + BROKER_URL);
// 创建链接工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
// 从工厂中创建一个链接
connection = connectionFactory.createConnection();
connection.start();
// 创建一个事务
session = connection.createSession(true, Session.SESSION_TRANSACTED);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void sendMessage(String disname) {
try {
Queue queue = session.createQueue(disname);
MessageProducer messageProducer = null;
if (threadLocal.get() != null) {
System.out.println("当前threadLocal不为空");
messageProducer = threadLocal.get();
} else {
messageProducer = session.createProducer(queue);
threadLocal.set(messageProducer);
}
while (true) {
Thread.sleep(1000);
int num = count.getAndIncrement();
//创建一条消息
TextMessage msg = session.createTextMessage(Thread.currentThread().getName() + "producter:我正在生产东西!,count:" + num);
System.out.println(Thread.currentThread().getName() + "producter:我正在生产东西!,count:" + num);
messageProducer.send(msg);
session.commit();
}
} catch (JMSException | InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
消费者
package cn.javabb.mq.queue;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @desc:
* @author: javabb (javabob(a)163.com)
* @create: 2020/09/15 18:31
*/
public class Comsumer {
//连接信息
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
//提供原子操作的Integer 提供安全操作+- 适合并发系统
AtomicInteger count = new AtomicInteger(0);
// 链接工厂
ConnectionFactory connectionFactory;
// 链接对象
Connection connection;
// 事务管理
Session session;
// 创建线程局部变量 只能被当前线程访问,其他线程无法访问和修改
ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
public void init() {
try {
// 创建链接工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
// 从工厂中创建一个链接
connection = connectionFactory.createConnection();
connection.start();
// 创建一个事务
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void getMessage(String disname) {
try {
Queue queue = session.createQueue(disname);
MessageConsumer consumer = null;
if (threadLocal.get() != null) {
consumer = threadLocal.get();
} else {
consumer = session.createConsumer(queue);
threadLocal.set(consumer);
}
while (true) {
Thread.sleep(1000);
TextMessage msg = (TextMessage) consumer.receive();
if (msg != null) {
msg.acknowledge();
System.out.println(Thread.currentThread().getName() + ": Consumer:我是消费者,我正在消费Msg:" + msg.getText() + "--->" + count.getAndIncrement());
} else {
break;
}
}
} catch (JMSException | InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
启动生产者生产消息
package cn.javabb.mq.activemq.p2pMult;
/**
* @desc:
* @author: javabb (javabob(a)163.com)
* @create: 2021/06/22 20:24
*/
public class TestMultProducter {
public static void main(String[] args) {
Producter producter = new Producter();
producter.init();
TestMultProducter testProductor = new TestMultProducter();
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
//new Thread()
new Thread(testProductor.new ProductorMq(producter)).start();
new Thread(testProductor.new ProductorMq(producter)).start();
new Thread(testProductor.new ProductorMq(producter)).start();
}
class ProductorMq implements Runnable {
Producter producter;
public ProductorMq(Producter producter) {
this.producter = producter;
}
@Override
public void run() {
while (true) {
try {
producter.sendMessage("javabb-demo-activemq");
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
消费者消费
package cn.javabb.mq.activemq.p2pMult;
/**
* @desc:
* @author: javabb (javabob(a)163.com)
* @create: 2021/06/22 20:25
*/
public class TestMultConsumer {
public static void main(String[] args) {
Consumer consumer = new Consumer();
consumer.init();
TestMultConsumer testConsumer = new TestMultConsumer();
new Thread(testConsumer.new ConsumerMq(consumer)).start();
new Thread(testConsumer.new ConsumerMq(consumer)).start();
new Thread(testConsumer.new ConsumerMq(consumer)).start();
}
class ConsumerMq implements Runnable {
Consumer consumer;
public ConsumerMq(Consumer consumer) {
this.consumer = consumer;
}
@Override
public void run() {
while (true) {
try {
consumer.getMessage("javabb-demo-activemq");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
同时启动生产者消费者,每个生产者和消费都同时启动了3个进程,同时生产和消费。
发布订阅(P2S)模型
发布/订阅模型采用的是主题(Topic)作为消息通讯载体。该模式类似微信公众号的模式。发布者发布一条信息,然后将该信息传递给所有的订阅者。注意:订阅者想要接收到该信息,必须在该信息发布之前订阅。
发布者P2sPublish
package cn.javabb.mq.activemq.p2s;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
* @desc:
* @author: javabb (javabob(a)163.com)
* @create: 2021/06/22 20:35
*/
public class P2sPublish {
//连接信息
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) throws JMSException, IOException {
// 创建一个ConnectionFactory对象连接MQ服务器
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,
PASSWORD,BROKER_URL);
// 创建一个连接对象
Connection connection;
connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 使用Connection对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个Destination对象。topic对象
Topic topic = session.createTopic("test-topic");
// 使用Session对象创建一个消费者对象。
MessageConsumer consumer = session.createConsumer(topic);
// 接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// 打印结果
TextMessage textMessage = (TextMessage) message;
String text;
try {
text = textMessage.getText();
System.out.println("这是接收到的消息:" + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.out.println("topic消费者启动。。。。");
// 等待接收消息
System.in.read();
// 关闭资源
consumer.close();
session.close();
connection.close();
}
}
订阅者P2sSubscription
package cn.javabb.mq.activemq.p2s;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @desc:
* @author: javabb (javabob(a)163.com)
* @create: 2021/06/22 20:36
*/
public class P2sSubscription {
//连接信息
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) throws JMSException {
// 1、创建一个ConnectionFactory对象连接MQ服务器
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,
PASSWORD,BROKER_URL);
// 2、使用工厂对象创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
// 3、开启连接,调用Connection对象的start方法。
connection.start();
// 4、创建一个Session对象。
// 第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
// 第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用topic
Topic topic = session.createTopic("test-topic");
// 6、使用Session对象创建一个Producer对象。
MessageProducer producer = session.createProducer(topic);
// 7、创建一个Message对象,可以使用TextMessage。
for (int i = 0; i < 50; i++) {
TextMessage textMessage = session.createTextMessage("第" + i + "一个ActiveMQ队列目的地的消息");
// 8、发送消息
producer.send(textMessage);
}
// 9、关闭资源
producer.close();
session.close();
connection.close();
}
}
发布订阅模型,订阅者要提前订阅,所以先运行订阅者。
两种模式对比
1)由以上,我们可以总结出ActiveMQ的实现步骤:
- 建立ConnectionFactory工厂对象,需要填入用户名、密码、连接地址
- 通过ConnectionFactory对象创建一个Connection连接
- 通过Connection对象创建Session会话
- 通过Session对象创建Destination对象;在P2P的模式中,Destination被称作队列(Queue),在Pub/Sub模式中,Destination被称作主题(Topic)
- 通过Session对象创建消息的发送和接收对象
- 发送消息
- 关闭资源
2)可以看出,P2P模式和Pub/Sub模式,在实现上的区别是通过Session创建的Destination对象不一样,在P2P的模式中,Destination被称作队列(Queue),在Pub/Sub模式中,Destination被称作主题(Topic)
springboot集成
通过使用Spring的JMSTemplate来操作消息消费。来看下实际应用中怎么用的mq。
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.16.0</version>
</dependency>
配置ActiveMQConfig
package cn.javabb.mq.activemq.boot.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
/**
* @desc:
* @author: javabb (javabob(a)163.com)
* @create: 2021/06/22 21:16
*/
@Configuration
@Slf4j
public class ActiveMqConfig {
//连接信息 可以自定义配置
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
@Bean
public ActiveMQQueue queueRequest(){
return new ActiveMQQueue("QUENE_REQUEST");
}
/**
* @Description: 消息重发机制
*/
@Bean
public RedeliveryPolicy redeliveryPolicy(){
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(10); //重发次数,默认为6次
//重发时间间隔,默认为1秒
redeliveryPolicy.setInitialRedeliveryDelay(1);
//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
redeliveryPolicy.setBackOffMultiplier(5);
//是否避免消息碰撞
redeliveryPolicy.setUseCollisionAvoidance(false);
//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
redeliveryPolicy.setMaximumRedeliveryDelay(-1);
return redeliveryPolicy;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory (RedeliveryPolicy redeliveryPolicy){
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD, BROKER_URL);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
activeMQConnectionFactory.setTrustAllPackages(true);
return activeMQConnectionFactory;
}
@Bean
public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory){
JmsTemplate jmsTemplate=new JmsTemplate();
jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化
jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式
return jmsTemplate;
}
//定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
@Bean(name = "jmsQueueListener")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory);
//设置连接数
factory.setConcurrency("1-3");
//重连间隔时间
factory.setRecoveryInterval(1000L);
factory.setSessionAcknowledgeMode(4);
return factory;
}
}
消息生产者
@Component
public class RequestQueueSender {
@Autowired
private JmsTemplate jmsTemplate;
public void sendMessage(Destination destination, final Serializable msg) {
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(msg);
}
});
}
}
消息消费者
@Component
@Slf4j
public class RequestConsumer {
//这里先不用线程池
@JmsListener(destination= "QUENE_REQUEST", containerFactory = "jmsQueueListener")
public void receiveRequest(ObjectMessage objectMessage, Session session) throws JMSException {
try {
Serializable object = objectMessage.getObject();
// log.info("=====================");
// log.info(object.toString());
processResult(object);
// log.info("=====================");
objectMessage.acknowledge();
} catch (Exception e) {
log.error("消费消息失败",e.getMessage());
session.recover();
}
}
/**
* 消息处理
* @param obj
*/
private void processResult(Object obj) {
System.out.println("结果处理:"+Convert.toStr(obj));
}
}
启动类
@SpringBootApplication
public class ActiveMqApplication {
public static void main(String[] args) {
SpringApplication.run(ActiveMqApplication.class, args);
}
}
测试消息过来发送到消息队列
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestRequest {
@Autowired
Destination queueRequest;
@Autowired
RequestQueueSender queueSender;
/**
* 发送消息
*/
@Test
public void producer() {
for(int i=0 ;i<50;i++){
queueSender.sendMessage(queueRequest,"发送消息i="+i);
}
}
}
先启动应用,再在测试类中通过向消息队列中发送消息,可以看到返回的消息处理消息。