kafka.png

文章一

本文由 简悦 SimpRead 转码, 原文地址 www.cnblogs.com

数据可靠性

Kafka 作为一个商业级消息中间件,消息可靠性的重要性可想而知。本文从 Producter 往 Broker 发送消息、Topic 分区副本以及 Leader 选举几个角度介绍数据的可靠性。

Topic 分区副本

在 Kafka 0.8.0 之前,Kafka 是没有副本的概念的,那时候人们只会用 Kafka 存储一些不重要的数据,因为没有副本,数据很可能会丢失。但是随着业务的发展,支持副本的功能越来越强烈,所以为了保证数据的可靠性,Kafka 从 0.8.0 版本开始引入了分区副本。也就是说每个分区可以人为的配置几个副本(比如创建主题的时候指定 replication-factor,也可以在 Broker 级别进行配置 default.replication.factor),一般会设置为 3。

Kafka 可以保证单个分区里的事件是有序的,分区可以在线(可用),也可以离线(不可用)。在众多的分区副本里面有一个副本是 Leader,其余的副本是 follower,所有的读写操作都是经过 Leader 进行的,同时 follower 会定期地去 leader 上的复制数据。当 Leader 挂了的时候,其中一个 follower 会重新成为新的 Leader。通过分区副本,引入了数据冗余,同时也提供了 Kafka 的数据可靠性。

Kafka 的分区多副本架构是 Kafka 可靠性保证的核心,把消息写入多个副本可以使 Kafka 在发生崩溃时仍能保证消息的持久性。

Producer 往 Broker 发送消息

如果我们要往 Kafka 对应的主题发送消息,我们需要通过 Producer 完成。前面我们讲过 Kafka 主题对应了多个分区,每个分区下面又对应了多个副本;为了让用户设置数据可靠性, Kafka 在 Producer 里面提供了消息确认机制。也就是说我们可以通过配置来决定消息发送到对应分区的几个副本才算消息发送成功。可以在定义 Producer 时通过 acks 参数指定(在 0.8.2.X 版本之前是通过 request.required.acks 参数设置的,详见 KAFKA-3043)。这个参数支持以下三种值:

  • acks = 0:意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入 Kafka 。在这种情况下还是有可能发生错误,比如发送的对象无能被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。在 acks=0 模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式, 一定会丢失一些消息。
  • acks = 1:意味若 Leader 在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。在这个模式下,如果发生正常的 Leader 选举,生产者会在选举时收到一个 LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送悄息,最终消息会安全到达新的 Leader 那里。不过在这个模式下仍然有可能丢失数据,比如消息已经成功写入 Leader,但在消息被复制到 follower 副本之前 Leader 发生崩溃。
  • acks = all(这个和 request.required.acks = -1 含义一样):意味着 Leader 在返回确认或错误响应之前,会等待所有同步副本都收到悄息。如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到悄息,生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,因为生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。

根据实际的应用场景,我们设置不同的 acks,以此保证数据的可靠性。

另外,Producer 发送消息还可以选择同步(默认,通过 producer.type=sync 配置) 或者异步(producer.type=async)模式。如果设置成异步,虽然会极大的提高消息发送的性能,但是这样会增加丢失数据的风险。如果需要确保消息的可靠性,必须将 producer.type 设置为 sync。

Leader 选举

在介绍 Leader 选举之前,让我们先来了解一下 ISR(in-sync replicas)列表。每个分区的 leader 会维护一个 ISR 列表,ISR 列表里面就是 follower 副本的 Borker 编号,只有跟得上 Leader 的 follower 副本才能加入到 ISR 里面,这个是通过 replica.lag.time.max.ms 参数配置的,具体可以参见 《一文了解 Kafka 的副本复制机制》。只有 ISR 里的成员才有被选为 leader 的可能。

所以当 Leader 挂掉了,而且 unclean.leader.election.enable=false 的情况下,Kafka 会从 ISR 列表中选择第一个 follower 作为新的 Leader,因为这个分区拥有最新的已经 committed 的消息。通过这个可以保证已经 committed 的消息的数据可靠性。

综上所述,为了保证数据的可靠性,我们最少需要配置一下几个参数:

  • producer 级别:acks=all(或者 request.required.acks=-1),同时发生模式为同步 producer.type=sync
  • topic 级别:设置 replication.factor>=3,并且 min.insync.replicas>=2;
  • broker 级别:关闭不完全的 Leader 选举,即 unclean.leader.election.enable=false;

数据一致性

这里介绍的数据一致性主要是说不论是老的 Leader 还是新选举的 Leader,Consumer 都能读到一样的数据。那么 Kafka 是如何实现的呢?

