1. 事务
开启事务:
public static void main(String[] args) throws JMSException, InterruptedException {
//初始化链接
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
//创建直连会话
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//设置为true表示开启事务
//Session.AUTO_ACKNOWLEDGE 消息签收模式为自动签收
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
}
用来提交/回滚事务:
session.commit();
session.rollback();
- 发送消息时时如果不commit,即使调用了send方法消息也不会发送。
- 消费消息时时如果不commit,消息不会被确认。
- 多次
send
可以对应一次commit
- 假如 Customer1 消费消息时如果没有ack,则 Broker会一直等待 Customer1 确认消息,如果这个时候 Customer2 连接进来则不会看到 Customer1 这些待确认消息。假如 Customer1 还留有待确认的消息就直接下线了,则 Customer2 会收到这些待确认的消息。
事务导致的消息重复投递:
假设 消费者A 没有commit消息就被迫下线,则未被commit的消息则会activeMQ再次投递给其他消费者消费。
2. 消息签收模式
消息签收
代表接收端的session已收到消息的一次确认,反馈给broker
ActiveMQ支持自动签收与手动签收
Session.AUTO_ACKNOWLEDGE
当客户端从receiver或onMessage成功返回时,Session自动签收客户端的这条消息的收条。 |
---|
Session.CLIENT_ACKNOWLEDGE
| 客户端通过调用消息(Message)的acknowledge方法签收消息。在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session所有已消费的收条。 | |
Session.DUPS_OK_ACKNOWLEDGE
| Session不必确保对传送消息的签收,这个模式可能会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用。 |
3. 消息持久化
默认持久化是开启的
queueProducer.getDeliveryMode(); //2 默认是持久化
queueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //设置为不持久化
queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT); //设置为持久化
4. 消费优先级
设置消息的消费优先级可以打乱消费顺序,级别高的先消费。
4.1 修改配置
使用级别消费需要先在activemq.xml中配置开启优先级的消费模式:
在 beans
-> broker
-> destinationPolicy
-> policyMap
-> policyEntries
下新增一个配置:
<policyEntry queue="user" prioritizedMessages="true" />
其中 ”user“ 是名字
然后重启MQ.
4.2 producer设置消费级别
设置整体消息的优先级
Queue userQueue = session.createQueue("user");
MessageProducer queueProducer = session.createProducer(userQueue);
queueProducer.setPriority(4); //设置优先级为4级
设置单个消息的优先级
//发送文本消息
TextMessage textMessage = session.createTextMessage("hahah"+i);
queueProducer.setPriority(4); //设置优先级为4级
queueProducer.send(textMessage);
//或
//2 :表示消息优先级
//1000L: 为超时设置
queueProducer.send(textMessage,DeliveryMode.NON_PERSISTENT,2,1000L);
消息的默认优先级为4级。
5. 临时消息和非临时消息
临时消息时session级别的,它的生命周期伴随着session,一但session销毁则所有的临时消息都会销毁。
非临时消息时整个mq级别的,它的生命周期是永久。
创建临时消息
创建非临时消息
6. 消息超时与死信队列
6.1 消息超时机制
producer.setTimeToLive()
如果设置了消息超时,消费端在超时后无法在消费到此消息。流程如下:
给消息设置一个超时时间
-> 死信队列
-> 拿出来
-> 重发
6.2 死信队列
进入到**ActiveMQ.DLQ**
队列且不会自动清除,次队列成为死信队列(Dead Letter Message
)
此处有消息堆积的风险
6.2.1 配置死信队列
<policyEntry queue="f" prioritizedMessages="true" >
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLxxQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
6.2.2 死信队列的配置
说明 | 枚举值 | 默认值 | |
---|---|---|---|
queuePrefix | 死信队列名称 | ||
useQueueForQueueMessages | 是否使用queue存储死信 | true/false | true |
useQueueForTopicMessages | 是否使用Topic存储死信 | true/false | |
processNonPersistent | 非持久化的消息是否进入死信 | true/false | |
processExpired | 过期消息是否进死信队列 | true/false |
7. 独占消费者
Producer
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Producer {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616");
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue userQueue = session.createQueue("user");
MessageProducer producer = session.createProducer(userQueue);
for (int i = 0; i < 1000; i++) {
TextMessage textMessage = session.createTextMessage("name=wangfan age=" + i);
producer.send(textMessage);
}
}
}
Customer
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616");
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue userQueue = session.createQueue("user?consumer.exclusive=true");
MessageConsumer consumer = session.createConsumer(userQueue);
consumer.setMessageListener(msg->{
TextMessage textMessage = (TextMessage) msg;
try {
Thread.sleep(1000L);
System.out.println(textMessage.getText());
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
先后启动2个消费者A和B,观察打印。
关闭消费者A后观察再消费者B。
还可以设置优先级
Queue queue = session.createQueue("xxoo?consumer.exclusive=true&consumer.priority=10");
8. 消息类型
8.1 Object类型
必须要发送的类要实现Serializable接口
Person类
import java.io.Serializable;
@Data
@AllArgsConstructor
@ToString
public class Person implements Serializable {
private String name;
private int age;
private String address;
}
Producer
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Author:壹心科技BCF项目组 wangfan
* @Date:Created in 2021/9/22 16:29
* @Project:epec
* @Description:TODO
* @Modified By:wangfan
* @Version: V1.0
*/
public class Producer {
public static void main(String[] args) throws JMSException, InterruptedException {
//初始化链接
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
//设置信任包
activeMQConnectionFactory.setTrustAllPackages(true);
//创建直连会话
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 发送队列消息
*/
//创建一个队列
Queue personQueue = session.createQueue("person");
MessageProducer queueProducer = session.createProducer(personQueue);
for (int i = 0; i<100; i++){
Person person = new Person("王帆" + i+"号", i, "河北省");
ObjectMessage objectMessage = session.createObjectMessage(person);
queueProducer.send(objectMessage);
}
//关闭链接
session.close();
}
}
Consumer
if(message instanceof ActiveMQObjectMessage) {
ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage)message;
Person person = (Person)activeMQObjectMessage.getObject();
System.out.println(person);
}
如果遇到此类报错
Exception in thread "main" javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)
at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:213)
at com.mashibing.mq.Receiver.main(Receiver.java:65)
Caused by: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.checkSecurity(ClassLoadingAwareObjectInputStream.java:112)
at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.resolveClass(ClassLoadingAwareObjectInputStream.java:57)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:211)
... 1 more
需要添加信任
activeMQConnectionFactory.setTrustedPackages(
new ArrayList<String>(Arrays.asList(
Person.class.getPackage().getName() //信任Person类的package
))
);
//或者 信任所有的package
activeMQConnectionFactory.setTrustAllPackages(true);
8.2 BytesMessage
Producer
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes("str".getBytes());
bytesMessage.writeUTF("哈哈");
Customer
if(message instanceof BytesMessage) {
BytesMessage bm = (BytesMessage)message;
byte[] b = new byte[1024];
int len = -1;
while ((len = bm.readBytes(b)) != -1) {
System.out.println(new String(b, 0, len));
}
}
还可以使用ActiveMQ给提供的便捷方法,但要注意读取和写入的顺序
bm.readBoolean()
bm.readUTF()
写入文件
FileOutputStream out = null;
try {
out = new FileOutputStream("d:/aa.txt");
} catch (FileNotFoundException e2) {
e2.printStackTrace();
}
byte[] by = new byte[1024];
int len = 0 ;
try {
while((len = bm.readBytes(by))!= -1){
out.write(by,0,len);
}
} catch (Exception e1) {
e1.printStackTrace();
}
8.3 MapMessage
Producer
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name","lucy");
mapMessage.setBoolean("yihun",false);
mapMessage.setInt("age", 17);
producer.send(mapMessage);
Customer
Message message = consumer.receive();
MapMessage mes = (MapMessage) message;
System.out.println(mes);
System.out.println(mes.getString("name"));
9. 消息监听器
public class Consumer {
public static void main(String[] args) throws JMSException {
//初始化链接
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
//创建直连会话
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue userQueue = session.createQueue("user");
MessageConsumer userConsumer = session.createConsumer(userQueue);
//使用消息监听器接收消息
userConsumer.setMessageListener(e->{
TextMessage textMessage = (TextMessage)e;
try {
System.out.println(textMessage.getText());
} catch (JMSException ex) {
ex.printStackTrace();
}
});
}
}
10. 消息发送
10.1 同步与异步
开启事务 | 关闭事务 | |
---|---|---|
持久化 | 异步 | 同步 |
非持久化 | 异步 | 异步 |
我们可以通过以下几种方式来设置异步发送:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"admin",
"admin",
"tcp://localhost:61616"
);
// 2.获取一个向ActiveMQ的连接
connectionFactory.setUseAsyncSend(true);
ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection();
//设置为异步发送
connection.setUseAsyncSend(true);
10.2消息堆积
producer每发送一个消息,统计一下发送的字节数,当字节数达到producerWindowSize值时,需要等待broker的确认,才能继续发送。
brokerUrl中设置: tcp://localhost:61616?jms.producerWindowSize=1048576
或者
destinationUri中设置: xxxx?producer.windowSize=1048576
10.3 延迟消息投递
首先在配置文件中开启延迟和调度
schedulerSupport=”true”
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
延迟发送
TextMessage textMessage = session.createTextMessage("this is message");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10*1000);
10.4 按照指定频率重复发送
TextMessage textMessage = session.createTextMessage("this is message");
//设置延迟10秒发送
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10 * 1000);
//设置重复次数
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);
//设置每次重复消息的间隔时间
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 2 * 1000);
producer.send(message);
10.5 Cron表达式定时发送
TextMessage textMessage = session.createTextMessage("this is message body");
//每隔5分鍾發送一次
textMessage.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,"0 0/5 * * * ? ");
11. 使用selector过滤消息
消息发送
MapMessage msg1 = session.createMapMessage();
msg1.setString("name", "qiqi");
msg1.setStringProperty("name", "qiqi");
msg1.setString("age", "18");
msg1.setIntProperty("age", 18);
MapMessage msg2 = session.createMapMessage();
msg2.setString("name", "lucy");
msg2.setString("age", "18");
msg2.setStringProperty("name", "lucy");
msg2.setIntProperty("age", 18);
MapMessage msg3 = session.createMapMessage();
msg3.setString("name", "qianqian");
msg3.setString("age", "17");
msg3.setStringProperty("name", "qianqian");
msg3.setIntProperty("age", 17);
消息过滤接收
String selector1 = "age > 17 and name = 'lucy'";
MessageConsumer consumer = session.createConsumer(queue,selector1);
12. 整合SpringBoot
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
application.properties
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.jms.pub-sub-domain=true
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
TestActiveMQController
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.mashibing.activemq03.service.SenderService;
@RestController
public class MainController {
@Autowired
SenderService senderSrv;
@RequestMapping("send")
public String send() {
senderSrv.send("springboot","hello~!");
return "ok";
}
}
ActiveMQConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
@Configuration
@EnableJms
public class ActiveMQConfig {
// topic模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
// queue模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
}
MyJmsListener
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
@Service
public class MyJmsListener {
@JmsListener(destination = "spring boot queue",containerFactory = "jmsListenerContainerQueue")
public void rece(String msg) {
System.out.println("收到消息:" + msg);
}
}
13. ReplyTo机制
producer:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import javax.jms.*;
public class producer {
public static void main(String[] args) throws JMSException {
//初始化链接
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
//获取链接并启动
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//创建消息队列"userInfo"
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue userInfoQueue = session.createQueue("userInfo");
//创建回复队列
ActiveMQQueue userInfoReplyQueue = new ActiveMQQueue("userInfoReply");
//创建消息
TextMessage textMessage = session.createTextMessage("name=wangfan");
textMessage.setJMSReplyTo(userInfoReplyQueue);
//闯将消息生产这并将将消息队列中
MessageProducer producer = session.createProducer(userInfoQueue);
producer.send(textMessage);
//创建消息消费者并监听回复队列中的消息
session.createConsumer(userInfoReplyQueue).setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textmsg = (TextMessage) message;
try {
System.out.println(textmsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
�Consumer:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
public static void main(String[] args) throws JMSException {
//初始化链接
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
//创建连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建队列"userInfo"
Queue userInfoQueue = session.createQueue("userInfo");
//创建"userInfo"消息队列的消费者并设置监听器监听队列内的数据
MessageConsumer userInfoConsumer = session.createConsumer(userInfoQueue);
userInfoConsumer.setMessageListener(e->{
try {
//打印消息
TextMessage textMessage = (TextMessage) e;
System.out.println(textMessage.getText());
//获取消息内的replyTo并向队列中写出已消费的响应消息
Destination jmsReplyTo = textMessage.getJMSReplyTo();
MessageProducer producer = session.createProducer(jmsReplyTo);
producer.send(session.createTextMessage(textMessage.getText()+"已消费"));
} catch (JMSException ex) {
ex.printStackTrace();
}
});
}
}
13. Browser 获取即时队列的内容。
可以查看队列中的消息而不消费,没有订阅的功能
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import java.util.Enumeration;
public class Browser {
public static void main(String[] args) throws JMSException {
//初始化链接
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
//创建直连会话
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//创建session
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//创建browser, 并获取指定队列内的数据, 然后打印
QueueBrowser personBrowser = session.createBrowser(new ActiveMQQueue("person"));
Enumeration enumeration = personBrowser.getEnumeration();
while (enumeration.hasMoreElements()){
System.out.println(enumeration.nextElement().toString());
}
}
}
14. JMSCorrelationID
用于消息之间的关联,给人一种会话的感觉
http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html
15. QueueRequestor同步消息
可以发送同步消息
本质违背了mq的异步通讯原则
但是mq还是能够提供应用解耦、异构系统的特性
因为使用new ActiveMQConnectionFactory(“tcp://locahost:61616?jms.optimizeAcknowledge=false”);
发送消息后,会等待接收端的回复,如果收不到回复就会造成死等现象!而且该方法没有设置超时等待的功能
16. 批量确认
ActiveMQ缺省支持批量确认消息,批量确认可以提高系统性能
关闭方法:
new ActiveMQConnectionFactory("tcp://locahost:61616?jms.optimizeAcknowledge=false");
((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(fase);
((ActiveMQConnection)connection).setOptimizeAcknowledge(true);
17. 消费缓冲与消息积压prefetchSize
消费者端,一般来说消费的越快越好,broker的积压越小越好。
但是考虑到事务性和客户端确认的情况,如果一个消费者一次获取到了很多消息却都不确认,这会造成事务上下文变大,broker端这种“半消费状态”的数据变多,所以ActiveMQ有一个prefetchSize参数来控制未确认情况下,最多可以预获取多少条记录。
Pre-fetch默认值
consumer type | default value |
---|---|
queue | 1000 |
queue browser | 500 |
topic | 32766 |
durable topic | 1000 |
可以通过3中方式设置prefetchSize
创建连接时整体设置
ActiveMQConnectionFactory connectio nFactory = new ActiveMQConnectionFactory(
"admin",
"admin",
"tcp://localhost:5671?jms.prefetchPolicy.all=50"
);
创建连接时对topic和queue单独设置
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"admin",
"admin",
"tcp://localhost:5671?jms.prefetchPolicy.queuePrefetch=1&jms.prefetchPolicy.topicPrefetch=1"
);
针对destination单独设置
Destination topic = session.createTopic("user?consumer.prefetchSize=10");
注意:对destination设置prefetchsize后会覆盖连接时的设置值