kafka 无消息丢失配置

kafka 只对已提交的消息做有限度的持久化保证

kafka什么时候丢失消息

生产者

  1. kafka生产者属于只发送不管结果的,需要配置化
  2. 网络抖动,导致消息压根没有发送到broker端。
  3. 消息本身不合格导致broker拒绝接收(比如消息太大,超过了broker的承受能力)

怎么解决kafka生产者消息丢失?

针对原因,解决方式有

  1. producer不要使用producer.send(msg),使用producer.send(msg,callback) 回调

处理发送失败的责任在 Producer 端而非 Broker 端。除非所有的broker都宕机了。

  1. 针对网络抖动,设置retries重试值。
  2. 重新选分区的时候,设置unclean.leader.election.enable = false, 针对落后太多的broker不能让他竞选为leader。
  3. 设置多分区备份,冗余消息。replication.factor >= 3
  4. 设置了多分区,只有消息写到多少个多分区之后才算生产成功。 设置 min.insync.replicas > 1。
  5. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
  6. 消费者消费数据时要先消费数据,再更新位移。(一定要手动提交位移) 顺序不能改变。顺序改变,消息可能丢失。(能确保消息不丢失,可能造成消息重复消费)

kafka 客户端不常见的高级功能

kafka 拦截器

kafka拦截器分为生产者拦截器和消费者拦截器。 生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及田炯位移后编写特定逻辑。 都支持链的方式。

生产者拦截器都要继承org.apache.kafka.clients.producer.ProducerInterceptor接口
里边必须有两个方法

  1. onSend 该方法会在消息发送之前被调用
  2. onAcknowledgement 该方法会在消息成功提交或发送失败之后被调用。onAcknowledgement的调用要早于callback的调用。

这两个方法不是在一个线程中调用的,对于调用共享可变对象,一定要保证线程安全。
消费者拦截器都要继承org.apache.kafka.clients.consumer.ConsumerInterceptor接口。里边必须有两个方法。

  1. onCousumer方法。返回给Consumer程序之前调用
  2. onCommit:Consumer在提交位置之后调用该方法。通常在该方法中做一些记账类动作。比如打日志。

指定拦截器类时要制定它们的全限定名。

拦截器应用场景

kafka拦截器可以应用包括客户端监控,端到端系统性能检测、消息审计等多功能在内的场景。
如何监控一条消息从生产到最后消费的端到端延时也是很多kafka用户迫切需要解决的问题。

拦截器将监控逻辑和主业务逻辑解耦 以及 可插拔的机制

Java 生产者是如何管理TCP连接的?

生产者、 消费者、 Broker之间通信都是基于 TCP 的
HTTP 库在很多编程语言中都略显简陋,Kafka 社区决定采用 TCP 协议作为所有请求通信的底层协议

