消费者与消费组
1.每个消费者都有一个对应的消费组,消息被发布到主题后,只会投递到订阅它的每一个消费组中的一个消费者
如上图所示:
某个主题有4个分区,和两个消费组,A组有四个消费组,b组有2个消费者,,按照kafka的默认规则,最后分配结果是,A组每个消费者分配到一个分区,B组每个消费者分配到两个分区,每个消费者只能消费所分配到的分区中的消息。.换言之,每个分区只能被一个消费组的消费者消费。
2.消费组内的消费者个数变化时所对应的分区分配的演变:
消费者与消费组这种模型可以让整体的消费能力具备横向伸缩性,我们可以增加(或减少)消费者的个数来提高(或降低)整体的消费能力。对于分区数固定的情况,一味地增加消费者并不会让消费能力一直得到提升,如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。
对于消息中间件而言,一般有两种消息投递模式:点对点模式(P2P,Point-to-Point), 发布/订阅模式(Pub/Sub)。
点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。
发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(Topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题中订阅消息。主题使得消息的订阅者和发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。Kafka 同时支持两种消息投递模式,而这正是得益于消费者与消费组模型的契合:
消费者并非逻辑上的概念,它是实际的应用实例,它可以是一个线程,也可以是一个进程。同一个消费组内的消费者既可以部署在同一台机器上,也可以部署在不同的机器上。
客户端开发
1.一个正常的消费逻辑具有以下几个步骤:
①配置客户端配置,创建相应实例
②订阅主题
③拉取消息并消费
④提交消费位移
⑤关闭消费者实例
2.一个简单的java构建消费者实例
3.必要的参数
有4个比填参数
.bootstrap.servers和生产者一样用来指定连接kafka集群所需的broker地址清单
.gruop.id:消费者隶属的消费者名称,默认值为””,如果为空,会报异常,一般而言会设置成具体的业务相关名称
· key.deserializer 和 value.deserializer:与生产者客户端 KafkaProducer中的key.serializer和value.serializer参数对应
initConfig()方法里还设置了一个参数client.id,这个参数用来设定KafkaConsumer对应的客户端id,默认值也为“”
订阅主题与分区
使用subscribe()方法订阅主题,一个消费者可以订阅多个主题,如下subscribe()的重载方法已
以集合的方式订阅主题,前后两次订阅不同的主题,以后面的主题为准.
如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅,在之后的过程中,如果有人又创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。
在Kafka 和其他系统之间进行数据复制时,这种正则表达式的方式就显得很常见。正则表达式的方式订阅的示例如下:
消费者不仅可以通过KafkaConsumer.subscribe()方法订阅主题,还可以直接订阅某些主题的特定分区,在KafkaConsumer中还提供了一个assign()方法来实现这些功能,此方法的具体定义如下:
这里只订阅topic-demo主题中分区编号为0的分区,相关代码如下:
如果我们事先并不知道主题中有多少个分区怎么办?KafkaConsumer 中的partitionsFor()方法可以用来查询指定主题的元数据信息,partitionsFor()方法的具体定义如下:获得主题分区的元数据信息
PartitionInfo类中的属性topic表示主题名称,partition代表分区编号,leader代表分区的leader副本所在的位置,replicas代表分区的AR集合,inSyncReplicas代表分区的ISR集合,offlineReplicas代表分区的OSR集合。
使用 KafkaConsumer 中的unsubscribe()方法来取消主题的订阅。这个方法既可以取消通过subscribe(Collection)方式实现的订阅,也可以取消通过subscribe(Pattern)方式实现的订阅,还可以取消通过assign(Collection)方式实现的订阅。示例代码如下:
如果将subscribe(Collection)或assign(Collection)中的集合参数设置为空集合,那么作用等同于unsubscribe()方法,下面三个方法效果相同
当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。而通过assign()方法订阅分区时,是不具备消费者自动均衡的功能的
反序列化
同序列化,反序列化也有多种类型的序列化器,
在实际应用中,在Kafka提供的序列化器和反序列化器满足不了应用需求的前提下,推荐使用Avro、JSON、Thrift、ProtoBuf或Protostuff等通用的序列化工具来包装,以求尽可能实现得更加通用且前后兼容。不推荐自定义序列化器
消息消费
Kafka中的消费是基于拉模式的。消息的消费一般有两种模式:推模式和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。
Kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll()方法,而poll()方法返回的是所订阅的主题(分区)上的一组消息
pull()方法里有一个超时参数timeout的设置取决于应用程序对响应速度的要求,比如需要在多长时间内将控制权移交给执行轮询的应用线程。可以直接将timeout设置为0,这样poll()方法会立刻返回,而不管是否已经拉取到了消息。如果应用线程唯一的工作就是从Kafka中拉取并消费消息,则可以将这个参数设置为最大值Long.MAX_VALUE。
消费者消费到的每条消息的类型为ConsumerRecord(注意与ConsumerRecords的区别),这个和生产者发送的消息类型ProducerRecord相对应,不过ConsumerRecord中的内容更加丰富,具体的结构参考如下代码:
位移提交
对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个offset的概念,消费者使用offset来表示消费到分区中某个消息所在的位置在每次调用poll()方法时,它返回的是还没有被消费过的消息集(当然这个前提是消息已经存储在Kafka 中了,并且暂不考虑异常情况的发生),要做到这一点,就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。再考虑一种情况,当有新的消费者加入时,那么必然会有再均衡的动作,对于同一分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。
消费者在消费完消息之后需要执行消费位移的提交
不过需要非常明确的是,当前消费者需要提交的消费位移并不是 x,而是x+1,对应于图3-6中的position,它表示下一条需要拉取的消息的位置。
KafkaConsumer 类提供了 position(TopicPartition)和committed(TopicPartition)两个方法来分别获取上面所说的position和committed offset的值。这两个方法的定义如下所示。
对于位移提交的具体时机的把握也很有讲究,有可能会造成重复消费和消息丢失的现象。参考图3-7,当前一次poll()操作所拉取的消息集为[x+2,x+7],x+2代表上一次提交的消费位移,说明已经完成了x+1之前(包括x+1在内)的所有消息的消费,x+5表示当前正在处理的位置。如果拉取到消息之后就进行了位移提交,即提交了x+8,那么当前消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+8开始的。也就是说,x+5至x+7之间的消息并未能被消费,如此便发生了消息丢失的现象。
再考虑另外一种情形,位移提交的动作是在消费完所有拉取到的消息之后才执行的,那么当消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+2开始的。也就是说,x+2至x+4之间的消息又重新消费了一遍,故而又发生了重复消费的现象。
在 Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit 配置,默认值为 true。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数auto.commit.interval.ms配置,默认值为5秒,此参数生效的前提是enable.auto.commit参数为true。在代码清单3-1中并没有展示出这两个参数,说明使用的正是默认值。
在Kafka消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费和消息丢失的问题。假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。
按照一般思维逻辑而言,自动提交是延时提交,重复消费可以理解,那么消息丢失又是在什么情形下会发生的呢?我们来看一下图3-8中的情形。拉取线程A不断地拉取消息并存入本地缓存,比如在BlockingQueue中,另一个处理线程B从缓存中读取消息并进行相应的逻辑处理。假设目前进行到了第y+1次拉取,以及第m次位移提交的时候,也就是x+6之前的位移已经确认提交了,处理线程B却还正在消费x+3的消息。此时如果处理线程B发生了异常,待其恢复之后会从第m此位移提交处,也就是x+6的位置开始拉取消息,那么x+3至x+6之间的消息就没有得到相应的处理,这样便发生消息丢失的现象。
自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象,但是在编程的世界里异常无可避免,与此同时,自动位移提交也无法做到精确的位移管理。在Kafka中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费,手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。开启手动提交功能的前提是消费者客户端参数enable.auto.commit配置为false
手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的commitSync()和commitAsync()两种类型的方法。我们这里先讲述同步提交的方式,commitSync()方法的定义如下:
可以看到示例中先对拉取到的每一条消息做相应的逻辑处理,然后对整个消息集做同步提交。参考 KafkaConsumer 源码中提供的示例,针对上面的示例还可以修改为批量处理+批量提交的方式
与commitSync()方法相反,异步提交的方式(commitAsync())在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交可以使消费者的性能得到一定的增强。commitAsync方法有三个不同的重载方法,具体定义如下:
第二个方法和第三个方法中的callback参数,它提供了一个异步提交的回调方法,当位移提交完成后会回调 OffsetCommitCallback 中的 onComplete()方法。
控制或关闭消费
KafkaConsumer 提供了对消费速度进行控制的方法,在有些应用场景下我们可能需要暂停某些分区的消费而先消费其他分区,当达到一定条件时再恢复这些分区的消费。KafkaConsumer中使用pause()和resume()方法来分别实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据的操作。这两个方法的具体定义如下:
KafkaConsumer还提供了一个无参的paused()方法来返回被暂停的分区集合
指定位移消费
当一个新的消费组建立的时候,它根本没有可以查找的消费位移。或者消费组内的一个新消费者订阅了一个新的主题,它也没有可以查找的消费位移。当__consumer_offsets主题中有关这个消费组的位移信息过期而被删除后,它也没有可以查找的消费位移。,在 Kafka 中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费,这个参数的默认值为“latest”,表示从分区末尾开始消费消息
如果将auto.offset.reset参数配置为“earliest”,那么消费者会从起始处,也就是0开始消费,auto.offset.reset参数还有一个可配置的值—“none”,配置为此值就意味着出现查到不到消费位移的时候,既不从最新的消息位置处开始消费,也不从最早的消息位置处开始消费,此时会报出NoOffsetForPartitionException异常
提供的auto.offset.reset 参数也只能在找不到消费位移或位移越界的情况下粗粒度地从开头或末尾开始消费。有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer 中的 seek()方法正好提供了这个功能,让我们得以追前消费或回溯消费。seek()方法的具体定义如下:
seek()方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll()方法的调用过程中实现的。也就是说,在执行seek()方法之前需要先执行一次poll()方法,等到分配到分区之后才可以重置消费位置。
再均衡
再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。不过在再均衡发生期间,消费组内的消费者是无法读取消息的。也就是说,在再均衡发生期间的这一小段时间内,消费组会变得不可用。另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。比如消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再均衡操作,之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完的那部分消息又被重新消费一遍,也就是发生了重复消费。一般情况下,应尽量避免不必要的再均衡的发生。
再均衡监听器用来设定发生再均衡动作前后的一些准备或收尾的动作。ConsumerRebalanceListener 是一个接口,包含2个方法,具体的释义如下:
这个方法会在再均衡开始之前和消费者停止读取消息之后被调用。可以通过这个回调方法来处理消费位移的提交,以此来避免一些不必要的重复消费现象的发生。
这个方法会在重新分配分区之后和消费者开始读取消费之前被调用。参数partitions表示再均衡后所分配到的分区
消费者拦截器
费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操作。消费者拦截器需要自定义实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口。
KafkaConsumer会在poll()方法返回之前调用拦截器的onConsume()方法来对消息进行相应的定制化操作,比如修改返回的消息内容、按照某种规则过滤消息(可能会减少poll()方法返回的消息的个数)。如果 onConsume()方法中抛出异常,那么会被捕获并记录到日志中,但是异常不会再向上传递KafkaConsumer会在提交完消费位移之后调用拦截器的onCommit()方法,可以使用这个方法来记录跟踪所提交的位移信息,比如当消费者使用commitSync的无参方法时,我们不知道提交的消费位移的具体细节,而使用拦截器的onCommit()方法却可以做到这一点
实现自定义的ConsumerInterceptorTTL之后,需要在KafkaConsumer中配置指定这个拦截器,这个指定的配置和KafkaProducer中的一样,也是通过interceptor.classes参数实现的,此参数的默认值为“”。示例如下:
多线程实现
KafkaProducer是线程安全的,然而KafkaConsumer却是非线程安全的。KafkaConsumer中定义了一个 acquire()方法,用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出ConcurrentModifcationException异常,acquire()方法和我们通常所说的锁(synchronized、Lock等)不同,它不会造成阻塞等待,我们可以将其看作一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()方法和release()方法成对出现,表示相应的加锁和解锁操作。
重要的消费者参数
fetch.min.bytes:该参数用来配置Consumer在一次拉取请求(调用poll()方法)中能从Kafka中拉取的最小数据量,默认值为1(b).,如果返回给Consumer的数据量小于这个参数所配置的值,那么它就需要进行等待,直到数据量满足这个参数的配置大小。可以适当调大这个参数的值以提高一定的吞吐量,不过也会造成额外的延迟(latency),对于延迟敏感的应用可能就不可取了。
fetch.max.bytes:该参数与fetch.max.bytes参数对应,它用来配置Consumer在一次拉取请求中从Kafka中拉取的最大数据量,默认值为 52428800(B),也就是50MB
fetch.max.wait.ms:这个参数也和fetch.min.bytes参数有关,如果Kafka仅仅参考fetch.min.bytes参数的要求,那么有可能会一直阻塞等待而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms参数用于指定Kafka的等待时间,默认值为500(ms)
max.partition.fetch.bytes:这个参数用来配置从每个分区里返回给Consumer的最大数据量,默认值为1048576(B),即1MB。这个参数与 fetch.max.bytes 参数相似,只不过前者用来限制一次拉取中每个分区的消息大小,而后者用来限制一次拉取中整体消息的大小。
max.poll.records:这个参数用来配置Consumer在一次拉取请求中拉取的最大消息数,默认值为500(条)。如果消息的大小都比较小,则可以适当调大这个参数值来提升一定的消费速度。
connections.max.idle.ms:这个参数用来指定在多久之后关闭限制的连接,默认值是540000(ms),即9分钟。
exclude.internal.topics:Kafka中有两个内部的主题:consumer_offsets和transaction_state。exclude.internal.topics用来指定Kafka中的内部主题是否可以向消费者公开,默认值为true。如果设置为true,那么只能使用subscribe(Collection)的方式而不能使用subscribe(Pattern)的方式来订阅内部主题,设置为false则没有这个限制。
receive.buffer.bytes:这个参数用来设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为65536(B),即64KB。如果设置为-1,则使用操作系统的默认值。如果Consumer与Kafka处于不同的机房,则可以适当调大这个参数值。
send.buffer.bytes:这个参数用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。
request.timeout.ms:这个参数用来配置Consumer等待请求响应的最长时间,默认值为30000(ms
metadata.max.age.ms:这个参数用来配置元数据的过期时间,默认值为300000(ms),即5分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的broker加入。
reconnect.backoff.ms:这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为50(ms)。这种机制适用于消费者向broker发送的所有请求。
isolation.level;这个参数用来配置消费者的事务隔离级别