简介

引自官网

Apache ActiveMQ™ is the most popular open source, multi-protocol, Java-based messaging server. It supports industry standard protocols so users get the benefits of client choices across a broad range of languages and platforms. Connectivity from C, C++, Python, .Net, and more is available. Integrate your multi-platform applications using the ubiquitous AMQP protocol. Exchange messages between your web applications using STOMP over websockets. Manage your IoT devices using MQTT. Support your existing JMS infrastructure and beyond. ActiveMQ offers the power and flexibility to support any messaging use-case.

Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;

由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。

完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。

特点:
1、支持多种语言编写客户端
2、对spring的支持,很容易和spring整合
3、支持多种通信协议,包括:OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP以及AMQP
4、支持AJAX

适用场景

对TPS要求比较低的系统,可以使用ActiveMQ来实现,一方面比较简单,能够快速上手开发,另一方面可控性也比较好,还有比较好的监控机制和界面

但是不适用于消息量巨大的场景。

安装

以windows环境安装为例

下载

从官网下载安装包, http://activemq.apache.org/download.html

image.png
截至本文编写时间,最新版本为5.16.0,点击apache-activemq-5.16.0-bin.zip 下载windows下的安装包
image.png

注意:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x

解压缩

image.png
其中:

bin目录:存放ActiveMQ的启动脚本activemq.bat,注意分32、64位

conf目录:存放配置文件,重点关注的是activemq.xml、jetty.xml、jetty-realm.properties。

data目录:ActiveMQ进行消息持久化存放的地方,默认采用的是kahadb,当然我们可以采用leveldb,或者采用JDBC存储到MySQL,或者干脆不使用持久化机制。

webapps目录:ActiveMQ自带的Jetty提供Web管控台

lib目录:ActiveMQ提供部分功能的JAR包

启动

进入bin目录,选择对应位数的目录,我的windows是64位的,打开win64文件夹
image.png
其中,双击activemq.bat可以快速启动ActiveMQ服务,双击InstallService则可以把ActiveMQ安装成Windows服务。
image.png
双击activemq.bat,输出日志启动成功
image.png

启动后,activeMQ会占用两个端口
一个是负责接收发送消息的tcp端口:61616
一个是基于web负责用户界面化管理的端口:8161

访问管理后台

Active提供了管理后台,访问地址http://localhost:8161/admin,默认用户名密码为admin/admin
登录用户名和密码可以在conf目录下修改jetty-realm.properties users.properties两个文件【需重启】
image.png
界面介绍:
home:当前欢迎页
queues:点对点模型列表
Topics:发布-订阅模型列表
subscribers:话题的订阅信息
Connections:客户端的连接
network:当前网络状况
scheduled:计划任务
send:测试发送消息,可以模拟发送两种形式的消息

查看点对点模型消息,目前是空荡荡的,因为没有生产者连接发送消息
image.png
点对点模型界面介绍:
Name:队列名称
Number Of Pending Messages:等待消费的消息 “这个是当前未出队列的数量。可以理解为总接收数-总出队列数”
Number Of Consumers:当前队列的消费者数量
Messages Enqueued:进入队列的消息 “进入队列的总数量,包括出队列的。 这个数量只增不减”
Messages Dequeued:出了队列的消息 “可以理解为是消费者消费掉的数量”
Views:查看详细内容操作
Operations:可选操作
image.png
发布-订阅模型界面介绍:
Name:主题的名字
Number Of Consumers:订阅者的数量
Messages Enqueued:发布的消息的总数
Messages Dequeued:被订阅的消息的总数(被消费)
Operations:可选操作

简单使用

搭建工程,引入如下两个依赖:

  1. <!-- JMS规范的jar依赖 -->
  2. <dependency>
  3. <groupId>javax.jms</groupId>
  4. <artifactId>javax.jms-api</artifactId>
  5. <version>2.0.1</version>
  6. </dependency>
  7. <!-- activeMQ对jms具体实现的jar依赖,我这里选用了对应版本5.16.0的jar包 -->
  8. <dependency>
  9. <groupId>org.apache.activemq</groupId>
  10. <artifactId>activemq-client</artifactId>
  11. <version>5.16.0</version>
  12. </dependency>