kafka生产者程序步骤

  1. 构建生产者对象所需的参数对象
  2. 利用构建的参数对象,创建 KafkaProducer 对象实例
  3. 使用 KafkaProducer 的 send 方法发送消息。
  4. 调用 KafkaProducer 的 close 方法关闭生产者并释放各种系统资源。

    kafka生产者如何管理tcp 连接管理

    生产者什么时候创建tcp连接?
    Producer 他会链接bootstrap.servers 参数指定的所有broker**

  5. TCP 连接是在创建 KafkaProducer 实例时建立的

  6. KafkaProducer 实例首次更新元数据信息之后,还会再次创建与集群中所有 Broker 的 TCP 连接。
  7. 如果 Producer 端发送消息到某台 Broker 时发现没有与该 Broker 的 TCP 连接,那么也会立即创建连接。

    kafka何时关闭TCP链接?

  8. 一种是用户主动关闭(kill -9 || producer.close()

  9. 一种是kafka自动关闭(connections.max.idle.ms, connections.max.idle.ms=-1 长链接)

被动关闭链接关闭发起方式客户端,被动关闭会产生大量的CLOSE_WAIT链接

14 幂等生产者和事务生产者是一回事吗?

kafka 对Producer和Consumer要处理的消息有三种模式

  • 最多一次(at most once) 消息可能会丢失,但绝不会被重复发送
  • 至少一次(at least once)消息不会丢失,但有可能被重复发送
  • 精确一次(exactly once)消息不会丢失,也不会被重复发送

目前kafak采用的是可靠性保障的第二种,可能会导致消息重复发送。 其他的看业务场景
kafka如果采用精确一次,通过 幂等性(idempotence)和事务(Transaction)
幂等性优势:我们可以安全的重试任何幂等性操作,反正他们也不会破坏我们的系统状态
kafka 版本 > 0.11.0.0 参数( props.put(“enable.idempotence”,true)
至于消息去重,kafka自动消息去重。 解决方式:经典的空间换时间的优化思路。
只能保证单分区的幂等性。(某个主题某个分区)
如何实现多分区以及多回话的消息的去重,就引入了事务型
设置事务型Producer的方法也很简单

  • 幂等性Producer一样,开启enable.idempotence=true
  • 设置Producer端参数transctional.id

kakfa 事务设置参数有两个设置

read_uncommitted :这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。 read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

**
幂等性 Producer 只能保证单分区、单会话上的消息幂等性;而事务能够保证跨分区、跨会话间的幂等性。从交付语义上来看,自然是事务型 Producer 能做的更多。
关于性能问题 : 幂等性 > 事务性

消费者组到底是什么?

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制
消费者组共享一个公共的id。group id

  1. Consumer Group 下可以有一个或多个 Consumer 实例。
  2. Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
  3. Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。

kafka作为消息引擎模型 : 点对点模型 发布/订阅模型
一个Group下该有多少个Consumer实例呢? 理想情况下,Consumer实例的数量应该等于该Group订阅主题的分区总数

老版本的Consumer Group把位移保存在Zookeeper中(好处:减少Kafka Broker 端的状态保存开销),kafak依赖它实现协调管理。
新版本的Consumer Group 将位移保存在kafka内部主题的方法, 位移保存在Broker端的内部主题中。 __consumer_offsets

Rebalance (重平衡) :Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。

那么 Consumer Group 何时进行 Rebalance 呢?

  1. 组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组
  2. 订阅主题数发生变更。 比如正则方式重新订阅主题
  3. 订阅主题的分区数发生变更。

Rebalance 的劣势: 类似于JVM的垃圾回收的STW。 如果发生Rebalance重新分配分区,所有的Consumer实例都会停止消费。等待Rebalance完成。这个时间可能会很久(达到小时级别)
对于Rebalance造成的问题,目前社区对此无能为力,至少现在还没有特别好的解决方案。所谓“本事大不如不摊上”,也许最好的解决方案就是避免 Rebalance 的发生吧。 避免Rebalance的发生。

揭开神秘的 “位移主题” 面纱

位移主题 : __consumer_offsets
背景&原因: 老版本的消费位移保存在 Apache Zookeeper, 优势:减少了Broker端需要持有的状态空间。从而实现高伸缩性。 **但是Zookeeper不适合高频写操作。
将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 consumer_offsets 中。可以这么说,consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。
位移主题就是普通的kafka主题。 可以创建、 修改、删除。位移主题的消息格式是kafka自己定义的。(不能自定义的向主题写入消息,否则可能会造成Broker崩溃)**
偏移量(topic)也是 KV 。 key 和value分别表示消息的键值和消息体。
一个kafka集群中的consumer数量很多,通过key中的group id来表示是那个consumer group。
位移主题的key中应该保存3部分内容< Group ID, 主题名, 分区号>。
位移主题的消息格式还有另外两种格式

  1. 用于保存ConsumerGroup信息的消息。(用来注册 Consumer Group 的)
  2. 用于删除Group过期位移甚至是删除Group的消息。 (tombstone 消息,即墓碑消息,也称 delete mark。这些消息只出现在源码中而不暴露给你。它的主要特点是它的消息体是 null,即空消息体。)

位移,(偏移量)什么时候创建:当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题
设置参数

offsets.topic.num.partitions

默认值 50 副本和分区的 配置为 offsets.topic.replication.factor 要做的事情了。它的默认值是 3。
如果位移主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3
建议:让kafak自行创建。Consumer 提交位移的两种方式:自动提交位移和手动提交位移。
是否自动定期提交位移 : enable.auto.commit = true 。提交间隔由一个专属的参数 auto.commit.interval.ms 来控制。 保证了消费消息不会丢失。丧失灵活性。
Kafka 使用Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。
Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据
如果位移主题占用过多磁盘需要看下清理线程是否挂掉。

消费者组重平衡能避免吗?

Rebalance的弊端有哪些?

  1. Rebalance 影响 Consumer 端 TPS。这个之前也反复提到了,这里就不再具体讲了。总之就是,在 Rebalance 期间,Consumer 会停下手头的事情,什么也干不了。
  2. Rebalance 很慢。如果你的 Group 下成员很多,就一定会有这样的痛点。还记得我曾经举过的那个国外用户的例子吧?他的 Group 下有几百个 Consumer 实例,Rebalance 一次要几个小时。在那种场景下,Consumer Group 的 Rebalance 已经完全失控了。
  3. Rebalance 效率不高。当前 Kafka 的设计机制决定了每次 Rebalance 时,Group 下的所有成员都要参与进来,而且通常不会考虑局部性原理,但局部性原理对提升系统性能是特别重要的。

如何避免Rebalance?
kafka设计层次: Version>0.11.0.0 退出StickAssignor: 粘性的分区分配策略。 尽可能的保留之前的分配方案。尽量实现分区分配的最小变动。

在真实的业务场景中,很多 Rebalance 都是计划外的或者说是不必要的
要避免 Rebalance,还是要从 Rebalance 发生的时机入手。我们在前面说过,Rebalance 发生的时机有三个:

  • 组成员数量发生变化
  • 订阅主题数量发生变化
  • 订阅主题的分区数发生变化

后面两个通常都是运维的主动操作,所以它们引发的 Rebalance 大都是不可避免的。接下来,我们主要说说因为组成员数量变化而引发的 Rebalance 该如何避免。
两个必要的参数

  1. Consumer 端有个参数,叫 session.timeout.ms,默认10秒。超时则认为挂掉。即如果 Coordinator 在 10 秒之内没有收到 Group 下某 Consumer 实例的心跳,它就会认为这个 Consumer 实例已经挂了。可以这么说,session.timout.ms 决定了 Consumer 存活性的时间间隔。
  2. heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance。
  3. 用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数,默认5分钟。 超时rebalance。

目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。

如何避免Rebalance 的参数配置。
第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的

  • 设置 session.timeout.ms = 6s。
  • 设置 heartbeat.interval.ms = 2s。
  • 要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

第二类非必要 Rebalance 是 Consumer 消费时间过长导致的
max.poll.interval.ms参数值尽量设置的大一些。
如果还会出现Rebalance,可以去排查下Consumer端的GC.是否垃圾回收导致时间过长导致停顿,引发Rebalance.

避免Rebalance的四个参数

  • session.timeout.ms
  • heartbeat.interval.ms
  • max.poll.interval.ms
  • GC 参数

**

Kafka中位移提交那些事儿

Consumer中的消息位移是消费消息到那个位置。 消息再分区中的位移 这两个位移不一样。
如何开启手动提交位移 ?

Consumer 端有个参数 enable.auto.commit,把它设置为 true 或者压根不设置它就可以了。 如果启用了自动提交,Consumer 端还有个参数就派上用场了:auto.commit.interval.ms。它的默认值是 5 秒,表明 Kafka 每 5 秒会为你自动提交一次位移。

自动提交位移能保证不出现消费丢失的情况。 但是可能造成重复消费。
手动提交位移更加灵活,但是是阻塞状态的。知道看到远程的Broker的结果。
为了避免同步阻塞情况,提供的KafkaConsumer#commitAsync()不会阻塞,有回调函数。
但是不能commitAsync 完全替代 commitSync
因为 CommitAsync出现问题不会重试,因为是异步操作,假如加上位移,位移可能已经过期或者不准确。
两者可以结合。实现异步无阻塞式的位移管理。
位移提交大事务切割成小事务。
位移提交分为自动提交和手动提交,而手动提交又分为同步提交和异步提交。在实际使用过程中,推荐你使用手动提交机制,因为它更加可控,也更加灵活。另外,建议你同时采用同步提交和异步提交两种方式,这样既不影响 TPS,又支持自动重试,改善 Consumer 应用的高可用性。总之,Kafka Consumer API 提供了多种灵活的提交方法,方便你根据自己的业务场景定制你的提交策略。

CommitFailedException异常怎么处理?

CommitFailedException,顾名思义就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常
防止消费时候抛出异常,简化消息处理逻辑的四种方法。

  1. 缩短单条消息处理的时间。
  2. 增加Consumer端允许下游消费一批消息的最大时长。
  3. 减少下游系统一次性消费的消息总数。
  4. 下游系统使用多线程来加锁消费。

可以设置 max.poll.records 值为 150,甚至更少,这样每批消息的总消费时长不会超过 300 秒(150*2=300),即 max.poll.interval.ms 的默认值 5 分钟。这种减少 max.poll.records 值的做法就属于上面提到的方法 3。

在 Consumer 端应用程序的某个地方,能够以日志或其他方式友善地提示你错误的原因,这样你才能正确处理甚至是预防该异常。
**

多线程开发消费者实例

Kafka Java Consumer 就是单线程的设计。
从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程
所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)

不过,虽然有心跳线程,但实际的消息获取逻辑依然是在用户主线程中完成的。因此,在消费消息的这个层面上,我们依然可以安全地认为 KafkaConsumer 是单线程的设计。
老版本Consumer是多线程阻塞式, 很多流处理应用的场景比如 :执行过滤、 连接、 分组等操作就不能是阻塞式的。 所以新版本的Consumer设计了单线程+轮询的机制。
单线程的设计能够简化Consumer端的设计。Consumer获取消息,处理消息逻辑是否采用多线程,完全由你决定。
单线程设计语言移植性好。

多线程方案

需要确定的是KafkaConsumer类不是线程安全的,为了确保线程安全,你不能再多个线程中共享同一个KafkaConsumer类的实例,否则会抛出ConcurrentModificationException异常。

  1. 消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程

image.png

  1. 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑

image.png

image.png

21 Java 消费者是如何管理TCP连接的?

Kafka的网络是基于TCP协议的,而不是基于UDP协议。
和生产者不同的是,构建 KafkaConsumer 实例时是不会创建任何 TCP 连接的。

Socket不是在构造函数中创建的,也不是KafkaConsumer.subscribe或KafkaConsumer.assign方法中创建
TCP连接是在调用KakfaConsumer.poll方法时创建的。 poll方法内部有3个时机可以创建TCP连接 **

  1. 发起FindCoordinator请求时。
  2. 连接协调者时。
  3. 消费数据时。

    创建多少个TCP连接?

    消费者对连接的kafka集群一无所知,所以连接的Broker节点的ID是-1。表示消费者根本不知道要连接的Kafak Broker的任何信息。
    消费者程序会创建3类TCP连接:

  4. 确定协调者和获取集群元数据

  5. 连接协调者,灵气执行组成员管理操作
  6. 执行实际的消息获取。

    何时关闭TCP连接?

    消费者关闭Socket也分为主动关闭和Kafka自动关闭。
    主动关闭: 主动关闭是指你显式地调用消费者 API 的方法去关闭消费者,具体方式就是手动调用 KafkaConsumer.close() 方法,或者是执行 Kill 命令
    自动关闭: 是由消费者端参数 connection.max.idle.ms控制的,该参数现在的默认值是 9 分钟,即如果某个 Socket 连接上连续 9 分钟都没有任何请求“过境”的话,那么消费者会强行“杀掉”这个 Socket 连接。
    当第三类 TCP 连接成功创建后,消费者程序就会废弃第一类 TCP 连接
    **

    消费者组消费进度监控都怎么实现?

    kafka消费者: 最重要的就是监控它们的消费进度了,或者说监控它们消费的滞后程度。 名称:消费者Lag或Consumer Lag
    滞后程度:就是消费者当前落后于生产者的程度。Kafka 生产者向某主题成功生产了 100 万条消息,你的消费者当前消费了 80 万条消息,那么我们就说你的消费者滞后了 20 万条消息,即 Lag 等于 20 万。
    Lag的单位是消息数,Kafka监控Lag的层级是在分区上。Lag直接反映了一个消费者的运行情况。
    由于消费者的速度无法匹急生产者速度。可能导致它消费的数据不再操作系统的页缓存中。会失去Zero Copy技术资格。
    需要时刻关注消费者的消费进度。怎么监控它呢?

  7. 使用Kafka自带的命令行工具kafka-consumer-groups脚本。

  8. 使用Kafka Java Consumer API 编程。
  9. 使用Kafka自带的JMX监控指标。

kafka-consumer-groups 脚本是 Kafka 为我们提供的最直接的监控消费者消费进度的工具。也可以监控独立消费者的Lag,独立消费者就是没有使用消费者组机制的消费者程序

bin/kafka-consumer-groups.sh —bootstrap-server —describe —group

Kafka 连接信息就是 < 主机名:端口 > 对,而 group 名称就是你的消费者程序中设置的 group.id 值
Lead 值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值。很显然,Lag 和 Lead 是一体的两个方面:Lag 越大的话,Lead 就越小,反之也是同理
分区级别的 JMX 指标中多了 records-lag-avg 和 records-lead-avg 两个属性,可以计算平均的 Lag 值和 Lead 值。
方法 1 是最简单的,我们直接运行 Kafka 自带的命令行工具即可。方法 2 使用 Consumer API 组合计算 Lag,也是一种有效的方法,重要的是它能集成进很多企业级的自动化监控工具中。不过,集成性最好的还是方法 3,直接将 JMX 监控指标配置到主流的监控框架就可以了。 推荐方法三

Kafka副本机制详解

副本机制(Replication)也称为备份机制,通常是指分布式系统再多台网络互联的机器上保存有相同的数据拷贝。
副本机制有什么好处?

  1. 改善数据局部性。允许将数据放入与用户地理位置相近的地方,从而降低系统延时。
  2. 提供高伸缩性,支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量。
  3. 改善数据局部性。允许将数据放入与用户地理位置相近的地方,从而降低系统延时。

Apache Kafka 只提供了高可用和高持久性。

Kafka 是有主题概念的,而每个主题又进一步划分成若干个分区。副本的概念实际上是在分区层级下定义的,每个分区配置有若干个副本。
所谓副本(Replica),本质就是一个只能追加写消息的提交日志。

配置多个副本,
如何确保副本的数据一致?
消息如何同步到所有的副本中呢? 最常见的解决方案:**基于领导者(Leader-based)的副本机制
image.png
副本分为:领导者副本、 追随者副本
追随者副本不对外提供服务。 领导者负责请求处理,副本从领导者副本异步拉取消息,并写入自己的提交日志,实现从领导者副本同步。
Kafak依托于Zookeeper监控,老Leader crash掉后实行领导者选举, 老Leader副本重启回归,只能作为追随者加入集群。
追随者副本是不对外提供服务的
Kafka 的副本不能提供像mysql那样抗读服务,为什么这么设计呢?

  1. 方便实现 Read-your-writers, 你使用生产者 API 向 Kafka 成功写入消息后,马上使用消费者 API 去读取刚才生产的消息。比如你平时发微博时,你发完一条微博,肯定是希望能立即看到的,这就是典型的 Read-your-writes 场景
  2. 方便实现单调读, 追随者副本 F1读到数据,追随者副本F2没有读到。造成两次读取分布式结果不一样。


In-sync Replicas(ISR)

追随者副本什么时候与Leader同步。

Leader 副本天然就在 ISR 中。也就是说,ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本
ISR 副本集合。ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的
Kafka 判断 Follower 是否与 Leader 同步的标准,不是看相差的消息数,而是另有“玄机”。
这个标准就是 Broker 端参数 replica.lag.time.max.ms 参数值。这个参数的含义是 Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。
ISR 是一个动态调整的集合,而非静态不变的。

Unclean 领导者选举(Unclean Leader Election)

ISR 为空。因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader。可是 ISR 是空,此时该怎么选举新 Leader 呢?
Kafka 把所有不在 ISR 中的存活副本都称为非同步副本
Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举
开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。

分布式CAP

一个分布式系统通常只能同时满足一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)中的两个。显然,在这个问题上,Kafka 赋予你选择 C 或 A 的权利。

