1. 事务

开启事务:

  1. public static void main(String[] args) throws JMSException, InterruptedException {
  2. //初始化链接
  3. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
  4. ActiveMQConnectionFactory.DEFAULT_USER,
  5. ActiveMQConnectionFactory.DEFAULT_PASSWORD,
  6. "tcp://localhost:61616"
  7. );
  8. //创建直连会话
  9. Connection connection = activeMQConnectionFactory.createConnection();
  10. connection.start();
  11. //设置为true表示开启事务
  12. //Session.AUTO_ACKNOWLEDGE 消息签收模式为自动签收
  13. Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  14. }

用来提交/回滚事务:

  1. session.commit();
  2. session.rollback();
  • 发送消息时时如果不commit,即使调用了send方法消息也不会发送。
  • 消费消息时时如果不commit,消息不会被确认。
  • 多次send可以对应一次commit
  • 假如 Customer1 消费消息时如果没有ack,则 Broker会一直等待 Customer1 确认消息,如果这个时候 Customer2 连接进来则不会看到 Customer1 这些待确认消息。假如 Customer1 还留有待确认的消息就直接下线了,则 Customer2 会收到这些待确认的消息。

事务导致的消息重复投递:
假设 消费者A 没有commit消息就被迫下线,则未被commit的消息则会activeMQ再次投递给其他消费者消费。

2. 消息签收模式

消息签收代表接收端的session已收到消息的一次确认,反馈给broker

ActiveMQ支持自动签收与手动签收

|

Session.AUTO_ACKNOWLEDGE

当客户端从receiver或onMessage成功返回时,Session自动签收客户端的这条消息的收条。

Session.CLIENT_ACKNOWLEDGE

| 客户端通过调用消息(Message)的acknowledge方法签收消息。在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session所有已消费的收条。 | |

Session.DUPS_OK_ACKNOWLEDGE

| Session不必确保对传送消息的签收,这个模式可能会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用。 |

3. 消息持久化

默认持久化是开启的

  1. queueProducer.getDeliveryMode(); //2 默认是持久化
  2. queueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //设置为不持久化
  3. queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT); //设置为持久化

4. 消费优先级

设置消息的消费优先级可以打乱消费顺序,级别高的先消费。

4.1 修改配置
使用级别消费需要先在activemq.xml中配置开启优先级的消费模式:
beans -> broker -> destinationPolicy -> policyMap -> policyEntries 下新增一个配置:

  1. <policyEntry queue="user" prioritizedMessages="true" />

其中 ”user“ 是名字

然后重启MQ.

4.2 producer设置消费级别

设置整体消息的优先级

  1. Queue userQueue = session.createQueue("user");
  2. MessageProducer queueProducer = session.createProducer(userQueue);
  3. queueProducer.setPriority(4); //设置优先级为4级

设置单个消息的优先级

  1. //发送文本消息
  2. TextMessage textMessage = session.createTextMessage("hahah"+i);
  3. queueProducer.setPriority(4); //设置优先级为4级
  4. queueProducer.send(textMessage);
  5. //或
  6. //2 :表示消息优先级
  7. //1000L: 为超时设置
  8. queueProducer.send(textMessage,DeliveryMode.NON_PERSISTENT,2,1000L);

消息的默认优先级为4级。

5. 临时消息和非临时消息

临时消息时session级别的,它的生命周期伴随着session,一但session销毁则所有的临时消息都会销毁。
非临时消息时整个mq级别的,它的生命周期是永久。

创建临时消息

创建非临时消息

6. 消息超时与死信队列

6.1 消息超时机制

  1. producer.setTimeToLive()

如果设置了消息超时,消费端在超时后无法在消费到此消息。流程如下:

给消息设置一个超时时间 -> 死信队列-> 拿出来 -> 重发

image.png

6.2 死信队列

进入到**ActiveMQ.DLQ**队列且不会自动清除,次队列成为死信队列(Dead Letter Message)

此处有消息堆积的风险

6.2.1 配置死信队列

  1. <policyEntry queue="f" prioritizedMessages="true" >
  2. <deadLetterStrategy>
  3. <individualDeadLetterStrategy queuePrefix="DLxxQ." useQueueForQueueMessages="true" />
  4. </deadLetterStrategy>
  5. </policyEntry>

6.2.2 死信队列的配置


说明 枚举值 默认值
queuePrefix 死信队列名称
useQueueForQueueMessages 是否使用queue存储死信 true/false true
useQueueForTopicMessages 是否使用Topic存储死信 true/false
processNonPersistent 非持久化的消息是否进入死信 true/false
processExpired 过期消息是否进死信队列 true/false

