原文地址:https://www.aliyun.com/jiaocheng/791245.html

  1. AcitveMQ是作为一种消息存储和分发组件,涉及到clientbroker端数据交互的方方面面,它不仅要担保消息的存储安全性,还要提供额外的手段来确保消息的分发是可靠的。

ActiveMQ消息传送机制

  1. Producer客户端使用来发送消息的, Consumer客户端用来消费消息;它们的协同中心就是ActiveMQ broker,broker也是让producerconsumer调用过程解耦的工具,最终实现了异步RPC/数据交换的功能。随着ActiveMQ的不断发展,支持了越来越多的特性,也解决开发者在各种场景下使用ActiveMQ的需求。比如producer支持异步调用;使用flow control机制让broker协同consumer的消费速率;consumer端可以使用prefetchACK来最大化消息消费的速率;提供"重发策略"等来提高消息的安全性等。在此我们不详细介绍。
  2. 一条消息的生命周期如下:<br />![](http://aliyunzixunbucket.oss-cn-beijing.aliyuncs.com/jpg/4597c9e24a859526893df2d2d680abab.jpg?x-oss-process=image/resize,p_100/auto-orient,1/quality,q_90/format,jpg/watermark,image_eXVuY2VzaGk=,t_100,g_se,x_0,y_0#width=)<br /> <br /> 图片中简单的描述了一条消息的生命周期,不过在不同的架构环境中,message的流动行可能更加复杂.将在稍后有关broker的架构中详解..一条消息从producer端发出之后,一旦被broker正确保存,那么它将会被consumer消费,然后ACK,broker端才会删除;不过当消息过期或者存储设备溢出时,也会终结它。<br />![](http://aliyunzixunbucket.oss-cn-beijing.aliyuncs.com/jpg/27ef9811300eab29811ab26477a96f55.jpg?x-oss-process=image/resize,p_100/auto-orient,1/quality,q_90/format,jpg/watermark,image_eXVuY2VzaGk=,t_100,g_se,x_0,y_0#width=)
  3. 这是一张很复杂,而且有些凌乱的图片;这张图片中简单的描述了:1)producer端如何发送消息 2) consumer端如何消费消息 3) broker端如何调度。如果用文字来描述图示中的概念,恐怕一言难尽。图示中,提及到prefetchAck,以及消息同步、异步发送的基本逻辑;这对你了解下文中的ACK机制将有很大的帮助。