Unclean 强烈不建议开启


24 请求是怎么被处理的?

kaka 定义了自己的请求协议,用于实现各种各样的交互操作。
所有的请求都是通过TCP网络以Socket的方式进行通讯的。
kafka如何处理请求?

  1. 顺序处理请求。 缺陷:吞吐量太差,只能顺序处理请求, 这种方式只使用于 请求发送非常不频繁的系统
  2. 每个请求使用单独线程处理。 完全采用异步的方式。 为每个请求都创建线程的做法开销极大,在某些场景下甚至会压垮整个服务。还是那句话,这个方法只适用于请求发送频率很低的业务场景。

这两种方案都不好,kafka是如何处理请求的呢? -> Kafka 使用的是Reactor模式。
Reactor 模式是事件驱动架构的一种实现方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景
image.png
image.png
当网络线程拿到请求后,它不是自己处理,而是将请求放入到一个共享请求队列中。Broker 端还有个 IO 线程池,负责从该队列中取出请求,执行真正的处理。如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。
IO 线程池处中的线程才是执行请求逻辑的线程。Broker 端参数num.io.threads控制了这个线程池中的线程数。目前该参数默认值是 8,表示每台 Broker 启动后自动创建 8 个 IO 线程处理请求。你可以根据实际硬件条件设置此线程池的个数。
比如,如果你的机器上 CPU 资源非常充裕,你完全可以调大该参数,允许更多的并发请求被同时处理。当 IO 线程处理完请求后,会将生成的响应发送到网络线程池的响应队列中,然后由对应的网络线程负责将 Response 返还给客户端。
请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。这么设计的原因就在于,Dispatcher 只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送 Response 给客户端
Purgatory 的组件,这是 Kafka 中著名的“炼狱”组件。它是用来缓存延时请求(Delayed Request)的。所谓延时请求,就是那些一时未满足条件不能立刻处理的请求
Kafka 社区把 PRODUCE 和 FETCH 这类请求称为数据类请求,把 LeaderAndIsr、StopReplica 这类请求称为控制类请求。**控制类请求有这样一种能力:它可以直接令数据类请求失效!