7. 独占消费者

Producer

  1. import org.apache.activemq.ActiveMQConnectionFactory;
  2. import javax.jms.*;
  3. public class Producer {
  4. public static void main(String[] args) throws JMSException {
  5. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
  6. ActiveMQConnectionFactory.DEFAULT_PASSWORD,
  7. "tcp://localhost:61616");
  8. Connection connection = activeMQConnectionFactory.createConnection();
  9. connection.start();
  10. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  11. Queue userQueue = session.createQueue("user");
  12. MessageProducer producer = session.createProducer(userQueue);
  13. for (int i = 0; i < 1000; i++) {
  14. TextMessage textMessage = session.createTextMessage("name=wangfan age=" + i);
  15. producer.send(textMessage);
  16. }
  17. }
  18. }

Customer

  1. import org.apache.activemq.ActiveMQConnectionFactory;
  2. import javax.jms.*;
  3. public class Consumer {
  4. public static void main(String[] args) throws JMSException {
  5. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
  6. ActiveMQConnectionFactory.DEFAULT_USER,
  7. ActiveMQConnectionFactory.DEFAULT_PASSWORD,
  8. "tcp://localhost:61616");
  9. Connection connection = activeMQConnectionFactory.createConnection();
  10. connection.start();
  11. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  12. Queue userQueue = session.createQueue("user?consumer.exclusive=true");
  13. MessageConsumer consumer = session.createConsumer(userQueue);
  14. consumer.setMessageListener(msg->{
  15. TextMessage textMessage = (TextMessage) msg;
  16. try {
  17. Thread.sleep(1000L);
  18. System.out.println(textMessage.getText());
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. }
  22. });
  23. }
  24. }

先后启动2个消费者A和B,观察打印。
关闭消费者A后观察再消费者B。

还可以设置优先级

  1. Queue queue = session.createQueue("xxoo?consumer.exclusive=true&consumer.priority=10");

8. 消息类型

8.1 Object类型

必须要发送的类要实现Serializable接口
Person类

  1. import java.io.Serializable;
  2. @Data
  3. @AllArgsConstructor
  4. @ToString
  5. public class Person implements Serializable {
  6. private String name;
  7. private int age;
  8. private String address;
  9. }

Producer

  1. import org.apache.activemq.ActiveMQConnectionFactory;
  2. import javax.jms.*;
  3. /**
  4. * @Author:壹心科技BCF项目组 wangfan
  5. * @Date:Created in 2021/9/22 16:29
  6. * @Project:epec
  7. * @Description:TODO
  8. * @Modified By:wangfan
  9. * @Version: V1.0
  10. */
  11. public class Producer {
  12. public static void main(String[] args) throws JMSException, InterruptedException {
  13. //初始化链接
  14. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
  15. ActiveMQConnectionFactory.DEFAULT_USER,
  16. ActiveMQConnectionFactory.DEFAULT_PASSWORD,
  17. "tcp://localhost:61616"
  18. );
  19. //设置信任包
  20. activeMQConnectionFactory.setTrustAllPackages(true);
  21. //创建直连会话
  22. Connection connection = activeMQConnectionFactory.createConnection();
  23. connection.start();
  24. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  25. /**
  26. * 发送队列消息
  27. */
  28. //创建一个队列
  29. Queue personQueue = session.createQueue("person");
  30. MessageProducer queueProducer = session.createProducer(personQueue);
  31. for (int i = 0; i<100; i++){
  32. Person person = new Person("王帆" + i+"号", i, "河北省");
  33. ObjectMessage objectMessage = session.createObjectMessage(person);
  34. queueProducer.send(objectMessage);
  35. }
  36. //关闭链接
  37. session.close();
  38. }
  39. }

Consumer

  1. if(message instanceof ActiveMQObjectMessage) {
  2. ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage)message;
  3. Person person = (Person)activeMQObjectMessage.getObject();
  4. System.out.println(person);
  5. }

如果遇到此类报错

  1. Exception in thread "main" javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
  2. at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)
  3. at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:213)
  4. at com.mashibing.mq.Receiver.main(Receiver.java:65)
  5. Caused by: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
  6. at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.checkSecurity(ClassLoadingAwareObjectInputStream.java:112)
  7. at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.resolveClass(ClassLoadingAwareObjectInputStream.java:57)
  8. at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
  9. at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
  10. at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
  11. at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
  12. at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
  13. at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:211)
  14. ... 1 more

需要添加信任

  1. activeMQConnectionFactory.setTrustedPackages(
  2. new ArrayList<String>(Arrays.asList(
  3. Person.class.getPackage().getName() //信任Person类的package
  4. ))
  5. );
  6. //或者 信任所有的package
  7. activeMQConnectionFactory.setTrustAllPackages(true);

