1. 事务
开启事务:
public static void main(String[] args) throws JMSException, InterruptedException {//初始化链接ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61616");//创建直连会话Connection connection = activeMQConnectionFactory.createConnection();connection.start();//设置为true表示开启事务//Session.AUTO_ACKNOWLEDGE 消息签收模式为自动签收Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);}
用来提交/回滚事务:
session.commit();session.rollback();
- 发送消息时时如果不commit,即使调用了send方法消息也不会发送。
 - 消费消息时时如果不commit,消息不会被确认。
 - 多次
send可以对应一次commit - 假如 Customer1 消费消息时如果没有ack,则 Broker会一直等待 Customer1 确认消息,如果这个时候 Customer2 连接进来则不会看到 Customer1 这些待确认消息。假如 Customer1 还留有待确认的消息就直接下线了,则 Customer2 会收到这些待确认的消息。
 
事务导致的消息重复投递:
假设 消费者A 没有commit消息就被迫下线,则未被commit的消息则会activeMQ再次投递给其他消费者消费。
2. 消息签收模式
消息签收代表接收端的session已收到消息的一次确认,反馈给broker
ActiveMQ支持自动签收与手动签收
Session.AUTO_ACKNOWLEDGE
| 当客户端从receiver或onMessage成功返回时,Session自动签收客户端的这条消息的收条。 | 
|---|
Session.CLIENT_ACKNOWLEDGE
| 客户端通过调用消息(Message)的acknowledge方法签收消息。在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session所有已消费的收条。 | |
Session.DUPS_OK_ACKNOWLEDGE
| Session不必确保对传送消息的签收,这个模式可能会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用。 |
3. 消息持久化
默认持久化是开启的
queueProducer.getDeliveryMode(); //2 默认是持久化queueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //设置为不持久化queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT); //设置为持久化
4. 消费优先级
设置消息的消费优先级可以打乱消费顺序,级别高的先消费。
4.1 修改配置
使用级别消费需要先在activemq.xml中配置开启优先级的消费模式:
在 beans -> broker -> destinationPolicy -> policyMap -> policyEntries 下新增一个配置:
<policyEntry queue="user" prioritizedMessages="true" />
其中 ”user“ 是名字
 
然后重启MQ. 
4.2 producer设置消费级别
设置整体消息的优先级
Queue userQueue = session.createQueue("user");MessageProducer queueProducer = session.createProducer(userQueue);queueProducer.setPriority(4); //设置优先级为4级
设置单个消息的优先级
//发送文本消息TextMessage textMessage = session.createTextMessage("hahah"+i);queueProducer.setPriority(4); //设置优先级为4级queueProducer.send(textMessage);//或//2 :表示消息优先级//1000L: 为超时设置queueProducer.send(textMessage,DeliveryMode.NON_PERSISTENT,2,1000L);
消息的默认优先级为4级。
5. 临时消息和非临时消息
临时消息时session级别的,它的生命周期伴随着session,一但session销毁则所有的临时消息都会销毁。
非临时消息时整个mq级别的,它的生命周期是永久。
创建临时消息
创建非临时消息
6. 消息超时与死信队列
6.1 消息超时机制
producer.setTimeToLive()
如果设置了消息超时,消费端在超时后无法在消费到此消息。流程如下:
给消息设置一个超时时间 -> 死信队列-> 拿出来 -> 重发

6.2 死信队列
进入到**ActiveMQ.DLQ**队列且不会自动清除,次队列成为死信队列(Dead Letter Message)
此处有消息堆积的风险
6.2.1 配置死信队列
<policyEntry queue="f" prioritizedMessages="true" ><deadLetterStrategy><individualDeadLetterStrategy queuePrefix="DLxxQ." useQueueForQueueMessages="true" /></deadLetterStrategy></policyEntry>
6.2.2 死信队列的配置
| 说明 | 枚举值 | 默认值 | |
|---|---|---|---|
| queuePrefix | 死信队列名称 | ||
| useQueueForQueueMessages | 是否使用queue存储死信 | true/false | true | 
| useQueueForTopicMessages | 是否使用Topic存储死信 | true/false | |
| processNonPersistent | 非持久化的消息是否进入死信 | true/false | |
| processExpired | 过期消息是否进死信队列 | true/false | 
7. 独占消费者
Producer
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class Producer {public static void main(String[] args) throws JMSException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61616");Connection connection = activeMQConnectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Queue userQueue = session.createQueue("user");MessageProducer producer = session.createProducer(userQueue);for (int i = 0; i < 1000; i++) {TextMessage textMessage = session.createTextMessage("name=wangfan age=" + i);producer.send(textMessage);}}}
Customer
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class Consumer {public static void main(String[] args) throws JMSException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61616");Connection connection = activeMQConnectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Queue userQueue = session.createQueue("user?consumer.exclusive=true");MessageConsumer consumer = session.createConsumer(userQueue);consumer.setMessageListener(msg->{TextMessage textMessage = (TextMessage) msg;try {Thread.sleep(1000L);System.out.println(textMessage.getText());} catch (Exception e) {e.printStackTrace();}});}}
先后启动2个消费者A和B,观察打印。
关闭消费者A后观察再消费者B。
还可以设置优先级
Queue queue = session.createQueue("xxoo?consumer.exclusive=true&consumer.priority=10");
8. 消息类型
8.1 Object类型
必须要发送的类要实现Serializable接口
Person类
import java.io.Serializable;@Data@AllArgsConstructor@ToStringpublic class Person implements Serializable {private String name;private int age;private String address;}
Producer
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** @Author:壹心科技BCF项目组 wangfan* @Date:Created in 2021/9/22 16:29* @Project:epec* @Description:TODO* @Modified By:wangfan* @Version: V1.0*/public class Producer {public static void main(String[] args) throws JMSException, InterruptedException {//初始化链接ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61616");//设置信任包activeMQConnectionFactory.setTrustAllPackages(true);//创建直连会话Connection connection = activeMQConnectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);/*** 发送队列消息*///创建一个队列Queue personQueue = session.createQueue("person");MessageProducer queueProducer = session.createProducer(personQueue);for (int i = 0; i<100; i++){Person person = new Person("王帆" + i+"号", i, "河北省");ObjectMessage objectMessage = session.createObjectMessage(person);queueProducer.send(objectMessage);}//关闭链接session.close();}}
Consumer
if(message instanceof ActiveMQObjectMessage) {ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage)message;Person person = (Person)activeMQObjectMessage.getObject();System.out.println(person);}
如果遇到此类报错
Exception in thread "main" javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:213)at com.mashibing.mq.Receiver.main(Receiver.java:65)Caused by: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.checkSecurity(ClassLoadingAwareObjectInputStream.java:112)at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.resolveClass(ClassLoadingAwareObjectInputStream.java:57)at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:211)... 1 more
需要添加信任
activeMQConnectionFactory.setTrustedPackages(new ArrayList<String>(Arrays.asList(Person.class.getPackage().getName() //信任Person类的package)));//或者 信任所有的packageactiveMQConnectionFactory.setTrustAllPackages(true);
8.2 BytesMessage
Producer
BytesMessage bytesMessage = session.createBytesMessage();bytesMessage.writeBytes("str".getBytes());bytesMessage.writeUTF("哈哈");
Customer
if(message instanceof BytesMessage) {BytesMessage bm = (BytesMessage)message;byte[] b = new byte[1024];int len = -1;while ((len = bm.readBytes(b)) != -1) {System.out.println(new String(b, 0, len));}}
还可以使用ActiveMQ给提供的便捷方法,但要注意读取和写入的顺序
bm.readBoolean()bm.readUTF()
写入文件
FileOutputStream out = null;try {out = new FileOutputStream("d:/aa.txt");} catch (FileNotFoundException e2) {e2.printStackTrace();}byte[] by = new byte[1024];int len = 0 ;try {while((len = bm.readBytes(by))!= -1){out.write(by,0,len);}} catch (Exception e1) {e1.printStackTrace();}
8.3 MapMessage
Producer
MapMessage mapMessage = session.createMapMessage();mapMessage.setString("name","lucy");mapMessage.setBoolean("yihun",false);mapMessage.setInt("age", 17);producer.send(mapMessage);
Customer
Message message = consumer.receive();MapMessage mes = (MapMessage) message;System.out.println(mes);System.out.println(mes.getString("name"));
9. 消息监听器
public class Consumer {public static void main(String[] args) throws JMSException {//初始化链接ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61616");//创建直连会话Connection connection = activeMQConnectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);Queue userQueue = session.createQueue("user");MessageConsumer userConsumer = session.createConsumer(userQueue);//使用消息监听器接收消息userConsumer.setMessageListener(e->{TextMessage textMessage = (TextMessage)e;try {System.out.println(textMessage.getText());} catch (JMSException ex) {ex.printStackTrace();}});}}
10. 消息发送
10.1 同步与异步
| 开启事务 | 关闭事务 | |
|---|---|---|
| 持久化 | 异步 | 同步 | 
| 非持久化 | 异步 | 异步 | 
我们可以通过以下几种方式来设置异步发送:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");// 2.获取一个向ActiveMQ的连接connectionFactory.setUseAsyncSend(true);ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection();//设置为异步发送connection.setUseAsyncSend(true);
10.2消息堆积
producer每发送一个消息,统计一下发送的字节数,当字节数达到producerWindowSize值时,需要等待broker的确认,才能继续发送。
brokerUrl中设置: tcp://localhost:61616?jms.producerWindowSize=1048576
或者
destinationUri中设置: xxxx?producer.windowSize=1048576
10.3 延迟消息投递
首先在配置文件中开启延迟和调度
schedulerSupport=”true”
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
延迟发送
TextMessage textMessage = session.createTextMessage("this is message");message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10*1000);
10.4 按照指定频率重复发送
TextMessage textMessage = session.createTextMessage("this is message");//设置延迟10秒发送message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10 * 1000);//设置重复次数message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);//设置每次重复消息的间隔时间message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 2 * 1000);producer.send(message);
10.5 Cron表达式定时发送
TextMessage textMessage = session.createTextMessage("this is message body");//每隔5分鍾發送一次textMessage.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,"0 0/5 * * * ? ");
11. 使用selector过滤消息
消息发送
MapMessage msg1 = session.createMapMessage();msg1.setString("name", "qiqi");msg1.setStringProperty("name", "qiqi");msg1.setString("age", "18");msg1.setIntProperty("age", 18);MapMessage msg2 = session.createMapMessage();msg2.setString("name", "lucy");msg2.setString("age", "18");msg2.setStringProperty("name", "lucy");msg2.setIntProperty("age", 18);MapMessage msg3 = session.createMapMessage();msg3.setString("name", "qianqian");msg3.setString("age", "17");msg3.setStringProperty("name", "qianqian");msg3.setIntProperty("age", 17);
消息过滤接收
String selector1 = "age > 17 and name = 'lucy'";MessageConsumer consumer = session.createConsumer(queue,selector1);
12. 整合SpringBoot
pom.xml
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><dependency><groupId>org.messaginghub</groupId><artifactId>pooled-jms</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency>
application.properties
spring.activemq.broker-url=tcp://localhost:61616spring.activemq.user=adminspring.activemq.password=adminspring.jms.pub-sub-domain=truespring.activemq.pool.enabled=truespring.activemq.pool.max-connections=50
TestActiveMQController
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import com.mashibing.activemq03.service.SenderService;@RestControllerpublic class MainController {@AutowiredSenderService senderSrv;@RequestMapping("send")public String send() {senderSrv.send("springboot","hello~!");return "ok";}}
ActiveMQConfig
import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.jms.annotation.EnableJms;import org.springframework.jms.config.DefaultJmsListenerContainerFactory;import org.springframework.jms.config.JmsListenerContainerFactory;import javax.jms.ConnectionFactory;@Configuration@EnableJmspublic class ActiveMQConfig {// topic模式的ListenerContainer@Beanpublic JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();bean.setPubSubDomain(true);bean.setConnectionFactory(activeMQConnectionFactory);return bean;}// queue模式的ListenerContainer@Beanpublic JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();bean.setConnectionFactory(activeMQConnectionFactory);return bean;}}
MyJmsListener
import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Service;@Servicepublic class MyJmsListener {@JmsListener(destination = "spring boot queue",containerFactory = "jmsListenerContainerQueue")public void rece(String msg) {System.out.println("收到消息:" + msg);}}
13. ReplyTo机制

producer:
import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.command.ActiveMQQueue;import javax.jms.*;public class producer {public static void main(String[] args) throws JMSException {//初始化链接ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61616");//获取链接并启动Connection connection = activeMQConnectionFactory.createConnection();connection.start();//创建消息队列"userInfo"Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);Queue userInfoQueue = session.createQueue("userInfo");//创建回复队列ActiveMQQueue userInfoReplyQueue = new ActiveMQQueue("userInfoReply");//创建消息TextMessage textMessage = session.createTextMessage("name=wangfan");textMessage.setJMSReplyTo(userInfoReplyQueue);//闯将消息生产这并将将消息队列中MessageProducer producer = session.createProducer(userInfoQueue);producer.send(textMessage);//创建消息消费者并监听回复队列中的消息session.createConsumer(userInfoReplyQueue).setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {TextMessage textmsg = (TextMessage) message;try {System.out.println(textmsg.getText());} catch (JMSException e) {e.printStackTrace();}}});}}
�Consumer:
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class Consumer {public static void main(String[] args) throws JMSException {//初始化链接ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61616");//创建连接Connection connection = activeMQConnectionFactory.createConnection();connection.start();//创建会话Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建队列"userInfo"Queue userInfoQueue = session.createQueue("userInfo");//创建"userInfo"消息队列的消费者并设置监听器监听队列内的数据MessageConsumer userInfoConsumer = session.createConsumer(userInfoQueue);userInfoConsumer.setMessageListener(e->{try {//打印消息TextMessage textMessage = (TextMessage) e;System.out.println(textMessage.getText());//获取消息内的replyTo并向队列中写出已消费的响应消息Destination jmsReplyTo = textMessage.getJMSReplyTo();MessageProducer producer = session.createProducer(jmsReplyTo);producer.send(session.createTextMessage(textMessage.getText()+"已消费"));} catch (JMSException ex) {ex.printStackTrace();}});}}
13. Browser 获取即时队列的内容。
可以查看队列中的消息而不消费,没有订阅的功能
import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.command.ActiveMQQueue;import javax.jms.Connection;import javax.jms.JMSException;import javax.jms.QueueBrowser;import javax.jms.Session;import java.util.Enumeration;public class Browser {public static void main(String[] args) throws JMSException {//初始化链接ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61616");//创建直连会话Connection connection = activeMQConnectionFactory.createConnection();connection.start();//创建sessionSession session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建browser, 并获取指定队列内的数据, 然后打印QueueBrowser personBrowser = session.createBrowser(new ActiveMQQueue("person"));Enumeration enumeration = personBrowser.getEnumeration();while (enumeration.hasMoreElements()){System.out.println(enumeration.nextElement().toString());}}}
14. JMSCorrelationID
用于消息之间的关联,给人一种会话的感觉
http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html
15. QueueRequestor同步消息
可以发送同步消息
本质违背了mq的异步通讯原则
但是mq还是能够提供应用解耦、异构系统的特性
因为使用new ActiveMQConnectionFactory(“tcp://locahost:61616?jms.optimizeAcknowledge=false”); 
发送消息后,会等待接收端的回复,如果收不到回复就会造成死等现象!而且该方法没有设置超时等待的功能
16. 批量确认
ActiveMQ缺省支持批量确认消息,批量确认可以提高系统性能
关闭方法:
new ActiveMQConnectionFactory("tcp://locahost:61616?jms.optimizeAcknowledge=false");
((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(fase);
((ActiveMQConnection)connection).setOptimizeAcknowledge(true);
17. 消费缓冲与消息积压prefetchSize
消费者端,一般来说消费的越快越好,broker的积压越小越好。
但是考虑到事务性和客户端确认的情况,如果一个消费者一次获取到了很多消息却都不确认,这会造成事务上下文变大,broker端这种“半消费状态”的数据变多,所以ActiveMQ有一个prefetchSize参数来控制未确认情况下,最多可以预获取多少条记录。
Pre-fetch默认值
| consumer type | default value | 
|---|---|
| queue | 1000 | 
| queue browser | 500 | 
| topic | 32766 | 
| durable topic | 1000 | 
可以通过3中方式设置prefetchSize
创建连接时整体设置
ActiveMQConnectionFactory connectio nFactory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:5671?jms.prefetchPolicy.all=50");
创建连接时对topic和queue单独设置
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:5671?jms.prefetchPolicy.queuePrefetch=1&jms.prefetchPolicy.topicPrefetch=1");
针对destination单独设置
Destination topic = session.createTopic("user?consumer.prefetchSize=10");
注意:对destination设置prefetchsize后会覆盖连接时的设置值