25 消费者组重平衡全流程解析

消费者组的重平衡流程: 作用是让组内的所有消费者实例就消费哪些主题分区达成一致。
重平衡过程是如何通知到其他消费者实例的?答案就是,靠消费者端的心跳线程(Heartbeat Thread)
重平衡的通知机制正式通过心跳线程来完成的。

消费者组状态机

kafka为消费者组定义了五中状态,分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。那么,这 5 种状态的含义是什么呢?
image.png
image.png
image.png只有 Empty 状态下的组,才会执行过期位移删除的操作。

消费者端重平衡流程

在消费者端,重平衡分为两个步骤:分别是加入组和等待领导者消费者(Leader Consumer)分配方案。这两个步骤分别对应两类特定的请求:JoinGroup 请求和 SyncGroup 请求
领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。

image.png
JoinGroup 请求的主要作用是将组成员订阅信息发送给领导者消费者,待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段。

SyncGroup 请求的处理流程。
image.png
SyncGroup 请求的主要目的,就是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。

Broker 端重平衡场景剖析

场景一:新成员入组。
现在,我用一张时序图来说明协调者一端是如何处理新成员入组的。
image.png
场景二:组成员主动离组。
LeaveGroup 请求:
image.png
场景三:组成员崩溃离组。
崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组
崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms 控制的。
image.png

场景四:重平衡时协调者对组内成员提交位移的处理。
**
image.png

你一定不能错过的kafka控制器