optimizeACK

  1. "可优化的ACK",这是ActiveMQ对于consumer在消息消费时,对消息ACK的优化选项,也是consumer端最重要的优化参数之一,你可以通过如下方式开启:
  2. 1) brokerUrl中增加如下查询字符串:
  1. String brokerUrl = "tcp://localhost:61616?" +
  2. "jms.optimizeAcknowledge=true" +
  3. "&jms.optimizeAcknowledgeTimeOut=30000" +
  4. "&jms.redeliveryPolicy.maximumRedeliveries=6";
  5. ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
  1. 2) destinationUri中,增加如下查询字符串:
  1. String queueName = "test-queue?customer.prefetchSize";
  2. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  3. Destination queue = session.createQueue(queueName);
  1. 我们需要在brokerUrl指定optimizeACK选项,在destinationUri中指定prefetchSize(预获取)选项,其中brokerUrl参数选项是全局的,即当前factory下所有的connection/session/consumer都会默认使用这些值;而destinationUri中的选项,只会在使用此destinationconsumer实例中有效;如果同时指定,brokerUrl中的参数选项值将会被覆盖。optimizeAck表示是否开启“优化ACK”,只有在为true的情况下,prefetchSize(下文中将会简写成prefetch)以及optimizeAcknowledgeTimeout参数才会有意义。此处需要注意"optimizeAcknowledgeTimeout"选项只能在brokerUrl中配置。
  2. prefetch值建议在destinationUri中指定,因为在brokerUrl中指定比较繁琐;在brokerUrl中,queuePrefetchSizetopicPrefetchSize都需要单独设定:"&jms.prefetchPolicy.queuePrefetch=12&jms.prefetchPolicy.topicPrefetch=12"等来逐个指定。
  3. 如果prefetchACKtrue,那么prefetch必须大于0;当prefetchACKfalse时,你可以指定prefetch0以及任意大小的正数。不过,当prefetch=0是,表示consumer将使用PULL(拉取)的方式从broker端获取消息,broker端将不会主动push消息给client端,直到client端发送PullCommand时;当prefetch>0时,就开启了broker push模式,此后只要当client端消费且ACK了一定的消息之后,会立即pushclient端多条消息。
  4. consumer端使用receive()方法同步获取消息时,prefetch可以为0和任意正值;当prefetch=0时,那么receive()方法将会首先发送一个PULL指令并阻塞,直到broker端返回消息为止,这也意味着消息只能逐个获取(类似于Request<->Response),这也是ActivemqPULL消息模式;当prefetch > 0时,broker端将会批量pushclient 一定数量的消息(<= prefetch),client端会把这些消息(unconsumedMessage)放入到本地的队列中,只要此队列有消息,那么receive方法将会立即返回,当一定量的消息ACK之后,broker端会继续批量push消息给client端。
  5. consumer端使用MessageListener异步获取消息时,这就需要开发设定的prefetch值必须 >=1,即至少为1;在异步消费消息模式中,设定prefetch=0,是相悖的,也将获得一个Exception
  6. 此外,我们还可以brokerUrl中配置“redelivery”策略,比如当一条消息处理异常时,broker端可以重发的最大次数;和下文中提到REDELIVERED_ACK_TYPE互相协同。当消息需要broker端重发时,consumer会首先在本地的“deliveredMessage队列”(Consumer已经接收但还未确认的消息队列)删除它,然后向broker发送“REDELIVERED_ACK_TYPE”类型的确认指令,broker将会把指令中指定的消息重新添加到pendingQueue(亟待发送给consumer的消息队列)中,直到合适的时机,再次pushclient
  7. 到目前为止,或许你知道了optimizeACKprefeth的大概意义,不过我们可能还会有些疑惑!!optimizeACKprefetch配合,将会达成一个高效的消息消费模型:批量获取消息,并“延迟”确认(ACK)。prefetch表达了“批量获取”消息的语义,broker端主动的批量push多条消息给client端,总比client多次发送PULL指令然后broker返回一条消息的方式要优秀很多,它不仅减少了client端在获取消息时阻塞的次数和阻塞的时间,还能够大大的减少网络开支。optimizeACK表达了“延迟确认”的语义(ACK时机),client端在消费消息后暂且不发送ACK,而是把它缓存下来(pendingACK),等到这些消息的条数达到一定阀值时,只需要通过一个ACK指令把它们全部确认;这比对每条消息都逐个确认,在性能上要提高很多。由此可见,prefetch优化了消息传送的性能,optimizeACK优化了消息确认的性能。
  8. consumer端消息消费的速率很高(相对于producer生产消息),而且消息的数量也很大时(比如消息源源不断的生产),我们使用optimizeACK + prefetch将会极大的提升consumer的性能。不过反过来:
  9. 1) 如果consumer端消费速度很慢(对消息的处理是耗时的),过大的prefetchSize,并不能有效的提升性能,反而不利于consumer端的负载均衡(只针对queue);按照良好的设计准则,当consumer消费速度很慢时,我们通常会部署多个consumer客户端,并使用较小的prefetch,同时关闭optimizeACK,可以让消息在多个consumer间“负载均衡”(即均匀的发送给每个consumer);如果较大的prefetchSize,将会导致broker一次性pushclient大量的消息,但是这些消息需要很久才能ACK(消息积压),而且在client故障时,还会导致这些消息的重发。
  10. 2) 如果consumer端消费速度很快,但是producer端生成消息的速率较慢,比如生产者10秒钟生成10条消息,但是consumer一秒就能消费完毕,而且我们还部署了多个consumer!!这种场景下,建议开启optimizeACK,但是需要设置较小的prefetchSize;这样可以保证每个consumer都能有"活干",否则将会出现一个consumer非常忙碌,但是其他consumer几乎收不到消息。
  11. 3) 如果消息很重要,特别是不原因接收到”redelivery“的消息,那么我们需要将optimizeACK=falseprefetchSize=1
  12. 既然optimizeACK是”延迟“确认,那么就引入一种潜在的风险:在消息被消费之后还没有来得及确认时,client端发生故障,那么这些消息就有可能会被重新发送给其他consumer,那么这种风险就需要client端能够容忍“重复”消息。
  13. prefetch值默认为1000,当然这个值可能在很多场景下是偏大的;我们暂且不考虑ACK_MODE(参见下文),通常情况下,我们只需要简单的统计出单个consumer每秒的最大消费消息数即可,比如一个consumer每秒可以处理100个消息,我们期望consumer端每2秒确认一次,那么我们的prefetchSize可以设置为100 * 2 /0.65大概为300。无论如何设定此值,client持有的消息条数最大为:prefetch + DELIVERED_ACK_TYPE消息条数”(DELIVERED_ACK_TYPE参见下文)
  14. 即使当optimizeACKtrue,也只会当sessionACK_MODEAUTO_ACKNOWLEDGE时才会生效,即在其他类型的ACK_MODEconsumer端仍然不会“延迟确认”,即:

consumer.optimizeAck = connection.optimizeACK && session.isAutoAcknowledge()

  1. consumer.optimizeACK有效时,如果客户端已经消费但尚未确认的消息(deliveredMessage)达到prefetch * 0.65consumer端将会自动进行ACK;同时如果离上一次ACK的时间间隔,已经超过"optimizeAcknowledgeTimout"毫秒,也会导致自动进行ACK
  2. 此外简单的补充一下,批量确认消息时,只需要在ACK指令中指明“firstMessageId”和“lastMessageId”即可,即消息区间,那么broker端就知道此consumer(根据consumerId识别)需要确认哪些消息。

ACK模式与类型介绍

  1. JMS API中约定了Client端可以使用四种ACK_MODE,在javax.jms.Session接口中:
  • AUTO_ACKNOWLEDGE = 1 自动确认

  • CLIENT_ACKNOWLEDGE = 2 客户端手动确认

  • DUPS_OK_ACKNOWLEDGE = 3 自动批量确认

  • SESSION_TRANSACTED = 0 事务提交并确认

  1. 此外AcitveMQ补充了一个自定义的ACK_MODE:
  • INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认
  1. 我们在开发JMS应用程序的时候,会经常使用到上述ACK_MODE,其中"INDIVIDUAL_ACKNOWLEDGE "只有ActiveMQ支持,当然开发者也可以使用它. ACK_MODE描述了Consumerbroker确认消息的方式(时机),比如当消息被Consumer接收之后,Consumer将在何时确认消息。对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,通过ACK,可以在consumerBroker之间建立一种简单的“担保”机制. <br /> <br /> Client端指定了ACK_MODE,但是在Clientbroker在交换ACK指令的时候,还需要告知ACK_TYPE,ACK_TYPE表示此确认指令的类型,不同的ACK_TYPE将传递着消息的状态,broker可以根据不同的ACK_TYPE对消息进行不同的操作。
  2. 比如Consumer消费消息时出现异常,就需要向broker发送ACK指令,ACK_TYPE"REDELIVERED_ACK_TYPE",那么broker就会重新发送此消息。在JMS API中并没有定义ACT_TYPE,因为它通常是一种内部机制,并不会面向开发者。ActiveMQ中定义了如下几种ACK_TYPE(参看MessageAck类):
  • DELIVERED_ACK_TYPE = 0 消息”已接收”,但尚未处理结束

  • STANDARD_ACK_TYPE = 2 “标准”类型,通常表示为消息”处理成功”,broker端可以删除消息了

  • POSION_ACK_TYPE = 1 消息”错误”,通常表示”抛弃”此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)

  • REDELIVERED_ACK_TYPE = 3 消息需”重发”,比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息

  • INDIVIDUAL_ACK_TYPE = 4 表示只确认”单条消息”,无论在任何ACK_MODE下

  • UNMATCHED_ACK_TYPE = 5 BROKER间转发消息时,接收端”拒绝”消息

  1. 到目前为止,我们已经清楚了大概的原理: Client端在不同的ACK_MODE时,将意味着在不同的时机发送ACK指令,每个ACK Command中会包含ACK_TYPE,那么broker端就可以根据ACK_TYPE来决定此消息的后续操作. 接下来,我们详细的分析ACK_MODEACK_TYPE.

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

  1. 我们需要在创建Session时指定ACK_MODE,由此可见,ACK_MODE将是session共享的,意味着一个session下所有的 consumer都使用同一种ACK_MODE。在创建Session时,开发者不能指定除ACK_MODE列表之外的其他值.如果此session为事务类型,用户指定的ACK_MODE将被忽略,而强制使用"SESSION_TRANSACTED"类型;如果session非事务类型时,也将不能将 ACK_MODE设定为"SESSION_TRANSACTED",毕竟这是相悖的.
  2. Consumer消费消息的风格有2种: 同步/异步..使用consumer.receive()就是同步,使用messageListener就是异步;在同一个consumer中,我们不能使用使用这2种风格,比如在使用listener的情况下,当调用receive()方法将会获得一个Exception。两种风格下,消息确认时机有所不同。
  3. "同步"伪代码:
  1. //receive伪代码---过程
  2. Message message = sessionMessageQueue.dequeue();
  3. if(message != null){
  4. ack(message);
  5. }
  6. return message
  1. 同步调用时,在消息从receive方法返回之前,就已经调用了ACK;因此如果Client端没有处理成功,此消息将丢失(可能重发,与ACK_MODE有关)。
  2. "异步"伪代码:
  1. //基于listener
  2. Session session = connection.getSession(consumerId);
  3. sessionQueueBuffer.enqueue(message);
  4. Runnable runnable = new Ruannale(){
  5. run(){
  6. Consumer consumer = session.getConsumer(consumerId);
  7. Message md = sessionQueueBuffer.dequeue();
  8. try{
  9. consumer.messageListener.onMessage(md);
  10. ack(md);//
  11. }catch(Exception e){
  12. redelivery();//sometime,not all the time;
  13. }
  14. }

//session中将采取线程池的方式,分发异步消息
//因此同一个session中多个consumer可以并行消费
threadPool.execute(runnable);

  1. 基于异步调用时,消息的确认是在onMessage方法返回之后,如果onMessage方法异常,会导致消息重发。

ACK_MODE详解

  1. AUTO_ACKNOWLEDGE : 自动确认,这就意味着消息的确认时机将有consumer择机确认."择机确认"似乎充满了不确定性,这也意味着,开发者必须明确知道"择机确认"的具体时机,否则将有可能导致消息的丢失,或者消息的重复接受.那么在ActiveMQ中,AUTO_ACKNOWLEDGE是如何运作的呢?
  2. 1) 对于consumer而言,optimizeAcknowledge属性只会在AUTO_ACK模式下有效。
  3. 2) 其中DUPS_ACKNOWLEGE也是一种潜在的AUTO_ACK,只是确认消息的条数和时间上有所不同。
  4. 3) 在“同步”(receive)方法返回message之前,会检测optimizeACK选项是否开启,如果没有开启,此单条消息将立即确认,所以在这种情况下,message返回之后,如果开发者在处理message过程中出现异常,会导致此消息也不会redelivery,即"潜在的消息丢失";如果开启了optimizeACK,则会在unAck数量达到prefetch * 0.65时确认,当然我们可以指定prefetchSize = 1来实现逐条消息确认。
  5. 4) "异步"(messageListener)方式中,将会首先调用listener.onMessage(message),此后再ACK,如果onMessage方法异常,将导致client端补充发送一个ACK_TYPEREDELIVERED_ACK_TYPE确认指令;如果onMessage方法正常,消息将会正常确认(STANDARD_ACK_TYPE)。此外需要注意,消息的重发次数是有限制的,每条消息中都会包含“redeliveryCounter”计数器,用来表示此消息已经被重发的次数,如果重发次数达到阀值,将会导致发送一个ACK_TYPEPOSION_ACK_TYPE确认指令,这就导致broker端认为此消息无法消费,此消息将会被删除或者迁移到"dead letter"通道中。
  6. 因此当我们使用messageListener方式消费消息时,通常建议在onMessage方法中使用try-catch,这样可以在处理消息出错时记录一些信息,而不是让consumer不断去重发消息;如果你没有使用try-catch,就有可能会因为异常而导致消息重复接收的问题,需要注意你的onMessage方法中逻辑是否能够兼容对重复消息的判断。
  7. CLIENT_ACKNOWLEDGE : 客户端手动确认,这就意味着AcitveMQ将不会“自作主张”的为你ACK任何消息,开发者需要自己择机确认。在此模式下,开发者需要需要关注几个方法:1) message.acknowledge(),2) ActiveMQMessageConsumer.acknowledege(),3) ActiveMQSession.acknowledge();其1)和3)是等效的,将当前session中所有consumer中尚未ACK的消息都一起确认,2)只会对当前consumer中那些尚未确认的消息进行确认。开发者可以在合适的时机必须调用一次上述方法。
  8. 我们通常会在基于Group(消息分组)情况下会使用CLIENT_ACKNOWLEDGE,我们将在一个group的消息序列接受完毕之后确认消息(组);不过当你认为消息很重要,只有当消息被正确处理之后才能确认时,也很可以使用此ACK_MODE
  9. 如果开发者忘记调用acknowledge方法,将会导致当consumer重启后,会接受到重复消息,因为对于broker而言,那些尚未真正ACK的消息被视为“未消费”。
  10. 开发者可以在当前消息处理成功之后,立即调用message.acknowledge()方法来"逐个"确认消息,这样可以尽可能的减少因网络故障而导致消息重发的个数;当然也可以处理多条消息之后,间歇性的调用acknowledge方法来一次确认多条消息,减少ack的次数来提升consumer的效率,不过这仍然是一个利弊权衡的问题。
  11. 除了message.acknowledge()方法之外,ActiveMQMessageConumser.acknowledge()和ActiveMQSession.acknowledge()也可以确认消息,只不过前者只会确认当前consumer中的消息。其中sesson.acknowledge()和message.acknowledge()是等效的。
  12. 无论是“同步”/“异步”,ActiveMQ都不会发送STANDARD_ACK_TYPE,直到message.acknowledge()调用。如果在client端未确认的消息个数达到prefetchSize * 0.5时,会补充发送一个ACK_TYPEDELIVERED_ACK_TYPE的确认指令,这会触发broker端可以继续push消息到client端。(参看PrefetchSubscription.acknwoledge方法)
  13. broker端,针对每个Consumer,都会保存一个因为"DELIVERED_ACK_TYPE"而“拖延”的消息个数,这个参数为prefetchExtension,事实上这个值不会大于prefetchSize * 0.5,因为Consumer端会严格控制DELIVERED_ACK_TYPE指令发送的时机(参见ActiveMQMessageConsumer.ackLater方法),broker端通过“prefetchExtension”与prefetchSize互相配合,来决定即将pushclient端的消息个数,count = prefetchExtension + prefetchSize - dispatched.size(),其中dispatched表示已经发送给client端但是还没有“STANDARD_ACK_TYPE”的消息总量;由此可见,在CLIENT_ACK模式下,足够快速的调用acknowledge()方法是决定consumer端消费消息的速率;如果client端因为某种原因导致acknowledge方法未被执行,将导致大量消息不能被确认,broker端将不会push消息,事实上client端将处于“假死”状态,而无法继续消费消息。我们要求client端在消费1.5*prefetchSize个消息之前,必须acknowledge()一次;通常我们总是每消费一个消息调用一次,这是一种良好的设计。
  14. 此外需要额外的补充一下:所有ACK指令都是依次发送给broker端,在CLIET_ACK模式下,消息在交付给listener之前,都会首先创建一个DELIVERED_ACK_TYPEACK指令,直到client端未确认的消息达到"prefetchSize * 0.5"时才会发送此ACK指令,如果在此之前,开发者调用了acknowledge()方法,会导致消息直接被确认(STANDARD_ACK_TYPE)。broker端通常会认为“DELIVERED_ACK_TYPE”确认指令是一种“slow consumer”信号,如果consumer不能及时的对消息进行acknowledge而导致broker端阻塞,那么此consumer将会被标记为“slow”,此后queue中的消息将会转发给其他Consumer
  15. DUPS_OK_ACKNOWLEDGE : "消息可重复"确认,意思是此模式下,可能会出现重复消息,并不是一条消息需要发送多次ACK才行。它是一种潜在的"AUTO_ACK"确认机制,为批量确认而生,而且具有“延迟”确认的特点。对于开发者而言,这种模式下的代码结构和AUTO_ACKNOWLEDGE一样,不需要像CLIENT_ACKNOWLEDGE那样调用acknowledge()方法来确认消息。
  16. 1) ActiveMQ中,如果在DestinationQueue通道,我们真的可以认为DUPS_OK_ACK就是“AUTO_ACK + optimizeACK + (prefetch > 0)”这种情况,在确认时机上几乎完全一致;此外在此模式下,如果prefetchSize =1 或者没有开启optimizeACK,也会导致消息逐条确认,从而失去批量确认的特性。
  17. 2) 如果DestinationTopicDUPS_OK_ACKNOWLEDGE才会产生JMS规范中诠释的意义,即无论optimizeACK是否开启,都会在消费的消息个数>=prefetch * 0.5时,批量确认(STANDARD_ACK_TYPE),在此过程中,不会发送DELIVERED_ACK_TYPE的确认指令,这是1)和AUTO_ACK的最大的区别。
  18. 这也意味着,当consumer故障重启后,那些尚未ACK的消息会重新发送过来。
  19. SESSION_TRANSACTED : session使用事务时,就是使用此模式。在事务开启之后,和session.commit()之前,所有消费的消息,要么全部正常确认,要么全部redelivery。这种严谨性,通常在基于GROUP(消息分组)或者其他场景下特别适合。在SESSION_TRANSACTED模式下,optimizeACK并不能发挥任何效果,因为在此模式下,optimizeACK会被强制设定为false,不过prefetch仍然可以决定DELIVERED_ACK_TYPE的发送时机。
  20. 因为Session非线程安全,那么当前session下所有的consumer都会共享同一个transactionContext;同时建议,一个事务类型的Session中只有一个Consumer,已避免rollback()或者commit()方法被多个consumer调用而造成的消息混乱。
  21. consumer接受到消息之后,首先检测TransactionContext是否已经开启,如果没有,就会开启并生成新的transactionId,并把信息发送给broker;此后将检测事务中已经消费的消息个数是否 >= prefetch * 0.5,如果大于则补充发送一个“DELIVERED_ACK_TYPE”的确认指令;这时就开始调用onMessage()方法,如果是同步(receive),那么即返回message。上述过程,和其他确认模式没有任何特殊的地方。
  22. 当开发者决定事务可以提交时,必须调用session.commit()方法,commit方法将会导致当前session的事务中所有消息立即被确认;事务的确认过程中,首先把本地的deliveredMessage队列中尚未确认的消息全部确认(STANDARD_ACK_TYPE);此后向broker发送transaction提交指令并等待broker反馈,如果broker端事务操作成功,那么将会把本地deliveredMessage队列清空,新的事务开始;如果broker端事务操作失败(此时broker已经rollback),那么对于session而言,将执行inner-rollback,这个rollback所做的事情,就是将当前事务中的消息清空并要求broker重发(REDELIVERED_ACK_TYPE),同时commit方法将抛出异常。
  23. session.commit方法异常时,对于开发者而言通常是调用session.rollback()回滚事务(事实上开发者不调用也没有问题),当然你可以在事务开始之后的任何时机调用rollback(),rollback意味着当前事务的结束,事务中所有的消息都将被重发。需要注意,无论是inner-rollback还是调用session.rollback()而导致消息重发,都会导致message.redeliveryCounter计数器增加,最终都会受限于brokerUrl中配置的"jms.redeliveryPolicy.maximumRedeliveries",如果rollback的次数过多,而达到重发次数的上限时,消息将会被DLQ(dead letter)。
  24. INDIVIDUAL_ACKNOWLEDGE : 单条消息确认,这种确认模式,我们很少使用,它的确认时机和CLIENT_ACKNOWLEDGE几乎一样,当消息消费成功之后,需要调用message.acknowledege来确认此消息(单条),而CLIENT_ACKNOWLEDGE模式先message.acknowledge()方法将导致整个session中所有消息被确认(批量确认)。
  25. 结语:到目前为止,我们已经已经简单的了解了ActiveMQ中消息传送机制,还有JMSACK策略,重点分析了optimizeACK的策略,希望开发者能够在使用activeMQ中避免一些不必要的错误。本文如有疏漏和错误之处,请各位不吝赐教,特此感谢。
  26. 源码参考类:
  27. 1) ActiveMQConnectionFactory,ActiveMQMessageConsumer,ActiveMQSession,MessageAck等<br /> 2) Queue,PrefetchSubscription,TransactionContext,TransactionStore