8.2 BytesMessage

Producer

  1. BytesMessage bytesMessage = session.createBytesMessage();
  2. bytesMessage.writeBytes("str".getBytes());
  3. bytesMessage.writeUTF("哈哈");

Customer

  1. if(message instanceof BytesMessage) {
  2. BytesMessage bm = (BytesMessage)message;
  3. byte[] b = new byte[1024];
  4. int len = -1;
  5. while ((len = bm.readBytes(b)) != -1) {
  6. System.out.println(new String(b, 0, len));
  7. }
  8. }

还可以使用ActiveMQ给提供的便捷方法,但要注意读取和写入的顺序

  1. bm.readBoolean()
  2. bm.readUTF()

写入文件

  1. FileOutputStream out = null;
  2. try {
  3. out = new FileOutputStream("d:/aa.txt");
  4. } catch (FileNotFoundException e2) {
  5. e2.printStackTrace();
  6. }
  7. byte[] by = new byte[1024];
  8. int len = 0 ;
  9. try {
  10. while((len = bm.readBytes(by))!= -1){
  11. out.write(by,0,len);
  12. }
  13. } catch (Exception e1) {
  14. e1.printStackTrace();
  15. }

8.3 MapMessage

Producer

  1. MapMessage mapMessage = session.createMapMessage();
  2. mapMessage.setString("name","lucy");
  3. mapMessage.setBoolean("yihun",false);
  4. mapMessage.setInt("age", 17);
  5. producer.send(mapMessage);

Customer

  1. Message message = consumer.receive();
  2. MapMessage mes = (MapMessage) message;
  3. System.out.println(mes);
  4. System.out.println(mes.getString("name"));

9. 消息监听器

  1. public class Consumer {
  2. public static void main(String[] args) throws JMSException {
  3. //初始化链接
  4. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
  5. ActiveMQConnectionFactory.DEFAULT_USER,
  6. ActiveMQConnectionFactory.DEFAULT_PASSWORD,
  7. "tcp://localhost:61616"
  8. );
  9. //创建直连会话
  10. Connection connection = activeMQConnectionFactory.createConnection();
  11. connection.start();
  12. Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
  13. Queue userQueue = session.createQueue("user");
  14. MessageConsumer userConsumer = session.createConsumer(userQueue);
  15. //使用消息监听器接收消息
  16. userConsumer.setMessageListener(e->{
  17. TextMessage textMessage = (TextMessage)e;
  18. try {
  19. System.out.println(textMessage.getText());
  20. } catch (JMSException ex) {
  21. ex.printStackTrace();
  22. }
  23. });
  24. }
  25. }

10. 消息发送

10.1 同步与异步

开启事务 关闭事务
持久化 异步 同步
非持久化 异步 异步

我们可以通过以下几种方式来设置异步发送:

  1. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
  2. "admin",
  3. "admin",
  4. "tcp://localhost:61616"
  5. );
  6. // 2.获取一个向ActiveMQ的连接
  7. connectionFactory.setUseAsyncSend(true);
  8. ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection();
  9. //设置为异步发送
  10. connection.setUseAsyncSend(true);

10.2消息堆积

producer每发送一个消息,统计一下发送的字节数,当字节数达到producerWindowSize值时,需要等待broker的确认,才能继续发送。

brokerUrl中设置: tcp://localhost:61616?jms.producerWindowSize=1048576
或者
destinationUri中设置: xxxx?producer.windowSize=1048576

10.3 延迟消息投递

首先在配置文件中开启延迟和调度
schedulerSupport=”true”

  1. <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

延迟发送

  1. TextMessage textMessage = session.createTextMessage("this is message");
  2. message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10*1000);

10.4 按照指定频率重复发送

  1. TextMessage textMessage = session.createTextMessage("this is message");
  2. //设置延迟10秒发送
  3. message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10 * 1000);
  4. //设置重复次数
  5. message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);
  6. //设置每次重复消息的间隔时间
  7. message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 2 * 1000);
  8. producer.send(message);

10.5 Cron表达式定时发送

  1. TextMessage textMessage = session.createTextMessage("this is message body");
  2. //每隔5分鍾發送一次
  3. textMessage.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,"0 0/5 * * * ? ");

11. 使用selector过滤消息