控制器组件(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群

控制器是重度依赖 ZooKeeper 的。Apache ZooKeeper 是一个提供高可靠性的分布式协调服务框架。它使用的数据模型类似于文件系统的树形结构,根目录也是以“/”开始。该结构上的每个节点被称为 znode,用来保存一些元数据协调信息。
以 znode 持久性来划分,znode 可分为持久性 znode 和临时 znode
ZooKeeper 常被用来实现集群成员管理、分布式锁、领导者选举等功能。
image.png

控制器是如何被选出来的?

Kafka 当前选举控制器的规则是:第一个成功创建 /controller 节点的 Broker 会被指定为控制器

控制器是做什么的?

控制器是起协调作用的组件, 主要职责为以下五种

  1. 主题管理(创建、删除、增加分区)
  2. 分区重分配
  3. .Preferred 领导者选举
  4. 集群成员管理(新增 Broker、Broker 主动关闭、Broker 宕机)
  5. 数据服务

    控制器保存了什么数据?

    image.png
  • 所有主题信息。包括具体的分区信息,比如领导者副本是谁,ISR 集合中有哪些副本等。
  • 所有 Broker 信息。包括当前都有哪些运行中的 Broker,哪些正在关闭中的 Broker 等。
  • 所有涉及运维任务的分区。包括当前正在进行 Preferred 领导者选举以及分区重分配的分区列表。

    控制器故障转移(Failover)

    在 Kafka 集群运行过程中,只能有一台 Broker 充当控制器的角色,那么这就存在单点失效(Single Point of Failure)的风险,Kafka 是如何应对单点失效的呢?
    答案是为控制器提供故障转移功能,也就是说所谓的Failover。
    故障转移指的是,当运行中的控制器突然宕机或意外终止时,Kafka 能够快速地感知到,并立即启用备用控制器来代替之前失败的控制器。这个过程就被称为 Failover,该过程是自动完成的,无需你手动干预。
    控制器故障转移过程:
    image.png

控制器内部设计原理

控制器是多线程的设计,会在内部创建很多个线程。
老版本的控制器通过ReentrantLock 同步机制,进一步拖慢了控制器的处理速度。
社区版本 0.11 重构控制器底层设计。** 1. **把多线程的方案改成了单线程加事件队列的方案
image.png
2.将之前同步操作 ZooKeeper 全部改为异步操作, **当有大量主题分区发生变更时,ZooKeeper 容易成为系统的瓶颈
3.赋予 StopReplica 请求更高的优先级,使它能够得到抢占式的处理。删除某个主题优先级更高。(>2.2)
当你觉得控制器组件出现问题时,比如
主题无法删除了,或者重分区 hang 住了,你不用重启 Kafka Broker 或控制器。有一个简单快速的方式是,去 ZooKeeper 中手动删除 /controller 节点。具体命令是 rmr /controller**。这样做的好处是,既可以引发控制器的重选举,又可以避免重启 Broker 导致的消息处理中断。

27.关于高水位和Leader Epoch的讨论

什么是高水位?

水位一词多用于流式处理领域。
定义 : 在时刻 T,任意创建时间(Event Time)为 T’,且 T’≤T 的所有事件都已经到达或被观测到,那么 T 就被定义为水位。
kafka中的高水位:Kafka 的水位不是时间戳,更与时间无关。它是和位置信息绑定的,具体来说,它是用消息位移来表征的。

高水位的作用?

kafka中高水位的作用有2个

  1. 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。
  2. 帮助 Kafka 完成副本同步。

位移值等于高水位的消息也属于未提交消息。也就是说,高水位上的消息是不能被消费者消费的。
日志末端位移的概念,即 Log End Offset,简写是 LEO。它表示副本写入下一条消息的位移值。
介于高水位和 LEO 之间的消息就属于未提交消息。
同一个副本对象,其高水位值不会大于 LEO 值
高水位和 LEO 是副本对象的两个重要属性。Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位

高水位更新机制

为什么要在 Broker 0 上保存这些远程副本呢?其实,它们的主要作用是,帮助 Leader 副本确定其高水位,也就是分区高水位
Kafka 副本机制的原理,以及它是如何使用高水位来执行副本消息同步的。
image.png
与 Leader 副本保持同步。判断的条件有两个。

  1. 该远程 Follower 副本在 ISR 中。
  2. 该远程 Follower 副本 LEO 值落后于 Leader 副本 LEO 值的时间,不超过 Broker 端参数 replica.lag.time.max.ms 的值。如果使用默认值的话,就是不超过 10 秒。

分别从 Leader 副本和 Follower 副本两个维度,来总结一下高水位和 LEO 的更新机制。
Leader 副本
处理生产者请求的逻辑如下:

  1. 写入消息到本地磁盘。
  2. 更新分区高水位值。
    i. 获取 Leader 副本所在 Broker 端保存的所有远程副本 LEO 值{LEO-1,LEO-2,……,LEO-n}。
    ii. 获取 Leader 副本高水位值:currentHW。
    iii. 更新 currentHW = min(currentHW, LEO-1,LEO-2,……,LEO-n)。

Follower 副本
从 Leader 拉取消息的处理逻辑如下:

  1. 写入消息到本地磁盘。
  2. 更新 LEO 值。
  3. 更新高水位值。
    i. 获取 Leader 发送的高水位值:currentHW。
    ii. 获取步骤 2 中更新过的 LEO 值:currentLEO。
    iii. 更新高水位为 min(currentHW, currentLEO)。

    副本同步机制解析

    Leader Epoch 登场

    kafka展到多个 Follower 副本,需要多轮拉取请求。
    导致问题:Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。
    0.11版本引入Leader Epoch概念。
    **
    所谓 Leader Epoch,我们大致可以认为是 Leader 版本。它由两部分数据组成。

  4. Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。

  5. 起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。

高水位消息丢失设置的参数是Broker 端参数 min.insync.replicas 设置为 1。

主题管理知多少?

主题日常管理

Kafka 提供了自带的 kafka-topics 脚本,用于帮助用户创建主题

bin/kafka-topics.sh —bootstrap-server broker_host:port —create —topic my_topic_name —partitions 1 —replication-factor 1

使用 2.2 版本了,那么创建主题请指定 —bootstrap-server 参数。
原因有两个

  1. 使用 —zookeeper 会绕过 Kafka 的安全体系。这就是说,即使你为 Kafka 集群设置了安全认证,限制了主题的创建,如果你使用 —zookeeper 的命令,依然能成功创建任意主题,不受认证体系的约束。这显然是 Kafka 集群的运维人员不希望看到的。
  2. 使用 —bootstrap-server 与集群进行交互,越来越成为使用 Kafka 的标准姿势。换句话说,以后会有越来越少的命令和 API 需要与 ZooKeeper 进行连接。这样,我们只需要一套连接信息,就能与 Kafka 进行全方位的交互,不用像以前一样,必须同时维护 ZooKeeper 和 Broker 的连接信息。

查询所有主题的列表。

bin/kafka-topics.sh —bootstrap-server broker_host:port —list

要查询单个主题的详细数据,你可以使用下面的命令。

bin/kafka-topics.sh —bootstrap-server broker_host:port —describe —topic

如果 describe 命令不指定具体的主题名称,那么 Kafka 默认会返回所有“可见”主题的详细数据给你。
这里的“可见”,是指发起这个命令的用户能够看到的 Kafka 主题

1. 修改主题分区。
其实就是增加分区,目前 Kafka 不允许减少某个主题的分区数。你可以使用 kafka-topics 脚本,结合 —alter 参数来增加某个主题的分区数,命令如下:

bin/kafka-topics.sh —bootstrap-server broker_host:port —alter —topic —partitions < 新分区数 >

这里要注意的是,你指定的分区数一定要比原有分区数大,否则 Kafka 会抛出 InvalidPartitionsException 异常。
2. 修改主题级别参数

bin/kafka-configs.sh —zookeeper zookeeper_host:port —entity-type topics —entity-name —alter —add-config max.message.bytes=10485760

这个脚本就要指定 —zookeeper,而不是 —bootstrap-server. 为什么?
这个脚本也能指定 —bootstrap-server 参数,只是它是用来设置动态参数的。只需要了解设置常规的主题级别参数,还是使用 —zookeeper。

4. 修改主题限速。
想要让某个主题的副本在执行副本同步机制时,不要消耗过多的带宽。Kafka 提供了这样的功能。
假设我有个主题,名为 test,我想让该主题各个分区的 Leader 副本和 Follower 副本在处理副本同步时,不得占用超过 100MBps 的带宽。注意是大写 B,即每秒不超过 100MB。那么,我们应该怎么设置呢?

bin/kafka-configs.sh —zookeeper zookeeper_host:port —alter —add-config ‘leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600’ —entity-type brokers —entity-name 0

命令结尾处的 —entity-name 就是 Broker ID。倘若该主题的副本分别在 0、1、2、3 多个 Broker 上,那么你还要依次为 Broker 1、2、3 执行这条命令。

设置好这个参数之后,我们还需要为该主题设置要限速的副本。在这个例子中,我们想要为所有副本都设置限速,因此统一使用通配符 * 来表示,

bin/kafka-configs.sh —zookeeper zookeeper_host:port —alter —add-config ‘leader.replication.throttled.replicas=,follower.replication.throttled.replicas=‘ —entity-type topics —entity-name test

5. 主题删除

bin/kafka-topics.sh —bootstrap-server broker_host:port —delete —topic

删除主题的命令并不复杂,关键是删除操作是异步的,执行完这条命令不代表主题立即就被删除了。它仅仅是被标记成“已删除”状态而已。Kafka 会在后台默默地开启主题删除操作。因此,通常情况下,你都需要耐心地等待一段时间。

特殊主题管理与运维

Kafka 内部主题 consumer_offsets 和 transaction_state。这两个内部主题默认都有 50 个分区,因此,分区子目录会非常得多。
建议是不要手动创建或修改它们,还是让 Kafka 自动帮我们创建
特别是对于 __consumer_offsets 而言,由于它保存了消费者组的位移数据,有时候直接查看该主题消息是很方便的事情。下面的命令可以帮助我们直接查看消费者组提交的位移数据。

bin/kafka-console-consumer.sh —bootstrap-server kafka_host:port —topic __consumer_offsets —formatter “kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter” —from-beginning

除了查看位移提交数据,我们还可以直接读取该主题消息,查看消费者组的状态信息。

bin/kafka-console-consumer.sh —bootstrap-server kafka_host:port —topic __consumer_offsets —formatter “kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter” —from-beginning

常见主题错误处理

常见错误 1:主题删除失败。
原因: 实际上,造成主题删除失败的原因有很多,最常见的原因有两个:副本所在的 Broker 宕机了;待删除主题的部分分区依然在执行迁移过程。
解决方式: 如果是因为前者,通常你重启对应的 Broker 之后,删除操作就能自动恢复;如果是因为后者,那就麻烦了,很可能两个操作会相互干扰。
不管什么原因,一旦你碰到主题无法删除的问题,可以采用这样的方法:
第 1 步,手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode。
第 2 步,手动删除该主题在磁盘上的分区目录。
第 3 步,在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。
在执行最后一步时,你一定要谨慎,因为它可能造成大面积的分区 Leader 重选举。事实上,仅仅执行前两步也是可以的,只是 Controller 缓存中没有清空待删除主题罢了,也不影响使用。
常见错误 2:__consumer_offsets 占用太多的磁盘。
一旦你发现这个主题消耗了过多的磁盘空间,那么,你一定要显式地用jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。
原因:这都是因为该线程挂掉了,无法及时清理此内部主题。
倘若真是这个原因导致的,那我们就只能重启相应的 Broker 了。另外,请你注意保留出错日志,因为这通常都是 Bug 导致的,最好提交到社区看一下。

29 | Kafka动态配置了解下? 动态参数配置

在 Kafka 安装目录的 config 路径下,有个 server.properties 文件。通常情况下,我们会指定这个文件的路径来启动 Broker。如果要设置 Broker 端的任何参数,我们必须在这个文件中显式地增加一行对应的配置,之后启动 Broker 进程,令参数生效。
之前修改参数需要重启Broker。
社区1.1.0版本中引入动态Broker参数, 所谓动态,就是指修改参数值后,无需重启 Broker 就能立即生效,而之前在 server.properties 中配置的参数则称为静态参数(Static Configs)。
2.3版本broker参数200多个部分升级成动态参数

  • read-only。被标记为 read-only 的参数和原来的参数行为一样,只有重启 Broker,才能令修改生效。
  • per-broker。被标记为 per-broker 的参数属于动态参数,修改它之后,只会在对应的 Broker 上生效。
  • cluster-wide。被标记为 cluster-wide 的参数也属于动态参数,修改它之后,会在整个集群范围内生效,也就是说,对所有 Broker 都生效。你也可以为具体的 Broker 修改 cluster-wide 参数。

log.retention.ms 参数是 cluster-wide 级别的,Kafka 允许为集群内所有 Broker 统一设置一个日志留存时间值。

使用场景

动态 Broker 参数的使用场景都有哪些呢?

  • 动态调整 Broker 端各种线程池大小,实时应对突发流量。
  • 动态调整 Broker 端连接信息或安全配置信息。
  • 动态更新 SSL Keystore 有效期。
  • 动态调整 Broker 端 Compact 操作性能。
  • 实时变更 JMX 指标收集器 (JMX Metrics Reporter)。

对于动态参数你甚至可以将这套调整线程数的动作,封装进定时任务中,以实现自动扩缩容。

如何保存

image.png
changes 是用来实时监测动态参数变更的,不会保存参数值;topics 是用来保存 Kafka 主题级别参数的。虽然它们不属于动态 Broker 端参数,但其实它们也是能够动态变更的。
users 和 clients 则是用于动态调整客户端配额(Quota)的 znode 节点。所谓配额,是指 Kafka 运维人员限制连入集群的客户端的吞吐量或者是限定它们使用的 CPU 资源。
/config/brokers znode 才是真正保存动态 Broker 参数的地方。
**cluster-wide、per-broker 和 static 参数的优先级是这样的:per-broker 参数 > cluster-wide 参数 > static 参数 > Kafka 默认值。

如何配置

如何在集群层面设置全局值,即设置 cluster-wide 范围值。

$ bin/kafka-configs.sh —bootstrap-server kafka-host:port —entity-type brokers —entity-default —alter —add-config unclean.leader.election.enable=true Completed updating default config for brokers in the cluster,

如果要设置 cluster-wide 范围的动态参数,需要显式指定 entity-default

$ bin/kafka-configs.sh —bootstrap-server kafka-host:port —entity-type brokers —entity-default —describe Default config for brokers in the cluster are: unclean.leader.election.enable=true sensitive=false synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true}