ActiveMQ的消息重发机制

本文以ActiveMQ最新的5.10版本为准。
大家知道,JMS规范中,Message消息头接口中有setJMSRedelivered(boolean redelivered)和getJMSRedelivered()方法,用于设置和获取消息的重发标志,当然set方法主要是MOM来调用的,我们客户端使用的是get方法。
还记得当时阿里的电话面试曾问过我,你知道ActiveMQ中的消息重发时间间隔和重发次数吗?我当时尴尬了,只知道会重发,还真没去了解过其中的细节,所以最后被完美的“淘汰了”。
后来有时间了就去网上看了下官方的文档,所以现在把ActiveMQ中的重发机制和大家一起分享一下。
首先,我们得大概了解下,在哪些情况下,ActiveMQ服务器会将消息重发给消费者,这里为简单起见,假定采用的消息发送模式为队列(即消息发送者和消息接收者)。
1.如果消息接收者在处理完一条消息的处理过程后没有对MOM进行应答,则该消息将由MOM重发。
2.如果我们队某个队列设置了预读参数(consumer.prefetchSize),如果消息接收者在处理第一条消息时(没向MOM发送消息接收确认)就宕机了,则预读数量的所有消息都将被重发。
3.如果Session是事务的,则只要消息接收者有一条消息没有确认,或发送消息期间MOM或客户端某一方突然宕机了,则该事务范围中的所有消息MOM都将重发。

