1. ActiveMQ简介

  1. ActiveMQApache软件基金下的一个开源软件,它遵循JMS规范(Java Message Service),是消息驱动中间件软件(MOM)。它为企业消息传递提供高可用,出色性能,可扩展,稳定和安全保障。ActiveMQ使用Apache许可协议。因此,任何人都可以使用和修改它而不必反馈任何改变。这对于商业上将ActiveMQ用在重要用途的人尤为关键。MOM的工作是在分布式的各应用之间调度事件和消息,使之到达指定的接收者。所以高可用,高性能,高可扩展性尤为关键。

ActiveMQ优势

  1. 1.OpenJMSJbossMQ等开源jms provider相比,ActiveMQApache的支持,持续发展的优势明显。<br /> 2.消息处理速度很快<br /> 3.提高系统资源的利用率,主要是任务的派发不是24小时平均的,而是高峰时期任务量很多,比如11000多个,有的时候很低,比如十几秒钟才来一个。应用服务通过JMS队列一个一个的取任务,做完一个再领一个,使系统资源的运用趋于平均。比如ActiveMQ在赛扬(2.40GHz)机器上能够达到2000/s,消息大小为1-2k。好一些的服务器可以达到2万以上/秒。

2.ActiveMQ安装

  1. ActiveMQlinux服务上安装操作如下:<br /> 1.在官网下载activemq安装文件。地址:[http://activemq.apache.org/download.html](http://activemq.apache.org/download.html)<br /> 2.上传下载的tar.gz安装文件到linux服务器上,并解压到指定目录:如 tar -xf apache-activemq-5.15.2-bin.tar.gz<br /> 3.运行activemq,进入到解压的 apache-activemq-5.15.2/bin目录,执行命令:activemq start<br /> 4.开放端口8161,61616,保证端口可访问。

3.ActiveMQ类别及开发流程

1)、Point-to-Point (点对点)消息模式开发流程
1、生产者(producer)开发流程:
1.1 创建Connection: 根据url,user和password创建一个jms Connection。
1.2 创建Session: 在connection的基础上创建一个session,同时设置是否支持事务和ACKNOWLEDGE标识。
1.3 创建Destination对象: 需指定其对应的主题(subject)名称,producer和consumer将根据subject来发送/接收对应的消息。
1.4 创建MessageProducer: 根据Destination创建MessageProducer对象,同时设置其持久模式。
1.5 发送消息到队列(Queue): 封装Message消息,使用MessageProducer的send方法将消息发送出去。
2、消费者(consumer)开发流程:
2.1 实现MessageListener接口: 消费者类必须实现MessageListener接口,然后在onMessage()方法中监听消息的到达并处理。
2.2 创建Connection: 根据url,user和password创建一个jms Connection,如果是durable模式,还需要给connection设置一个clientId。
2.3 创建Session和Destination: 与ProducerTool.java中的流程类似,不再赘述。
2.4 创建replyProducer【可选】:可以用来将消息处理结果发送给producer。
2.5 创建MessageConsumer: 根据Destination创建MessageConsumer对象。
2.6 消费message: 在onMessage()方法中接收producer发送过来的消息进行处理,并可以通过replyProducer反馈信息给producer
2)、Publisher/Subscriber(发布/订阅者)消息模式开发流程
1、订阅者(Subscriber)开发流程:
1.1 实现MessageListener接口: 在onMessage()方法中监听发布者发出的消息队列,并做相应处理。
1.2 创建Connection: 根据url,user和password创建一个jms Connection。
1.3 创建Session: 在connection的基础上创建一个session,同时设置是否支持事务和ACKNOWLEDGE标识。
1.4 创建Topic: 创建2个Topic, topictest.messages用于接收发布者发出的消息,topictest.control 用于向发布者发送消息,实现双方的交互。
1.5 创建consumer和producer对象:根据topictest.messages创建consumer,根据topictest.control创建 producer。
1.6 接收处理消息:在onMessage()方法中,对收到的消息进行处理,可直接简单在本地显示消息,或者根据消息内容不同处理对应的业务逻辑(比如:数据库更新、文件操作等等),并且可以使用producer对象将处理结果返回给发布者。
2、发布者(Publisher)开发流程:
2.1 实现MessageListener接口:在onMessage()方法中接收订阅者的反馈消息。
2.2 创建Connection: 根据url,user和password创建一个jms Connection。
2.3 创建Session: 在connection的基础上创建一个session,同时设置是否支持事务和ACKNOWLEDGE标识。
2.4 创建Topic: 创建2个Topic,topictest.messages用于向订阅者发布消息,topictest.control用于接 收订阅者反馈的消息。这2个topic与订阅者开发流程中的topic是一一对应的。
2.5 创建consumer和producer对象: 根据topictest.messages创建publisher; 根据topictest.control 创建consumer,同时监听订阅者反馈的消息。
2.6 给所有订阅者发送消息,并接收反馈消息。 注:可同时运行多个订阅者测试查看此模式效果 。

3)、延迟消息发送
有时候不希望消息马上被broker投递出去,而是想要消息停留一段时间以后发给消费者,或者想让消息每隔一定时间投递一次,一共投递指定的次数。类似这种需求,ActiveMQ提供了一种broker端消息定时调度机制。开发者使用时只需要把几个描述消息定时调度方式的参数作为属性添加到消息,broker端的调度器就会按照定义的行为去处理消息。

  1. <dependency>
  2. <groupId>org.apache.activemq</groupId>
  3. <artifactId>activemq-all</artifactId>
  4. </dependency>

在Git上面下载:https://github.com/wuya11/SpringinteJMSonActiveMQ

4. Spring集成JMS链接ActiveMQ

image.png

image.png

image.png

5. 集群

实现高可用,排除单点故障引起服务中断
实现负载均衡,提升效率
image.png