注意 sensitive=false 的字眼,它表明我们要调整的参数不是敏感数据。如果我们调整的是类似于密码这样的参数时,该字段就会为 true,表示这属于敏感数据。
如何设置 per-broker 范围参数。

$ bin/kafka-configs.sh —bootstrap-server kafka-host:port —entity-type brokers —entity-name 1 —alter —add-config unclean.leader.election.enable=false Completed updating config for broker: 1.

查看配置是否生效

$ bin/kafka-configs.sh —bootstrap-server kafka-host:port —entity-type brokers —entity-name 1 —describe Configs for broker 1 are: unclean.leader.election.enable=false sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:unclean.leader.election.enable=false, DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true, DEFAULT_CONFIG:unclean.leader.election.enable=false}

这条命令的输出信息很多。我们关注两点即可。

  1. 在 Broker 1 层面上,该参数被设置成了 false,这表明命令运行成功了。
  2. 从倒数第二行可以看出,在全局层面上,该参数值依然是 true。这表明,我们之前设置的 cluster-wide 范围参数值依然有效。

如果我们要删除 cluster-wide 范围参数或 per-broker 范围参数,也非常简单,分别执行下面的命令就可以了。

删除 cluster-wide 范围参数

$ bin/kafka-configs.sh —bootstrap-server kafka-host:port —entity-type brokers —entity-default —alter —delete-config unclean.leader.election.enable Completed updating default config for brokers in the cluster,


删除 per-broker 范围参数

$ bin/kafka-configs.sh —bootstrap-server kafka-host:port —entity-type brokers —entity-name 1 —alter —delete-config unclean.leader.election.enable Completed updating config for broker: 1.

删除动态参数要指定 delete-config。的结果是

查看 cluster-wide 范围参数

$ bin/kafka-configs.sh —bootstrap-server kafka-host:port —entity-type brokers —entity-default —describe Default config for brokers in the cluster are:


查看 Broker 1 上的动态参数配置

$ bin/kafka-configs.sh —bootstrap-server kafka-host:port —entity-type brokers —entity-name 1 —describe Configs for broker 1 are:

如果你想要知道动态 Broker 参数都有哪些,一种方式是在 Kafka 官网中查看 Broker 端参数列表另一种方式是直接运行无参数的 kafka-configs 脚本,该脚本的说明文档会告诉你当前动态 Broker 参数都有哪些。我们可以先来看看下面这两张图。
image.png
image.png
以上两张图的动态Broker参数不需要都调整, 一下为几个最常用的动态调整参数。
1.log.retention.ms。修改日志留存时间
2.num.io.threads 和 num.network.threads。生产环境中,Broker 端请求处理能力经常要按需扩容。
3. 与 SSL 相关的参数。
主要是 4 个参数(ssl.keystore.type、ssl.keystore.location、ssl.keystore.password 和 ssl.key.password)。允许动态实时调整它们之后,我们就能创建那些过期时间很短的 SSL 证书。
4.num.replica.fetchers。最实用的动态 Broker 参数之一。Follower 副本拉取速度慢,在线上 Kafka 环境中一直是一个老大难的问题。针对这个问题,常见的做法是增加该参数值,确保有充足的线程可以执行 Follower 副本向 Leader 副本的拉取。