消息发送

  1. MapMessage msg1 = session.createMapMessage();
  2. msg1.setString("name", "qiqi");
  3. msg1.setStringProperty("name", "qiqi");
  4. msg1.setString("age", "18");
  5. msg1.setIntProperty("age", 18);
  6. MapMessage msg2 = session.createMapMessage();
  7. msg2.setString("name", "lucy");
  8. msg2.setString("age", "18");
  9. msg2.setStringProperty("name", "lucy");
  10. msg2.setIntProperty("age", 18);
  11. MapMessage msg3 = session.createMapMessage();
  12. msg3.setString("name", "qianqian");
  13. msg3.setString("age", "17");
  14. msg3.setStringProperty("name", "qianqian");
  15. msg3.setIntProperty("age", 17);

消息过滤接收

  1. String selector1 = "age > 17 and name = 'lucy'";
  2. MessageConsumer consumer = session.createConsumer(queue,selector1);

12. 整合SpringBoot

pom.xml

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-activemq</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.messaginghub</groupId>
  7. <artifactId>pooled-jms</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>org.apache.commons</groupId>
  11. <artifactId>commons-pool2</artifactId>
  12. </dependency>

application.properties

  1. spring.activemq.broker-url=tcp://localhost:61616
  2. spring.activemq.user=admin
  3. spring.activemq.password=admin
  4. spring.jms.pub-sub-domain=true
  5. spring.activemq.pool.enabled=true
  6. spring.activemq.pool.max-connections=50

TestActiveMQController

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.web.bind.annotation.RequestMapping;
  3. import org.springframework.web.bind.annotation.RestController;
  4. import com.mashibing.activemq03.service.SenderService;
  5. @RestController
  6. public class MainController {
  7. @Autowired
  8. SenderService senderSrv;
  9. @RequestMapping("send")
  10. public String send() {
  11. senderSrv.send("springboot","hello~!");
  12. return "ok";
  13. }
  14. }

ActiveMQConfig

  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.jms.annotation.EnableJms;
  4. import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
  5. import org.springframework.jms.config.JmsListenerContainerFactory;
  6. import javax.jms.ConnectionFactory;
  7. @Configuration
  8. @EnableJms
  9. public class ActiveMQConfig {
  10. // topic模式的ListenerContainer
  11. @Bean
  12. public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
  13. DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
  14. bean.setPubSubDomain(true);
  15. bean.setConnectionFactory(activeMQConnectionFactory);
  16. return bean;
  17. }
  18. // queue模式的ListenerContainer
  19. @Bean
  20. public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
  21. DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
  22. bean.setConnectionFactory(activeMQConnectionFactory);
  23. return bean;
  24. }
  25. }

MyJmsListener

  1. import org.springframework.jms.annotation.JmsListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class MyJmsListener {
  5. @JmsListener(destination = "spring boot queue",containerFactory = "jmsListenerContainerQueue")
  6. public void rece(String msg) {
  7. System.out.println("收到消息:" + msg);
  8. }
  9. }

13. ReplyTo机制

image.png
producer:

  1. import org.apache.activemq.ActiveMQConnectionFactory;
  2. import org.apache.activemq.command.ActiveMQQueue;
  3. import javax.jms.*;
  4. public class producer {
  5. public static void main(String[] args) throws JMSException {
  6. //初始化链接
  7. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
  8. ActiveMQConnectionFactory.DEFAULT_USER,
  9. ActiveMQConnectionFactory.DEFAULT_PASSWORD,
  10. "tcp://localhost:61616"
  11. );
  12. //获取链接并启动
  13. Connection connection = activeMQConnectionFactory.createConnection();
  14. connection.start();
  15. //创建消息队列"userInfo"
  16. Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
  17. Queue userInfoQueue = session.createQueue("userInfo");
  18. //创建回复队列
  19. ActiveMQQueue userInfoReplyQueue = new ActiveMQQueue("userInfoReply");
  20. //创建消息
  21. TextMessage textMessage = session.createTextMessage("name=wangfan");
  22. textMessage.setJMSReplyTo(userInfoReplyQueue);
  23. //闯将消息生产这并将将消息队列中
  24. MessageProducer producer = session.createProducer(userInfoQueue);
  25. producer.send(textMessage);
  26. //创建消息消费者并监听回复队列中的消息
  27. session.createConsumer(userInfoReplyQueue).setMessageListener(new MessageListener() {
  28. @Override
  29. public void onMessage(Message message) {
  30. TextMessage textmsg = (TextMessage) message;
  31. try {
  32. System.out.println(textmsg.getText());
  33. } catch (JMSException e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. });
  38. }
  39. }

�Consumer:

  1. import org.apache.activemq.ActiveMQConnectionFactory;
  2. import javax.jms.*;
  3. public class Consumer {
  4. public static void main(String[] args) throws JMSException {
  5. //初始化链接
  6. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
  7. ActiveMQConnectionFactory.DEFAULT_USER,
  8. ActiveMQConnectionFactory.DEFAULT_PASSWORD,
  9. "tcp://localhost:61616"
  10. );
  11. //创建连接
  12. Connection connection = activeMQConnectionFactory.createConnection();
  13. connection.start();
  14. //创建会话
  15. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  16. //创建队列"userInfo"
  17. Queue userInfoQueue = session.createQueue("userInfo");
  18. //创建"userInfo"消息队列的消费者并设置监听器监听队列内的数据
  19. MessageConsumer userInfoConsumer = session.createConsumer(userInfoQueue);
  20. userInfoConsumer.setMessageListener(e->{
  21. try {
  22. //打印消息
  23. TextMessage textMessage = (TextMessage) e;
  24. System.out.println(textMessage.getText());
  25. //获取消息内的replyTo并向队列中写出已消费的响应消息
  26. Destination jmsReplyTo = textMessage.getJMSReplyTo();
  27. MessageProducer producer = session.createProducer(jmsReplyTo);
  28. producer.send(session.createTextMessage(textMessage.getText()+"已消费"));
  29. } catch (JMSException ex) {
  30. ex.printStackTrace();
  31. }
  32. });
  33. }
  34. }