点对点模型测试

(1)编写点对点模型(又称队列模型)生产者

  1. public class ActiveMQSender {
  2. // ActiveMQ连接地址
  3. public static final String BROKER_URL = "tcp://127.0.0.1:61616";
  4. // 连接账密
  5. public static final String USERNAME = "admin";
  6. public static final String PASSWORD = "admin";
  7. //相当于一个数据库(其实是一个队列)
  8. public static final String DESTINATION = "myQueue";
  9. public static void main(String[] args) {
  10. // 连接工厂
  11. ConnectionFactory connectionFactory;
  12. // 连接
  13. Connection connection = null;
  14. // 会话,接受或者发送消息的线程
  15. Session session;
  16. // 消息的目的地
  17. Destination destination;
  18. // 消息生产者
  19. MessageProducer messageProducer;
  20. try {
  21. // 实例化工厂
  22. connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
  23. // 获取连接
  24. connection = connectionFactory.createConnection(USERNAME, PASSWORD);
  25. // 启动连接
  26. connection.start();
  27. // 创建session
  28. // 参数1:是否事务消息
  29. // 参数2:Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。异常也会确认消息,应该是在执行之前确认的
  30. // Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。可以在失败的
  31. // 时候不确认消息,不确认的话不会移出队列,一直存在,下次启动继续接受。接收消息的连接不断开,其他的消费者也不会接受(正常情况下队列模式不存在其他消费者)
  32. // Session.DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。
  33. session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  34. // 创建一个消息队列,即目的地
  35. destination = session.createQueue(DESTINATION);
  36. // 创建消息生产者
  37. messageProducer = session.createProducer(destination);
  38. // 发送消息
  39. sendMessage(session, messageProducer,"回家吃饭啦");
  40. // 提交
  41. session.commit();
  42. } catch (JMSException e) {
  43. // TODO Auto-generated catch block
  44. e.printStackTrace();
  45. }
  46. }
  47. /**
  48. * 发送消息
  49. * @param session 会话
  50. * @param messageProducer 生产者对象
  51. * @param messageText 消息内容
  52. * @throws JMSException
  53. */
  54. public static void sendMessage(Session session,MessageProducer messageProducer,String messageText) throws JMSException{
  55. TextMessage message = session.createTextMessage(messageText);
  56. //发出消息
  57. messageProducer.send(message);
  58. System.out.println("生产者发送消息:"+messageText);
  59. }
  60. }

(2)编写点对点模型(又称队列模型)的消费者

  1. public class ActiveMQReceiver {
  2. // ActiveMQ连接地址
  3. public static final String BROKER_URL = "tcp://127.0.0.1:61616";
  4. // 连接账密
  5. public static final String USERNAME = "admin";
  6. public static final String PASSWORD = "admin";
  7. //相当于一个数据库(其实是一个队列)
  8. public static final String DESTINATION = "myQueue";
  9. public static void main(String[] args) {
  10. // 连接工厂
  11. ConnectionFactory connectionFactory;
  12. // 连接
  13. Connection connection = null;
  14. // 会话,接受或者发送消息的线程
  15. Session session = null;
  16. // 消息的目的地
  17. Destination destination;
  18. // 消息消费者
  19. MessageConsumer messageConsumer = null;
  20. try {
  21. // 实例化工厂
  22. connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
  23. // 获取连接
  24. connection = connectionFactory.createConnection(USERNAME, PASSWORD);
  25. // 接收消息,需要将连接启动一下,才可以接收到消息
  26. connection.start();
  27. // 创建一个Session 第一个参数:是否是事务消息 第二个参数:消息确认机制(自动确认还是手动确认)
  28. session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
  29. // 有了session之后,就可以创建消息,目的地,生产者和消费者
  30. destination = session.createQueue(DESTINATION);
  31. // 消费者
  32. messageConsumer = session.createConsumer(destination);
  33. // 循环接收消息
  34. while (true){
  35. // 接收消息 有返回值,是阻塞的
  36. Message message = messageConsumer.receive();
  37. // 判断消息类型
  38. if(message instanceof TextMessage){
  39. String text = ((TextMessage) message).getText();
  40. System.out.println("生产者发来消息:"+text);
  41. }
  42. }
  43. } catch (JMSException e) {
  44. // TODO Auto-generated catch block
  45. e.printStackTrace();
  46. }
  47. }
  48. }