Kafka 是如何保证数据可靠性和一致性 - 图2

假设分区的副本为 3,其中副本 0 是 Leader,副本 1 和副本 2 是 follower,并且在 ISR 列表里面。虽然副本 0 已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本 2,这个很类似于木桶原理。

这样做的原因是还没有被足够多副本复制的消息被认为是 “不安全” 的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本 0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本 1 为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。

当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。
**


文章二

本文由 简悦 SimpRead 转码, 原文地址 liyangyang.blog.csdn.net

在 kafka 中主要通过 ISR 机制来保证消息的可靠性。 下面通过几个问题来说明 kafka 如何来保证消息可靠性与一致性

在 kafka 中 ISR 是什么?

在 zk 中会保存 AR(Assigned Replicas)列表,其中包含了分区所有的副本,其中 AR = ISR+OSR

  • ISR(in sync replica):是 kafka 动态维护的一组同步副本,在 ISR 中有成员存活时,只有这个组的成员才可以成为 leader,内部保存的为每次提交信息时必须同步的副本(acks = all 时),每当 leader 挂掉时,在 ISR 集合中选举出一个 follower 作为 leader 提供服务,当 ISR 中的副本被认为坏掉的时候,会被踢出 ISR,当重新跟上 leader 的消息数据时,重新进入 ISR。
  • OSR(out sync replica): 保存的副本不必保证必须同步完成才进行确认,OSR 内的副本是否同步了 leader 的数据,不影响数据的提交,OSR 内的 follower 尽力的去同步 leader,可能数据版本会落后。

kafka 如何控制需要同步多少副本才可以返回确定到生产者消息才可用?

  • 当写入到 kakfa 时,生产者可以选择是否等待 0(只需写入 leader),1(只需同步一个副本) 或 -1(全部副本)的消息确认(这里的副本指的是 ISR 中的副本)。
  • 需要注意的是 “所有副本确认” 并不能保证全部分配副本已收到消息。默认情况下,当 acks=all 时,只要当前所有在同步中的副本(ISR 中的副本)收到消息,就会进行确认。所以 Kafka 的交付承诺可以这样理解:对没有提交成功的消息不做任何交付保证,而对于 ISR 中至少有一个存活的完全同步的副本的情况下的 “成功提交” 的消息保证不会丢失。

对于 kafka 节点活着的条件是什么?

  • 第一点:一个节点必须维持和 zk 的会话,通过 zk 的心跳检测实现
  • 第二点:如果节点是一个 slave 也就是复制节点,那么他必须复制 leader 节点不能太落后。这里的落后可以指两种情况
    • 1:数据复制落后,slave 节点和 leader 节点的数据相差较大,这种情况有一个缺点,在生产者突然发送大量消息导致网络堵塞后,大量的 slav 复制受阻,导致数据复制落后被大量的踢出 ISR。
    • 2:时间相差过大,指的是 slave 向 leader 请求复制的时间距离上次请求相隔时间过大。通过配置replica.lag.time.max就可以配置这个时间参数。这种方式解决了上述第一种方式导致的问题。

kafka 分区 partition 挂掉之后如何恢复?

在 kafka 中有一个 partition recovery 机制用于恢复挂掉的 partition。

每个 Partition 会在磁盘记录一个 RecoveryPoint(恢复点), 记录已经 flush 到磁盘的最大 offset。当 broker fail 重启时, 会进行 loadLogs。 首先会读取该 Partition 的 RecoveryPoint, 找到包含 RecoveryPoint 点上的 segment 及以后的 segment, 这些 segment 就是可能没有完全 flush 到磁盘 segments。然后调用 segment 的 recover, 重新读取各个 segment 的 msg, 并重建索引。

优点:

  1. 以 segment 为单位管理 Partition 数据, 方便数据生命周期的管理, 删除过期数据简单
  2. 在程序崩溃重启时, 加快 recovery 速度, 只需恢复未完全 flush 到磁盘的 segment 即可

什么原因导致副本与 leader 不同步的呢?

  • 慢副本:在一定周期时间内 follower 不能追赶上 leader。最常见的原因之一是 I / O 瓶颈导致 follower 追加复制消息速度慢于从 leader 拉取速度。
  • 卡住副本:在一定周期时间内 follower 停止从 leader 拉取请求。follower replica 卡住了是由于 GC 暂停或 follower 失效或死亡。
  • 新启动副本:当用户给主题增加副本因子时,新的 follower 不在同步副本列表中,直到他们完全赶上了 leader 日志。