说到这里,大家可能会有疑问,ActiveMQ消息服务器怎么知道消费者客户端到底是消息正在处理中还没来得急对消息进行应答还是已经处理完成了没有应答或是宕机了根本没机会应答呢?其实在所有的客户端机器上,内存中都运行着一套客户端的ActiveMQ环境,该环境负责缓存发来的消息,负责维持着和ActiveMQ服务器的消息通讯,负责失效转移(fail-over)等,所有的判断和处理都是由这套客户端环境来完成的。
我们可以来对ActiveMQ的重发策略(Redelivery Policy)来进行自定义配置,其中的配置参数主要有以下几个:
a) collisionAvoidanceFactor :碰撞躲避因数,默认值是0.15,这个参数是为了躲避高并发的重发带来的问题,我们查看org.apache.activemq.RedeliveryPolicy类的源代码,
// +/-15% for a 30% spread -cgs
protected double collisionAvoidanceFactor = 0.15d;
protected long initialRedeliveryDelay = 1000L;
可以发现,该默认值带来的变动范围是正负百分之15,也就是有30%的范围,也就是说,如果延迟发送时间(也就是initialRedeliveryDelay 默认值)是1000毫秒,则该条消息第一次有可能被拖延850毫秒到1150毫秒之间后被发送,如果有第二次重发,基数就不是1000毫秒了,而是以上一次重发拖延时间为基础来算。源代码如下:

  1. public long getNextRedeliveryDelay(long previousDelay) {
  2. long nextDelay = redeliveryDelay;
  3. if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
  4. nextDelay = (long) (previousDelay * backOffMultiplier);
  5. if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
  6. // in case the user made max redelivery delay less than redelivery delay for some reason.
  7. nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay);
  8. }
  9. }
  10. if (useCollisionAvoidance) {
  11. /*
  12. * First random determines +/-, second random determines how far to
  13. * go in that direction. -cgs
  14. */
  15. Random random = getRandomNumberGenerator();
  16. double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble();
  17. nextDelay += nextDelay * variance;
  18. }
  19. return nextDelay;
  20. }