(3)运行生产者实例ActiveMQSender,连接MQ并发送消息:
image.png

ActiveMQ管理后台,点对点模型页面出现一个队列,名称就叫myQueue,待消费消息1条,总入队消息1条
image.png
(4)启动消费者实例ActiveMQReceiver,连接MQ并消费消息:
image.png
ActiveMQ管理后台,点对点模型页面,myQueue队列待消费消息0条,总入队消息1条,总出队消息1条,消费者1只
image.png
选项介绍:
Views — Browse:查看每一条消息明细
Views — Active Consumers:活动的消费者
Views — Active Producers:活动的生产者

Operations — Send To:手工发送消息
Operations — Purge: 清除队列中的消息
Operations — Delete:删除这个队列

消息生产消费都正常,测试通过

发布-订阅模型测试

(1)编写发布-订阅模型(又称主题模型)发布者

  1. public class ActiveMQPublisher {
  2. public static final String BROKER_URL = "tcp://127.0.0.1:61616";
  3. public static final String USERNAME = "admin";
  4. public static final String PASSWORD = "admin";
  5. //相当于一个数据库,主题名字
  6. public static final String DESTINATION = "myTopic";
  7. public static void main(String[] args) {
  8. // 连接工厂
  9. ConnectionFactory connectionFactory;
  10. // 连接
  11. Connection connection = null;
  12. // 会话,接受或者发送消息的线程
  13. Session session=null;
  14. // 消息的目的地
  15. Destination destination=null;
  16. // 消息生产者
  17. MessageProducer messageProducer=null;
  18. try {
  19. // 实例化工厂
  20. connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
  21. // 获取连接
  22. connection = connectionFactory.createConnection(USERNAME, PASSWORD);
  23. // 启动连接
  24. connection.start();
  25. // 创建session
  26. // 参数1:是否事务消息
  27. // 参数2:Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。异常也会确认消息,应该是在执行之前确认的
  28. // Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。可以在失败的
  29. // 时候不确认消息,不确认的话不会移出队列,一直存在,下次启动继续接受。接收消息的连接不断开,其他的消费者也不会接受(正常情况下队列模式不存在其他消费者)
  30. // DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。
  31. session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  32. // 创建一个消息队列,即目的地
  33. destination = session.createTopic(DESTINATION);
  34. // 创建消息生产者
  35. messageProducer = session.createProducer(destination);
  36. // 持续输入消息并发布
  37. Scanner scanner = new Scanner(System.in);
  38. String messageText = "";
  39. do{
  40. System.out.println("请发布内容:");
  41. messageText= scanner.nextLine();
  42. // 发送消息,没有返回值,是非阻塞的
  43. sendMessage(session, messageProducer,messageText);
  44. // 提交事务
  45. session.commit();
  46. }while(!messageText.equals("拜拜")); // 如果输入拜拜则直接退出
  47. } catch (JMSException e) {
  48. // TODO Auto-generated catch block
  49. e.printStackTrace();
  50. }finally {
  51. try {
  52. if(messageProducer != null){
  53. messageProducer.close();
  54. }
  55. if(session != null){
  56. session.close();
  57. }
  58. if(connection != null){
  59. connection.close();
  60. }
  61. } catch (JMSException e) {
  62. e.printStackTrace();
  63. }
  64. }
  65. }
  66. /**
  67. * 发送消息
  68. * @param session 会话
  69. * @param messageProducer 生产者对象
  70. * @param messageText 消息内容
  71. * @throws JMSException
  72. */
  73. public static void sendMessage(Session session,MessageProducer messageProducer,String messageText) throws JMSException{
  74. TextMessage message = session.createTextMessage(messageText);
  75. //发出消息
  76. messageProducer.send(message);
  77. System.out.println("生产者发送消息:"+messageText);
  78. }
  79. }