一个 partition 的 follower 落后于 leader 足够多时,被认为不在同步副本列表或处于滞后状态。

正如上述所说,现在 kafka 判定落后有两种,副本滞后判断依据是副本落后于 leader 最大消息数量 (replica.lag.max.messages) 或 replicas 响应 partition leader 的最长等待时间(replica.lag.time.max.ms)。前者是用来检测缓慢的副本, 而后者是用来检测失效或死亡的副本

如果 ISR 内的副本挂掉怎么办?

  • 两种选择:服务直接不可用一段时间等待 ISR 中副本恢复(祈祷恢复的副本有数据吧) 或者 直接选用第一个副本(这个副本不一定在 ISR 中)作为 leader,这两种方法也是在可用性和一致性之间的权衡。
  • 服务不可用方式这种适用在不允许消息丢失的情况下使用,适用于一致性大于可用性,可以有两种做法
    • 设置 ISR 最小同步副本数量,如果 ISR 的当前数量大于设置的最小同步值,那么该分区才会接受写入,避免了 ISR 同步副本过少。如果小于最小值那么该分区将不接收写入。这个最小值设置只有在 acks = all 的时候才会生效。
    • 禁用 unclean-leader 选举,当 isr 中的所有副本全部不可用时,不可以使用 OSR 中的副本作为 leader,直接使服务不可用,直到等到 ISR 中副本恢复再进行选举 leader。
  • 直接选择第一个副本作为 leader 的方式,适用于可用性大于一致性的场景,这也是 kafka 在 isr 中所有副本都死亡了的情况采用的默认处理方式,我们可以通过配置参数unclean.leader.election.enable来禁止这种行为,采用第一种方法。

那么 ISR 是如何实现同步的呢?

broker 的 offset 大致分为三种:base offset、high watemark(HW)、log end offset(LEO)

  • base offset:起始位移,replica 中第一天消息的 offset
  • HW:replica 高水印值,副本中最新一条已提交消息的位移。leader 的 HW 值也就是实际已提交消息的范围,每个 replica 都有 HW 值,但仅仅 leader 中的 HW 才能作为标示信息。什么意思呢,就是说当按照参数标准成功完成消息备份(成功同步给 follower replica 后)才会更新 HW 的值,代表消息理论上已经不会丢失,可以认为 “已提交”。
  • LEO:日志末端位移,也就是 replica 中下一条待写入消息的 offset,注意哈,是下一条并且是待写入的,并不是最后一条。这个 LEO 个人感觉也就是用来标示 follower 的同步进度的。
    所以 HW 代表已经完成同步的数据的位置,LEO 代表已经写入的最新位置,只有 HW 位置之前的才是可以被外界访问的数据。
    现在就来看一下之前,broker 从收到消息到返回响应这个黑盒子里发生了什么。
    Kafka 是如何保证数据可靠性和一致性 - 图3
  1. broker 收到 producer 的请求
  2. leader 收到消息,并成功写入,LEO 值 + 1
  3. broker 将消息推给 follower replica,follower 成功写入 LEO +1
  4. 所有 LEO 写入后,leader HW +1
  5. 消息可被消费,并成功响应

上述过程从下面的图便可以看出:
Kafka 是如何保证数据可靠性和一致性 - 图4

解决上一个问题后,接下来就是 kafka 如何选用 leader 呢?

选举 leader 常用的方法是多数选举法,比如 Redis 等,但是 kafka 没有选用多数选举法,kafka 采用的是 quorum(法定人数)。

quorum 是一种在分布式系统中常用的算法,主要用来通过数据冗余来保证数据一致性的投票算法。在 kafka 中该算法的实现就是 ISR,在 ISR 中就是可以被选举为 leader 的法定人数。

  • 在 leader 宕机后,只能从 ISR 列表中选取新的 leader,无论 ISR 中哪个副本被选为新的 leader,它都知道 HW 之前的数据,可以保证在切换了 leader 后,消费者可以继续看到 HW 之前已经提交的数据。
  • HW 的截断机制:选出了新的 leader,而新的 leader 并不能保证已经完全同步了之前 leader 的所有数据,只能保证 HW 之前的数据是同步过的,此时所有的 follower 都要将数据截断到 HW 的位置,再和新的 leader 同步数据,来保证数据一致。
    当宕机的 leader 恢复,发现新的 leader 中的数据和自己持有的数据不一致,此时宕机的 leader 会将自己的数据截断到宕机之前的 hw 位置,然后同步新 leader 的数据。宕机的 leader 活过来也像 follower 一样同步数据,来保证数据的一致性。