b)maximumRedeliveries :最大重发次数,默认值是6,如果你想不限次数重发,可以设置成-1。同样是org.apache.activemq.RedeliveryPolicy类中的代码:

  1. public static final int NO_MAXIMUM_REDELIVERIES = -1;
  2. public static final int DEFAULT_MAXIMUM_REDELIVERIES = 6;
  3. protected int maximumRedeliveries = DEFAULT_MAXIMUM_REDELIVERIES;
  4. 我们探究一下maximumRedeliveries get方法,可以发现有org.apache.activemq.ActiveMQSessionorg.apache.activemq.ActiveMQMessageConsumer两个类中有用到:
  5. 其中ActiveMQSession中的代码如下:
  6. // Figure out how long we should wait to resend this message.
  7. long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
  8. for (int i = 0; i < redeliveryCounter; i++) {
  9. // 每次重发拖延时间都是以上一次重发拖延时间来算,所以这里for循环来取得最新的拖延时间
  10. redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
  11. }
  12. // 交给定时任务重发
  13. connection.getScheduler().executeAfterDelay(new Runnable() {
  14. public void run() {
  15. ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
  16. }
  17. }, redeliveryDelay);

ActiveMQMessageConsumer中的代码类似。

c)maximumRedeliveryDelay :重发最大拖延时间,默认为-1,表示没有最大拖延时间,此参数只有当useExponentialBackOff 为true时起效。同样是RedeliveryPolicy中的代码:

  1. protected long maximumRedeliveryDelay = -1;
  2. public long getNextRedeliveryDelay(long previousDelay) {
  3. long nextDelay = redeliveryDelay;
  4. if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
  5. nextDelay = (long) (previousDelay * backOffMultiplier);
  6. if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
  7. // in case the user made max redelivery delay less than redelivery delay for some reason.
  8. nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay);
  9. }
  10. }
  11. 。。。。。
  12. }