(2)编写发布-订阅模型(又称主题模型)订阅者

  1. public class ActiveMQSubcriber {
  2. public static final String BROKER_URL = "tcp://127.0.0.1:61616";
  3. public static final String USERNAME = "admin";
  4. public static final String PASSWORD = "admin";
  5. //相当于一个数据库,主题名字
  6. public static final String DESTINATION = "myTopic";
  7. public static void main(String[] args) {
  8. // 创建多个订阅者订阅同一个主题
  9. for(int i=0;i<5;i++){
  10. new TopicSubcriber(i+"号", BROKER_URL, USERNAME, PASSWORD,DESTINATION).receiveMessage();
  11. }
  12. }
  13. static class TopicSubcriber{
  14. // 连接工厂
  15. private ConnectionFactory connectionFactory;
  16. // 连接
  17. private Connection connection = null;
  18. // 会话,接受或者发送消息的线程
  19. private Session session = null;
  20. // 消息的目的地
  21. private Destination destination;
  22. // 消息消费者
  23. private MessageConsumer messageConsumer = null;
  24. private String brokerUrl,userName,password,destinationName;
  25. // 消费者名称
  26. private String consumerName;
  27. TopicSubcriber(String consumerName,String brokerUrl,String userName,String password,String destinationName){
  28. this.brokerUrl = brokerUrl;
  29. this.userName = userName;
  30. this.password = password;
  31. this.destinationName = destinationName;
  32. this.consumerName = consumerName;
  33. }
  34. public void receiveMessage(){
  35. try {
  36. // 实例化工厂
  37. connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);
  38. // 获取连接
  39. connection = connectionFactory.createConnection(this.userName,this.password);
  40. // 接收消息,需要将连接启动一下,才可以接收到消息
  41. connection.start();
  42. // 创建一个Session 第一个参数:是否是事务消息 第二个参数:消息确认机制(自动确认还是手动确认)
  43. session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
  44. // 有了session之后,就可以创建消息,目的地,生产者和消费者
  45. destination = session.createTopic(this.destinationName);
  46. // 消费者
  47. messageConsumer = session.createConsumer(destination);
  48. System.out.println("消费者"+consumerName+"已订阅");
  49. // 设置监听器,持续监听主题是否有新消息发布
  50. messageConsumer.setMessageListener(new MessageListener() {
  51. public void onMessage(Message message) {
  52. // 判断消息类型
  53. if(message instanceof TextMessage){
  54. String text = null;
  55. try {
  56. text = ((TextMessage) message).getText();
  57. System.out.println("我是消费者"+consumerName+",收到生产者发来消息:"+text);
  58. } catch (JMSException e) {
  59. e.printStackTrace();
  60. }
  61. }
  62. }
  63. });
  64. } catch (JMSException e) {
  65. // TODO Auto-generated catch block
  66. e.printStackTrace();
  67. }
  68. }
  69. }
  70. }

(3)先启动订阅者实例ActiveMQSubcriber,连接MQ并订阅主题:
image.png
ActiveMQ管理后台,发布-订阅模型页面出现一个主题,名称就叫myTopic,总入队消息0条,订阅者5个
image.png

(4)运行发布者实例ActiveMQPublisher,连接MQ并发送消息:
image.png
此时myTopic主题中多了一个消息,同时被消费了五次
image.png
订阅者全部接收到消息
image.png

Active持久化消息与非持久化消息

Active支持持久化传输和非持久化传输,默认采用持久化传输

采用持久传输时,传输的消息会保存到磁盘中(messages are persisted to disk/database),即“存储转发”方式。先把消息存储到磁盘中,然后再将消息“转发”给订阅者。

采用非持久传输时,发送的消息则是存放在内存中,不会存放到磁盘

采用持久传输时,当Borker宕机 恢复后,消息还在。采用非持久传输,Borker宕机重启后,消息丢失。

ActiveMQ的消息持久化机制有KahaDB,JDBC,AMQ,和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的,默认采用的是KahaDB持久化方式

具体配置可在conf/activemq.xml文件进行修改,后续会作详细说明

KahaDB方式

KahaDB是基于日志文件的持久性数据库,是自ActiveMQ 5.4以来的默认存储机制,可用于任何场景,提高了性能和恢能力,它是基于文件的本地数据库存储形式

KahaDb恢复时间远远小于其前身AMQ并且使用更少的数据文件,所以可以完全代替AMQ。