30. 怎么重设消费者组位移?

什么要重设消费者组位移?

Kafka的消费者读取消息是可以重演的(replayable) 所谓的重复消费
像 RabbitMQ 或 ActiveMQ 这样的传统消息中间件,它们处理和响应消息的方式是破坏性的(destructive),即一旦消息被成功处理,就会被从 Broker 上删除。
Kafka,它是基于日志结构(log-based)消息引擎,消费消息在消费消息时,仅仅是从磁盘文件上读取数据而已。是只读操作。因此消费者不会删除消息数据。消费者位移由消费者控制的,方便修改位移值。实现重复消费历史数据的功能。

重设位移策略

重设位移大致从两个维度来进行。

  1. 位移维度。这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成我们给定的位移值。
  2. 时间维度。我们可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也可以给出一段时间间隔,比如 30 分钟前,然后让消费者直接将位移调回 30 分钟之前的位移值。

7种重设策略
image.png

  1. Earliest 策略表示将位移调整到主题当前最早位移处。这个最早位移不一定就是 0,因为在生产环境中,很久远的消息会被 Kafka 自动删除,所以当前最早位移很可能是一个大于 0 的值。如果你想要重新消费主题的所有消息,那么可以使用 Earliest 策略
  2. Latest 策略表示把位移重设成最新末端位移。如果你总共向某个主题发送了 15 条消息,那么最新末端位移就是 15。如果你想跳过所有历史消息,打算从最新的消息处开始消费的话,可以使用 Latest 策略。
  3. Current 策略表示将位移调整成消费者当前提交的最新位移。有时候你可能会碰到这样的场景:你修改了消费者程序代码,并重启了消费者,结果发现代码有问题,你需要回滚之前的代码变更,同时也要把位移重设到消费者重启时的位置,那么,Current 策略就可以帮你实现这个功能。
  4. 表中第 4 行的 Specified-Offset 策略则是比较通用的策略,表示消费者把位移值调整到你指定的位移处。这个策略的典型使用场景是,消费者程序在处理某条错误消息时,你可以手动地“跳过”此消息的处理。在实际使用过程中,可能会出现 corrupted 消息无法被消费的情形,此时消费者程序会抛出异常,无法继续工作。一旦碰到这个问题,你就可以尝试使用 Specified-Offset 策略来规避。
  5. 如果说 Specified-Offset 策略要求你指定位移的绝对数值的话,那么 Shift-By-N 策略指定的就是位移的相对数值,即你给出要跳过的一段消息的距离即可。这里的“跳”是双向的,你既可以向前“跳”,也可以向后“跳”。比如,你想把位移重设成当前位移的前 100 条位移处,此时你需要指定 N 为 -100。
  6. DateTime 允许你指定一个时间,然后将位移重置到该时间之后的最早位移处。常见的使用场景是,你想重新消费昨天的数据,那么你可以使用该策略重设位移到昨天 0 点。
  7. Duration 策略则是指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处,具体格式是 PnDTnHnMnS。如果你熟悉 Java 8 引入的 Duration 类的话,你应该不会对这个格式感到陌生。它就是一个符合 ISO-8601 规范的 Duration 格式,以字母 P 开头,后面由 4 部分组成,即 D、H、M 和 S,分别表示天、小时、分钟和秒。举个例子,如果你想将位移调回到 15 分钟前,那么你就可以指定 PT0H15M0S。

目前,重设消费者组位移的方式有两种。

  • 通过消费者 API 来实现。
  • 通过 kafka-consumer-groups 命令行脚本来实现。

    消费者 API 方式设置

    我主要以 Java API 为例进行演示。如果你使用的是其他语言,方法应该是类似的,不过你要参考具体的 API 文档。
    总之,使用 Java API 的方式来实现重设策略的主要入口方法,就是 seek 方法

    命令行方式设置

    通过kafka-consumer-group 脚本设置位移。 不过引入是在kafka0.11版本引入的 ,需要大于0.11

1.Earliest 策略直接指定–to-earliest。

bin/kafka-consumer-groups.sh —bootstrap-server kafka-host:port —group test-group —reset-offsets —all-topics —to-earliest –execute

2.Latest 策略直接指定–to-latest。

bin/kafka-consumer-groups.sh —bootstrap-server kafka-host:port —group test-group —reset-offsets —all-topics —to-latest —execute

3.Current 策略直接指定–to-current。

bin/kafka-consumer-groups.sh —bootstrap-server kafka-host:port —group test-group —reset-offsets —all-topics —to-current —execute

4.Specified-Offset 策略直接指定–to-offset。

bin/kafka-consumer-groups.sh —bootstrap-server kafka-host:port —group test-group —reset-offsets —all-topics —to-offset —execute

5.Shift-By-N 策略直接指定–shift-by N。

bin/kafka-consumer-groups.sh —bootstrap-server kafka-host:port —group test-group —reset-offsets —shift-by —execute

6.DateTime 策略直接指定–to-datetime。

in/kafka-consumer-groups.sh —bootstrap-server kafka-host:port —group test-group —reset-offsets —to-datetime 2019-06-20T20:00:00.000 —execute

7.最后是实现 Duration 策略,我们直接指定–by-duration。

kafka-host:port —group test-group —reset-offsets —by-duration PT0H30M0S —execute

31 | 常见工具脚本大汇总

命令行脚本概览

以 2.2 版本为例,详细地盘点下这些命令行工具。下图展示了 2.2 版本提供的所有命令行脚本。
image.png
2.2 版本总共提供了 30 个 SHELL 脚本。图中的 windows 实际上是个子目录,里面保存了 Windows 平台下的 BAT 批处理文件。其他的.sh 文件则是 Linux 平台下的标准 SHELL 脚本。
默认情况下,不加任何参数或携带 —help 运行 SHELL 文件,会得到该脚本的使用方法说明。下面这张图片展示了 kafka-log-dirs 脚本的调用方法。
image.png
connect-standalone 和 connect-distributed 两个脚本。这两个脚本是 Kafka Connect 组件的启动脚本。 Kafka Connect 组件,用于实现 Kafka 与外部世界系统之间的数据传输。Kafka Connect 支持单节点的 Standalone 模式,也支持多节点的 Distributed 模式。这两个脚本分别是这两种模式下的启动脚本。
kafka-broker-api-versions 脚本。这个脚本的主要目的是验证不同 Kafka 版本之间服务器和客户端的适配性
在 0.10.2.0 之前,Kafka 是单向兼容的,即高版本的 Broker 能够处理低版本 Client 发送的请求,反过来则不行。自 0.10.2.0 版本开始,Kafka 正式支持双向兼容,也就是说,低版本的 Broker 也能处理高版本 Client 的请求了
kafka-producer-perf-test 和 kafka-consumer-perf-test。它们分别是生产者和消费者的性能测试工具。
kafka-delete-records 脚本用于删除 Kafka 的分区消息。鉴于 Kafka 本身有自己的自动消息删除策略,这个脚本的实际出场率并不高。
kafka查看消费消息内容命令:
kafka-dump-log 脚本可谓是非常实用的脚本。它能查看 Kafka 消息文件的内容,包括消息的各种元数据信息,甚至是消息体本身。
kafka-log-dirs 脚本是比较新的脚本,可以帮助查询各个 Broker 上的各个日志路径的磁盘占用情况。
kafka-mirror-maker 脚本是帮助你实现 Kafka 集群间的消息同步的。
kafka-preferred-replica-election 脚本是执行 Preferred Leader 选举的。它可以为指定的主题执行“换 Leader”的操作。
kafka-reassign-partitions 脚本用于执行分区副本迁移以及副本文件路径迁移。
对于 kafka-server-start 和 kafka-server-stop 脚本,你应该不会感到陌生,它们是用于启动和停止 Kafka Broker 进程的。