看源代码就显而易见了。
d)initialRedeliveryDelay :第一次重发的拖延时间基础,默认是1000,单位为毫秒,前面讲collisionAvoidanceFactor 属性时已经提到过,这里不再多说。

  1. e)**redeliveryDelay** :如果initialRedeliveryDelay 0,则使用redeliveryDelay ,默认也是1000RedeliveryPolicy中源代码如下:<br /> protected long initialRedeliveryDelay = 1000L;<br /> protected long redeliveryDelay = initialRedeliveryDelay;
  2. f)**useCollisionAvoidance** :消息重发时是否采用前面提到的碰撞避免collisionAvoidanceFactor 参数,默认是false,不采用。源代码上面也给出了,这里不再多说。
  3. g)**useCollisionAvoidance** :是否使用成倍增加拖延,默认为false,如果我们希望重发的拖延时间一次比一次大很多,则可以设置它为true。上面已经给出过源代码,这里再次给出:
  1. protected boolean useExponentialBackOff;
  2. protected double backOffMultiplier = 5.0;
  3. public long getNextRedeliveryDelay(long previousDelay) {
  4. long nextDelay = redeliveryDelay;
  5. if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
  6. nextDelay = (long) (previousDelay * backOffMultiplier);
  7. if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
  8. // in case the user made max redelivery delay less than redelivery delay for some reason.
  9. nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay);
  10. }
  11. }
  12. if (useCollisionAvoidance) {
  13. /*
  14. * First random determines +/-, second random determines how far to
  15. * go in that direction. -cgs
  16. */
  17. Random random = getRandomNumberGenerator();
  18. double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble();
  19. nextDelay += nextDelay * variance;
  20. }
  21. return nextDelay;
  22. }

可以看出,成倍拖延是将上一次拖延时间乘以backOffMultiplier来实现的,而 backOffMultiplier默认为5.
h)backOffMultiplier :成倍拖延时间的倍率,默认为5,上面已经提到了,这里不再多说。