配置
配置文件位于conf/activemq.xml

配置使用kahaDB持久化方式:**

  1. <broker brokerName="broker">
  2. <persistenceAdapter>
  3. <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="32mb"/>
  4. </persistenceAdapter>
  5. </broker>

相关配置属性:

属性 默认值 描述
directory activemq-data 用于存储消息存储数据和日志文件的目录的路径。
journalDiskSyncInterval 1000 何时执行磁盘同步的时间间隔(ms) journalDiskSyncStrategy=periodic。只有在自上次磁盘同步或日志转到新日志文件后对日志进行了写入时,才会执行同步。
journalDiskSyncStrategy always 从ActiveMQ 5.14.0:此属性配置磁盘同步策略。可用的同步策略列表(按安全性降低和性能提高的顺序):always确保每个日志写入后跟一个磁盘同步(JMS持久性要求)。这是最安全的选项,但也是最慢的选项,因为它需要在每次写入消息后进行同步。这相当于不推荐使用的属性 enableJournalDiskSyncs=true。periodic磁盘将以设定的时间间隔(如果发生写入)而不是在每次日志写入之后同步,这将减少磁盘上的负载并且应该提高吞吐量。滚动到新的日志文件时,磁盘也将同步。默认间隔为1秒。默认间隔提供非常好的性能,同时比更安全 never磁盘同步,因为数据丢失的最大值限制为1秒。请参阅journalDiskSyncInterval更改磁盘同步的频率。never永远不会显式调用同步,并且操作系统将刷新到磁盘。这相当于设置deprecated属性enableJournalDiskSyncs=false。这是最快的选择,但是最不安全,因为无法确保何时将数据刷新到磁盘。因此,代理失败时可能会发生消息丢失。
journalMaxFileLength 32mb 提示设置消息数据日志的最大大小。


数据目录
如上述实例使用的就是KahaDB持久化存储方式image.png
消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址
image.png
db.data包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件**,本质上是B-Tree(B树),使用B-Tree作为索引指向db-.log里面存储的消息

db-.log**:(主要存数据)KahaDB存储消息到预定义大小的数据记录文件**,文件命名为db-.log,当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增,它随之消息数量的增多,如每32M一个文件,文件名按照数字进行编号,如sb-1.log……当不再有引用到数据文件中的任何消息时,文件会被删除或归档。

db.redo**:用来进行消息恢复**,如果KahaDB消息存储在强制退出后启动,用于恢复BTree索引。

lock**:表示当前获得KahaDB读写权限的broker**。

JDBC方式

使用JDBC持久化方式,数据库会创建3个表:activemq_msgs,activemq_acks和activemq_lock。

activemq_msgs用于存储消息,Queue和Topic都存储在这个表中。
activemq_acks用于存储订阅关系,如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存
activemq_lock在集群环境中才有用,这个表用于记录哪个Broker是当前的Master Broker。

配置

  1. <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
  2. <persistenceAdapter>
  3. <jdbcPersistenceAdapter useDatabaseLock="false" dataSource="#mysql-ds" createTablesOnStartup="true"/>
  4. </persistenceAdapter>
  5. </broker>
  6. <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
  7. <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
  8. <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
  9. <property name="username" value="activemq"/>
  10. <property name="password" value="activemq"/>
  11. <property name="maxActive" value="200"/>
  12. <property name="poolPreparedStatements" value="true"/>
  13. </bean>

相关配置属性:

属性 默认值 描述
dataSource #mysql-ds 指定持久化数据库的bean配置id
createTablesOnStartup true 是否在启动的时候创建数据表,一般是第一次启动的时候设置为true,之后改成false。
useDatabaseLock true 如果不设置为false,本机调试会导致Failed to acquire lock

引入数据库驱动

由于activeMQ消息服务器,没有自带mysql数据库的驱动程序。我们需要手动将mysql驱动添加到消息服务器

将以下三个驱动拷贝到apache-activemq-5.16.0\lib\目录下:
数据库连接池:commons-dbcp-1.4.jarcommons-pool-1.6.jar
mysql JDBC驱动包:mysql-connector-java-5.1.35.jar
image.png

新建空数据库activemq
image.png

启动ActiveMQ
数据库多了三张表
image.png

