ActiveMq典型的应用场景
消息队列-异步消息
消息队列-应用解耦
消息队列-流量消峰
MOM面向消息中间件
什么是MOM
面向消息的中间件,使用消息传送提供者来协调消息传输操作。MOM需要提供API的管理工具。
客户端调用api,把消息发送到消息传送提供者指定的目的地,在消息发送之后,客户端会继续执行其他工作,
并且在接收方收到这个消息之前,提供者一直保留该消息。
消息传递域
点对点p2p模式
发布订阅pub/sub模式
点对点模式
1.每个消息只能消费一次
2.消息的生产者和消费者之间没有时间上的相关性,无论消费者在生产者发送消息的时候是否处于运行状态,都可以提取消息
Name:消息名称
Number of Pending Message:等待被消费的消息
Number of Consumers:有多少消息在队列中
Message Dequeued:当前有多少消息被消费掉的消息条数
发布订阅模式
1.每个消息可以有多个消费者
2.生产者和消费者有存在时间相关性,订阅一个主题的消费者只能消费自它订阅之后发布的消息。
JMS规范允许客户端创建持久订阅
Api
ConnectionFactory:连接工厂
Connection:封装客户端与JMS provider 之间的一个虚拟的连接
Session:生产和消费消息的一个单线程上下文,用于创建producer、consumer、message、queue
Destination:消息发送或者消息接收的目的地
MessageProducer/MessageConsumer:消息生产者/消息消费者
消息组成
消息头
消息体
TextMessage、MapMessage、ByteMessage、
StreamMessage(输入输出流)、ObjectMessage(可序列化对象)
属性
JMS的可靠性机制
JMS消息只有被确认后,才会认为是被成功消费。
消息的消费包含三个阶段:
客户端接收消息
客户端处理消息
消息被确认
事务性会话
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Session createSession(boolean transacted, int acknowledgeMode)throws JMSException;
Boolean.TRUE 表示这个会话是不是事务性会话
消费端在commit之后,表示该消息被确认过,在session.commit以后会自动签收,否则始终可以拿到这条消息
非事务性会话
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Boolean.FALSE 的时候第二个参数才会生效
在该模式下,消息何是被确认取决于创建会话时的应答模式
Session.AUTO_ACKNOWLEDGE
表示自动确认,当客户端成功从revice方法返回以后,会自动确认消息,接收方无需session.commit()确认
Session.CLIENT_ACKNOWLEDGE
客户确认模式,客户端设置Session.CLIENT_ACKNOWLEDGE,这个消息可以重复的消费,需要在消费端调用textMessage.acknowledge()
方法手动确认。但是需要注意的点是,这种模式下,创建多条消息,客户端在取得这些消息后,在某一条消息中调用textMessage.acknowledge()
方法,那么在这条消息之前的消息会被全部确认,剩余的消息不会被确认。
Session.DUPS_OK_ACKNOWLEDGE
延迟确认模式
本地化事务
持久化策略
1.kahaDB 默认存储方式,基于文件磁盘存储
2.AMQ 基于文件存储
3.JDBC 基于数据库的存储
4.Memory 基于内存的存储
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
1.E:\develop\apache-activemq-5.15.13\data\kahadb
<persistenceAdapter>
<amqPersistenceApater directory="路径" maxFilelengt="文件大小 32默认"/>
</persistenceAdapter>
2.写入速度快,容易恢复
<jdbcPersistenceAdapter dataSource="#指定数据源" createTablesOnStartup="启动的时候是否需要创建表 默认false" />
<bean id="mysqlDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="dirverClassName" value="" />
<property name="url" value="" />
<property name="username" value="" />
<property name="passowrd" value="" />
</bean>
ACTIEMQ_ACKS:存储持久化订阅的信息
ACTIEMQ_LOCK:锁表(用来做集群的时候,实现master选举的表)
ACTIEMQ_MISGS:消息表
解决数据库性能瓶颈
JDBC Message store with activeMQ journal
1.引入了快速缓存机制,缓存到Log文件中
2.性能会比jdbc store要好
3.JDBC Message store with activeMQ journal 不能应用于 master/slave模式
3.基于数据库
4.Memory 基于内存的存储
5.8以后引入的持久化策略,通常用于集群配置
<LevelDB />
5.LevelDB
ActiveMQ 网络连接
activeMQ 如果要实现高拓展性和高可用的要求的话,就需要用到网络连接模式。
NetworkConnector
主要用来配置 broker 与 broker 之间的通信连接
配置
<networkConnectors>
<networkConnector uristatic://(...,...) />
</networkConnectors>
静态网络连接
消息丢失,怎么处理,配置消息回流
在destinationPolicy标签下配置
enableAudit=”false” 表示这个消息还可以进行消费,防止消息回流以后当作重复消息,不被处理。
<policyEntry queue=">" enableAudit="false">
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
</networkBridgeFilterFactory>
</policyEntry>
动态网络连接
总结
P2P
1.如果session关闭时,有一些消息已经收到,但还未被签收,那么当消费者下次连接相同的队列时,消息还会被签收
2.队列可以长期保存消息,知道消息被消费者签收,消费者不必担心因为消息丢失而时刻与jms provider保持连接状态
pub/sub
1.订阅可以分非持久订阅和持久订阅
2.当所有的消息必须接收的时候,则用持久订阅(设置id开启持久订阅),反正,用非持久订阅。