13. Browser 获取即时队列的内容。

可以查看队列中的消息而不消费,没有订阅的功能

  1. import org.apache.activemq.ActiveMQConnectionFactory;
  2. import org.apache.activemq.command.ActiveMQQueue;
  3. import javax.jms.Connection;
  4. import javax.jms.JMSException;
  5. import javax.jms.QueueBrowser;
  6. import javax.jms.Session;
  7. import java.util.Enumeration;
  8. public class Browser {
  9. public static void main(String[] args) throws JMSException {
  10. //初始化链接
  11. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
  12. ActiveMQConnectionFactory.DEFAULT_USER,
  13. ActiveMQConnectionFactory.DEFAULT_PASSWORD,
  14. "tcp://localhost:61616"
  15. );
  16. //创建直连会话
  17. Connection connection = activeMQConnectionFactory.createConnection();
  18. connection.start();
  19. //创建session
  20. Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  21. //创建browser, 并获取指定队列内的数据, 然后打印
  22. QueueBrowser personBrowser = session.createBrowser(new ActiveMQQueue("person"));
  23. Enumeration enumeration = personBrowser.getEnumeration();
  24. while (enumeration.hasMoreElements()){
  25. System.out.println(enumeration.nextElement().toString());
  26. }
  27. }
  28. }

14. JMSCorrelationID

用于消息之间的关联,给人一种会话的感觉
http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html

15. QueueRequestor同步消息

可以发送同步消息
本质违背了mq的异步通讯原则
但是mq还是能够提供应用解耦、异构系统的特性
因为使用new ActiveMQConnectionFactory(“tcp://locahost:61616?jms.optimizeAcknowledge=false”);

发送消息后,会等待接收端的回复,如果收不到回复就会造成死等现象!而且该方法没有设置超时等待的功能

16. 批量确认

ActiveMQ缺省支持批量确认消息,批量确认可以提高系统性能
关闭方法:

  1. new ActiveMQConnectionFactory("tcp://locahost:61616?jms.optimizeAcknowledge=false");
  1. ((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(fase);
  1. ((ActiveMQConnection)connection).setOptimizeAcknowledge(true);

17. 消费缓冲与消息积压prefetchSize

消费者端,一般来说消费的越快越好,broker的积压越小越好。
但是考虑到事务性和客户端确认的情况,如果一个消费者一次获取到了很多消息却都不确认,这会造成事务上下文变大,broker端这种“半消费状态”的数据变多,所以ActiveMQ有一个prefetchSize参数来控制未确认情况下,最多可以预获取多少条记录。
Pre-fetch默认值

consumer type default value
queue 1000
queue browser 500
topic 32766
durable topic 1000

可以通过3中方式设置prefetchSize

创建连接时整体设置

  1. ActiveMQConnectionFactory connectio nFactory = new ActiveMQConnectionFactory(
  2. "admin",
  3. "admin",
  4. "tcp://localhost:5671?jms.prefetchPolicy.all=50"
  5. );

创建连接时对topic和queue单独设置

  1. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
  2. "admin",
  3. "admin",
  4. "tcp://localhost:5671?jms.prefetchPolicy.queuePrefetch=1&jms.prefetchPolicy.topicPrefetch=1"
  5. );

针对destination单独设置

  1. Destination topic = session.createTopic("user?consumer.prefetchSize=10");

注意:对destination设置prefetchsize后会覆盖连接时的设置值

16. 可追溯