启动生产者并生产消息
image.png
消息队列多了一条消息
image.png
数据库表多了一条数据
image.png
ID:自增的数据库主键
CONTAINER:消息的Destination
MSGID_PROD:消息发送者客户端的主键
MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
MSG:消息本体的Java序列化对象的二进制数据
PRIORITY:优先级,从0-9,数值越大优先级越高

启动消费者消费消息
image.png
image.png
消费完成,数据库表中对应消息被删除
image.png

AMQ方式

性能高于JDBC,写入消息时,会将消息写入日志文件,由于是顺序追加写,性能很高。为了提升性能,创建消息主键索引,并且提供缓存机制,进一步提升性能。每个日志文件的大小都是有限制的(默认32m,可自行配置)。
当超过这个大小,系统会重新建立一个文件。当所有的消息都消费完成,系统会删除这个文件或者归档(取决于配置)。
主要的缺点是AMQ Message会为每一个Destination创建一个索引,如果使用了大量的Queue,索引文件的大小会占用很多磁盘空间。
而且由于索引巨大,一旦Broker崩溃,重建索引的速度会非常慢。目前已经弃用

配置使用

  1. <persistenceAdapter>
  2. <amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/>
  3. </persistenceAdapter>

启动报错:

以元素 ‘amqPersistenceAdapter’ 开头的无效内容。 应以 ‘{“http://activemq.apache.org/schema/core":**jdbcPersistenceAdapter**, “http://activemq.apache.org/schema/core":**journalPersistenceAdapter**, “http://activemq.apache.org/schema/core":**kahaDB**, “http://activemq.apache.org/schema/core":**levelDB**, “http://activemq.apache.org/schema/core":**mKahaDB**, “http://activemq.apache.org/schema/core":**memoryPersistenceAdapter**, “http://activemq.apache.org/schema/core":**replicatedLevelDB**, WC[##other:”http://activemq.apache.org/schema/core"]}‘ 之一开头。

LevelDB方式

LevelDB持久性适配器使用LevelDB作为高性能的消息存储。
它是一个基于文件的存储库,它使用了Google的LevelDB,将索引保存到包含消息的日志文件中。
它经过优化,提供了比KahaDB更快的持久性。它类似于KahahDB,但是它没有使用自定义的b树实现来索引写前日志,而是使用基于LevelDB的索引

配置使用

  1. <broker brokerName="broker">
  2. <persistenceAdapter>
  3. <levelDB directory="${activemq.data}/levelDB"/>
  4. </persistenceAdapter>
  5. </broker>

Active主题持久性订阅

我们知道,基于队列的消息模型(点对点模型),消费者可以消费在它上线之前和之后的消息。而主题模型(发布-订阅模型)的消费者,正常情况下(非持久性订阅)只能消费自他上线后由生产者发布的消息。
**
但是JMS协议允许创建持久性订阅消费者,这样当Broker发送消息给订阅者时,如果订阅者处于 inactive 状态:持久订阅者可以收到消息,而非持久订阅者则收不到消息。【当持久订阅者处于 inactive 状态时,Broker会为持久订阅者保存消息】

设置ClientID

  1. // activemq区分消费者,是通过clientID和订户名称来区分的。
  2. // 使用相同的“clientID”,则认为是同一个消费者
  3. // 创建connection
  4. connection = connectionFactory.createConnection();
  5. connection.setClientID("bbb"); //持久订阅需要设置这个。
  6. connection.start();

创建持久性订阅者

  1. //MessageConsumer consumer = session.createConsumer(DESTINATION); //普通订阅
  2. // 第一个参数为主题,第二个参数为订阅者名称
  3. MessageConsumer consumer = session.createDurableSubscriber(DESTINATION,consumerName); //持久订阅

设置生产者发送持久模式消息

  1. producer.setDeliveryMode(DeliveryMode.PERSISTENT);

消费测试

先启动生产者,并发送消息
image.png

主题内仅有一条待消费消息,且有5个订阅者
image.png

点击Active Subscribers选项,查看订阅者状态,全部未上线
image.png
启动持久化订阅消费者
image.png
如果开启了JDBC持久化模式,可以查看数据库的activemq_acks表,有五条数据
image.png