重点脚本操作

生产消息
生产消息使用 kafka-console-producer 脚本即可

bin/kafka-console-producer.sh —broker-list kafka-host:port —topic test-topic —request-required-acks -1 —producer-property compression.type=lz4 >

指定生产者参数 acks 为 -1,同时启用了 LZ4 的压缩算法。这个脚本可以很方便地让我们使用控制台来向 Kafka 的指定主题发送消息。
消费消息

$ bin/kafka-console-consumer.sh —bootstrap-server kafka-host:port —topic test-topic —group test-group —from-beginning —consumer-property enable.auto.commit=false

指定了 group 信息。如果没有指定的话,每次运行 Console Consumer,它都会自动生成一个新的消费者组来消费。久而久之,你会发现你的集群中有大量的以 console-consumer 开头的消费者组。通常情况下,你最好还是加上 group。
from-beginning 等同于将 Consumer 端参数 auto.offset.reset 设置成 earliest,表明我想从头开始消费主题。如果不指定的话,它会默认从最新位移读取消息。如果此时没有任何新消息,那么该命令的输出为空,你什么都看不到。
测试生产者性能
测试生产者的脚本:kafka-producer-perf-test

$ bin/kafka-producer-perf-test.sh —topic test-topic —num-records 10000000 —throughput -1 —record-size 1024 —producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4 2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency. 4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency. 10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.

上述命令向指定主题发送了 1 千万条消息,每条消息大小是 1KB。该命令允许你在 producer-props 后面指定要设置的生产者参数,比如本例中的压缩算法、延时时间等。
会打印出测试生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时。一般情况下,消息延时不是一个简单的数字,而是一组分布。或者说,我们应该关心延时的概率分布情况,仅仅知道一个平均值是没有意义的。这就是这里计算分位数的原因。通常我们关注到99th 分位就可以了。比如在上面的输出中,99th 值是 604ms,这表明测试生产者生产的消息中,有 99% 消息的延时都在 604ms 以内。你完全可以把这个数据当作这个生产者对外承诺的 SLA。

测试消费者性能**

$ bin/kafka-consumer-perf-test.sh —broker-list kafka-host:port —messages 10000000 —topic test-topic start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec 2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012

该脚本也会打印出消费者的吞吐量数据。

查看主题消息总数
想查看某个主题当前消息总数。

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell —broker-list kafka-host:port —time -2 —topic test-topic
test-topic:0:0 test-topic:1:0
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell —broker-list kafka-host:port —time -1 —topic test-topic test-topic:0:5500000 test-topic:1:5500000

用 Kafka 提供的工具类GetOffsetShell来计算给定主题特定分区当前的最早位移和最新位移,将两者的差值累加起来,就能得到该主题当前总的消息数。对于本例来说,test-topic 总的消息数为 5500000 + 5500000,等于 1100 万条。

查看消息文件
使用 kafka-dump-log 脚本来查看具体的内容

$ bin/kafka-dump-log.sh —files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log Dumping ../data_dir/kafka_1/test-topic-1/00000000000000000000.log Starting offset: 0 baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1561597044933 size: 1237 magic: 2 compresscodec: LZ4 crc: 646766737 isvalid: true baseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1237 CreateTime: 1561597044934 size: 1237 magic: 2 compresscodec: LZ4 crc: 3751986433 isvalid: true ……

只是指定 —files,那么该命令显示的是消息批次(RecordBatch)或消息集合(MessageSet)的元数据信息,比如创建时间、使用的压缩算法、CRC 校验值等。
如果我们想深入看一下每条具体的消息,那么就需要显式指定 —deep-iteration 参数,如下所示:

$ bin/kafka-dump-log.sh —files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log —deep-iteration —print-data-log

查看消费者组位移
用的是 —reset-offsets 参数,今天我们使用的是 —describe 参数。假设我们要查询 Group ID 是 test-group 的消费者的位移,那么命令如图所示:
image.png
图中的 CURRENT-OFFSET 表示该消费者当前消费的最新位移,LOG-END-OFFSET 表示对应分区最新生产消息的位移,LAG 列是两者的差值。CONSUMER-ID 是 Kafka 消费者程序自动生成的一个 ID。截止到 2.2 版本,你都无法干预这个 ID 的生成过程。如果运行该命令时,这个消费者程序已经终止了,那么此列的值为空。

32 | KafkaAdminClient:Kafka的运维利器

引入原因

令行脚本很多都是通过连接 ZooKeeper 来提供服务的。目前,社区已经越来越不推荐任何工具直连 ZooKeeper 了,因为这会带来一些潜在的问题,比如这可能会绕过 Kafka 的安全设置。
推荐原因: 客户端的AdminClient。

如何使用?

java 客户端提供

功能

  1. 主题管理:包括主题的创建、删除和查询。
  2. 权限管理:包括具体权限的配置与删除。
  3. 配置参数管理:包括 Kafka 各种资源的参数设置、详情查询。所谓的 Kafka 资源,主要有 Broker、主题、用户、Client-id 等。
  4. 副本日志管理:包括副本底层日志路径的变更和详情查询。
  5. 分区管理:即创建额外的主题分区。
  6. 消息删除:即删除指定位移之前的分区消息。
  7. Delegation Token 管理:包括 Delegation Token 的创建、更新、过期和详情查询。
  8. 消费者组管理:包括消费者组的查询、位移查询和删除。
  9. Preferred 领导者选举:推选指定主题分区的 Preferred Broker 为领导者。

工作原理

从设计上来看,AdminClient 是一个双线程的设计:前端主线程和后端 I/O 线程
前端线程负责将用户要执行的操作转换成对应的请求,然后再将请求发送到后端 I/O 线程的队列中;而后端 I/O 线程从队列中读取相应的请求,然后发送到对应的 Broker 节点上,之后把执行结果保存起来,以便等待前端线程的获取。
image.png
端主线程会创建名为 Call 的请求对象实例。该实例有两个主要的任务。

  1. 构建对应的请求对象
  2. 指定响应的回调逻辑

AdminClient 是使用 Java Object 对象的 wait 和 notify 实现的这种通知机制。