ActiveMq典型的应用场景

消息队列-异步消息

消息队列-应用解耦

消息队列-流量消峰

MOM面向消息中间件

什么是MOM

  1. 面向消息的中间件,使用消息传送提供者来协调消息传输操作。MOM需要提供API的管理工具。
  2. 客户端调用api,把消息发送到消息传送提供者指定的目的地,在消息发送之后,客户端会继续执行其他工作,
  3. 并且在接收方收到这个消息之前,提供者一直保留该消息。

消息传递域

点对点p2p模式

发布订阅pub/sub模式

image.png

  1. 点对点模式
  2. 1.每个消息只能消费一次
  3. 2.消息的生产者和消费者之间没有时间上的相关性,无论消费者在生产者发送消息的时候是否处于运行状态,都可以提取消息
  4. Name:消息名称
  5. Number of Pending Message:等待被消费的消息
  6. Number of Consumers:有多少消息在队列中
  7. Message Dequeued:当前有多少消息被消费掉的消息条数
  8. 发布订阅模式
  9. 1.每个消息可以有多个消费者
  10. 2.生产者和消费者有存在时间相关性,订阅一个主题的消费者只能消费自它订阅之后发布的消息。
  11. JMS规范允许客户端创建持久订阅
  12. Api
  13. ConnectionFactory:连接工厂
  14. Connection:封装客户端与JMS provider 之间的一个虚拟的连接
  15. Session:生产和消费消息的一个单线程上下文,用于创建producerconsumermessagequeue
  16. Destination:消息发送或者消息接收的目的地
  17. MessageProducer/MessageConsumer:消息生产者/消息消费者

消息组成

消息头

包含消息识别信息和路由信息

消息体

TextMessage、MapMessage、ByteMessage、
StreamMessage(输入输出流)、ObjectMessage(可序列化对象)

属性

JMS的可靠性机制

  1. JMS消息只有被确认后,才会认为是被成功消费。
  2. 消息的消费包含三个阶段:
  3. 客户端接收消息
  4. 客户端处理消息
  5. 消息被确认
  6. 事务性会话
  7. Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
  8. Session createSession(boolean transacted, int acknowledgeMode)throws JMSException;
  9. Boolean.TRUE 表示这个会话是不是事务性会话
  10. 消费端在commit之后,表示该消息被确认过,在session.commit以后会自动签收,否则始终可以拿到这条消息
  11. 非事务性会话
  12. Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
  13. Boolean.FALSE 的时候第二个参数才会生效
  14. 在该模式下,消息何是被确认取决于创建会话时的应答模式
  15. Session.AUTO_ACKNOWLEDGE
  16. 表示自动确认,当客户端成功从revice方法返回以后,会自动确认消息,接收方无需session.commit()确认
  17. Session.CLIENT_ACKNOWLEDGE
  18. 客户确认模式,客户端设置Session.CLIENT_ACKNOWLEDGE,这个消息可以重复的消费,需要在消费端调用textMessage.acknowledge()
  19. 方法手动确认。但是需要注意的点是,这种模式下,创建多条消息,客户端在取得这些消息后,在某一条消息中调用textMessage.acknowledge()
  20. 方法,那么在这条消息之前的消息会被全部确认,剩余的消息不会被确认。
  21. Session.DUPS_OK_ACKNOWLEDGE
  22. 延迟确认模式

本地化事务

持久化策略

1.kahaDB 默认存储方式,基于文件磁盘存储
2.AMQ 基于文件存储
3.JDBC 基于数据库的存储
4.Memory 基于内存的存储

  1. <persistenceAdapter>
  2. <kahaDB directory="${activemq.data}/kahadb"/>
  3. </persistenceAdapter>
  4. 1.E:\develop\apache-activemq-5.15.13\data\kahadb
  5. <persistenceAdapter>
  6. <amqPersistenceApater directory="路径" maxFilelengt="文件大小 32默认"/>
  7. </persistenceAdapter>
  8. 2.写入速度快,容易恢复
  9. <jdbcPersistenceAdapter dataSource="#指定数据源" createTablesOnStartup="启动的时候是否需要创建表 默认false" />
  10. <bean id="mysqlDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
  11. <property name="dirverClassName" value="" />
  12. <property name="url" value="" />
  13. <property name="username" value="" />
  14. <property name="passowrd" value="" />
  15. </bean>
  16. ACTIEMQ_ACKS:存储持久化订阅的信息
  17. ACTIEMQ_LOCK:锁表(用来做集群的时候,实现master选举的表)
  18. ACTIEMQ_MISGS:消息表
  19. 解决数据库性能瓶颈
  20. JDBC Message store with activeMQ journal
  21. 1.引入了快速缓存机制,缓存到Log文件中
  22. 2.性能会比jdbc store要好
  23. 3.JDBC Message store with activeMQ journal 不能应用于 master/slave模式
  24. 3.基于数据库
  25. 4.Memory 基于内存的存储
  26. 5.8以后引入的持久化策略,通常用于集群配置
  27. <LevelDB />
  28. 5.LevelDB

ActiveMQ 网络连接

activeMQ 如果要实现高拓展性和高可用的要求的话,就需要用到网络连接模式。

NetworkConnector

主要用来配置 broker 与 broker 之间的通信连接

配置

  1. <networkConnectors>
  2. <networkConnector uristatic://(...,...) />
  3. </networkConnectors>

静态网络连接

消息丢失,怎么处理,配置消息回流
在destinationPolicy标签下配置标签
enableAudit=”false” 表示这个消息还可以进行消费,防止消息回流以后当作重复消息,不被处理。

  1. <policyEntry queue=">" enableAudit="false">
  2. <networkBridgeFilterFactory>
  3. <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
  4. </networkBridgeFilterFactory>
  5. </policyEntry>

动态网络连接

总结

P2P

1.如果session关闭时,有一些消息已经收到,但还未被签收,那么当消费者下次连接相同的队列时,消息还会被签收
2.队列可以长期保存消息,知道消息被消费者签收,消费者不必担心因为消息丢失而时刻与jms provider保持连接状态

pub/sub

1.订阅可以分非持久订阅和持久订阅
2.当所有的消息必须接收的时候,则用持久订阅(设置id开启持久订阅),反正,用非持久订阅。