1 概述

1.1 定义

Kafka 是一个分布式的基于发布/订阅模式消息队列(Message Queue),主要应用于大数据实时处理领域。

中文链接:Kafka中文文档链接

1.2消息队列

1.2.1传统消息队列的应用场景

  1. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/12908052/1619259330678-f012a3d5-b12f-4dcc-ada0-4ff3c62d7846.png#align=left&display=inline&height=296&margin=%5Bobject%20Object%5D&name=image.png&originHeight=432&originWidth=865&size=84283&status=done&style=shadow&width=592)<br />**使用消息队列的好处:**
  1. 解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
  2. 可恢复性:系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
  3. 缓冲:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
  4. 灵活性 & 峰值处理能力:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
  5. 异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

1.2.2 消息队列的两种模式

(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除
消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
image.png

(2)发布/订阅模式(一对多,消费者消费数据之后不会清除消息
消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
消息推送有两种情况:

  • Topic推送(缺点:很难适应消费速率不同的消费者)
  • 消费者主动拉取(缺点:导致长轮询)

    image.png

1.3KafKa基础架构

image.png

  • Producer :消息生产者,就是向 kafka broker 发消息的客户端;
  • Consumer :消息消费者,向 kafka broker 取消息的客户端;
  • Consumer Group (CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  • Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
  • Topic :可以理解为一个队列,生产者和消费者面向的都是一个 topic;
  • Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;
  • Replica:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
  • leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
  • follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。

注意:kafka采用分区的目的:

  • 第一是可以处理更多的消息,不受单台服务器的限制。Topic拥有多个分区意味着它可以不受限的处理更多的数据。
  • 第二,分区可以作为并行处理的单元,稍后会谈到这一点。

1.4 术语介绍

Broker:kafka集群中包含一个或者多个服务实例,这种服务实例被称为Broker
Topic:每条发布到kafka集群的消息都有一个类别,这个类别就叫做Topic
Partition:Partition是一个物理上的概念,每个Topic包含一个或者多个Partition
Producer:负责发布消息到kafka的Broker中。
Consumer:消息消费者,向kafka的broker中读取消息的客户端
Consumer Group:每一个Consumer属于一个特定的Consumer Group(可以为每个Consumer指定 groupName)

1.4.1 Topic 主题

  • kafka将消息以topic为单位进行归类。
  • topic特指kafka处理的消息源(feeds of messages)的不同分类。
  • topic是一种分类或者发布的一些列记录的名义上的名字。kafka主题始终是支持多用户订阅的;也就是说,一 个主题可以有零个,一个或者多个消费者订阅写入的数据。
  • 在kafka集群中,可以有无数的主题。
  • 生产者和消费者消费数据一般以主题为单位。更细粒度可以到分区级别。

1.4.2 Kafka 的分区数(Partitions)

Partitions:分区数:控制topic将分片成多少个log,可以显示指定,如果不指定则会使用 broker(server.properties)中的num.partitions配置的数量。

  • 一个broker服务下,是否可以创建多个分区? 可以的,broker数与分区数没有关系
  • 在kafka中,每一个分区会有一个编号:编号从0开始
  • 某一个分区的数据是有序的
  • Partition数量决定了每个Consumer group中并发消费者的最大数量

    11--Kafka - 图4

1.4.3 Partition Replication 分区副本数

副本数(replication-factor):控制消息保存在几个broker(服务器)上,一般情况下等于broker的个数
一个broker服务下,是否可以创建多个副本因子?
不可以;创建主题时,副本因子应该小于等于可用的broker数。
副本因子过程图:
11--Kafka - 图5

副本因子操作以分区为单位的。每个分区都有各自的主副本和从副本;主副本叫做leader,从副本叫做 follower(在有多个副本的情况下,kafka会为同一个分区下的分区,设定角色关系:一个leader和N个 follower),处于同步状态的副本叫做in-sync-replicas(ISR);follower通过拉的方式从leader同步数据。消费 者和生产者都是从leader读写数据,不与follower交互。
副本因子的作用:让kafka读取数据和写入数据时的可靠性。
副本因子是包含本身、同一个副本因子不能放在同一个Broker中。
如果某一个分区有三个副本因子,就算其中一个挂掉,那么只会剩下的两个钟,选择一个leader,但不会在其 他的broker中,另启动一个副本(因为在另一台启动的话,存在数据传递,只要在机器之间有数据传递,就 会长时间占用网络IO,kafka是一个高吞吐量的消息系统,这个情况不允许发生)所以不会在零个broker中启 动。
如果所有的副本都挂了,生产者如果生产数据到指定分区的话,将写入不成功。
lsr表示:当前可用的副本

1.4.4 kafka Partition offset 偏移量

任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset是一个long类型数字,它唯一标识了一条消息,消费者通过(offset,partition,topic)跟踪记录。

总结:分区数越多,同一时间可以有越多的消费者来进行消费,消费数据的速度就会越快,提高消费的性能

2 Kafka快速入门

2.1 安装部署

win10安装kafka

2.2 修改配置文件

(1)server.properties

  1. #broker 的全局唯一编号,不能重复
  2. broker.id=0
  3. #删除 topic 功能使能
  4. delete.topic.enable=true
  5. #处理网络请求的线程数量
  6. num.network.threads=3
  7. #用来处理磁盘 IO 的现成数量
  8. num.io.threads=8
  9. #发送套接字的缓冲区大小
  10. socket.send.buffer.bytes=102400
  11. #接收套接字的缓冲区大小
  12. socket.receive.buffer.bytes=102400
  13. #请求套接字的缓冲区大小
  14. socket.request.max.bytes=104857600
  15. #kafka 运行日志存放的路径
  16. log.dirs=/opt/module/kafka/logs
  17. #topic 在当前 broker 上的分区个数
  18. num.partitions=1
  19. #用来恢复和清理 data 下数据的线程数量
  20. num.recovery.threads.per.data.dir=1
  21. #segment 文件保留的最长时间,超时将被删除
  22. log.retention.hours=168
  23. #配置连接 Zookeeper 集群地址
  24. zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

2.3 环境变量配置

2.4 命令行操作

  1. # 查看当前服务器中的所有 topic
  2. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
  3. # 创建 topic
  4. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
  5. 选项说明:
  6. --topic 定义 topic
  7. --replication-factor 定义副本数
  8. --partitions 定义分区数
  9. # 删除 topic
  10. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
  11. 注意:需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除。
  12. # 发送消息
  13. [atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
  14. >hello world
  15. >atguigu atguigu
  16. # 消费消息
  17. [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
  18. --zookeeper hadoop102:2181 --topic first
  19. [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
  20. --bootstrap-server hadoop102:9092 --topic first
  21. [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
  22. --bootstrap-server hadoop102:9092 --from-beginning --topic first
  23. 注意:
  24. --from-beginning:会把主题中以往所有的数据都读取出来。
  25. # 查看某个 Topic 的详情
  26. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper
  27. hadoop102:2181 --describe --topic first
  28. # 修改分区数
  29. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper
  30. hadoop102:2181 --alter --topic first --partitions 6

kafka 命令大全

3 接口API

Kafka有五个核心的API:

  • Producer API 允许应用程序发送数据流到kafka集群中的topic。
  • Consumer API 允许应用程序从kafka集群的topic中读取数据流。
  • Streams API 允许从输入topic转换数据流到输出topic。
  • Connect API 通过实现连接器(connector),不断地从一些源系统或应用程序中拉取数据到kafka,或从kafka提交数据到宿系统(sink system)或应用程序。
  • Admin API 用于管理和检查topic,broker和其他Kafka对象。

3.1 生产者 API

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>2.8.0</version>
  5. </dependency>

3.2 消费者 API

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>2.8.0</version>
  5. </dependency>

3.3 Streams API

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-streams</artifactId>
  4. <version>2.8.0</version>
  5. </dependency>

3.4 Kafka Connect API

实现一个连接器(connector),不断地从一些数据源系统拉取数据到kafka,或从kafka推送到宿系统(sink system)。详细介绍

3.5 Kafka Admin API

Admin API 用于管理和检查topic、broker、acls和其他Kafka对象。要使用Admin API,请添加以下Maven依赖关系。详细介绍

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>2.8.0</version>
  5. </dependency>

4 Kafka 架构深入

4.1 Kafka工作流程及文件存储机制

image.png

Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic的。
topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的 offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。

image.png
由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic 名称+分区序号。例如,first 这个 topic 有三个分区,则其对应的文件夹为 first-0,first-1,first-2。
注意:.log负责存消息;.index负责查消息

4.2 Kafka生产者

4.2.1分区策略**

1)分区原因:
(1)方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;
(2)可以提高并发,因为可以以 Partition 为单位读写了。

4.2.2 数据可靠性保证

为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
image.png

1)副本数据同步策略

image.png
Kafka 选择了第二种方案,原因如下:
1.同样为了容忍 n 台节点的故障,第一种方案需要 2n+1 个副本,而第二种方案只需要 n+1个副本,而 Kafka 的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
2.虽然第二种方案的网络延迟会比较高,但网络延迟对 Kafka 的影响较小。

2)ISR 同步副本(重点)

采用第二种方案之后,设想以下情景:leader 收到数据,所有 follower 都开始同步数据,但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去,直到它完成同步,才能发送 ack。这个问题怎么解决呢?
Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower长时间未向 leader 同 步 数 据 , 则 该 follower 将 被 踢 出 ISR , 该 时 间 阈 值 由replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的leader。

3)ack应答机制(重点)

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
acks 参数配置**

  • 0:producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据
  • 1:producer 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么将会丢失数据
  • -1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower(ISR中的所有Follower) 全部落盘成功后才返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复

4)故障处理细节(重点)

image.png
LEO:指的是每个副本最大的 offset;
HW:指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO。

  • follower故障
    • follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
  • leader故障
    • leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据。
  • 注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

4.3 Kafka消费者

4.3.1消费方式

  • consumer 采用 pull(拉)模式从 broker 中读取数据。
  • push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。

它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。

面试题大全

1.什么是kafka?

Kafka是分布式发布-订阅消息系统,它最初是由LinkedIn公司开发的,之后成为Apache项目的一部分,Kafka是一个分布式,可划分的,冗余备份的持久性的日志服务,它主要用于处理流式数据。

2.为什么要使用 kafka,为什么要使用消息队列?

  • 缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
  • 解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。
  • 冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
  • 健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
  • 异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

    3.Kafka中的ISR、AR代表什么?ISR的伸缩又指什么?

  • ISR: In-Sync Replicas 副本同步队列

  • AR: Assigned Replicas 所有副本

ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。

4.kafka中的broker 是干什么的

broker 是消息的代理,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉取指定Topic的消息,然后进行业务处理,broker在中间起到一个代理保存消息的中转站。

5.kafka中的 zookeeper 起到什么作用,可以不用zookeeper么?

zookeeper 是一个分布式的协调组件,早期版本的kafka用zk做meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖,但是broker依然依赖于ZK,zookeeper 在kafka中还用来选举controller 和 检测broker是否存活等等。

6.kafka follower如何与leader同步数据

Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。
完全同步复制要求All Alive Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率。
异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下,如果leader挂掉,会丢失数据。
kafka使用ISR的方式很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,而且Leader充分利用磁盘顺序读以及send file(zero copy)机制,这样极大的提高复制性能,内部批量写磁盘,大幅减少了Follower与Leader的消息量差。

7.什么情况下一个 broker 会从 ISR中踢出去?

leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica)(副本同步队列)。
每个Partition都会有一个ISR,而且是由leader动态维护 ,如果一个follower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除 。

8.kafka 为什么那么快

  • Cache Filesystem Cache PageCache缓存
  • 顺序写(由于现代的操作系统提供了预读和写技术,磁盘的顺序写大多数情况下比随机写内存还要快)
  • Zero-copy 零拷技术减少拷贝次数
  • Batching of Messages 批量量处理。合并小的请求,然后以流的方式进行交互,直顶网络上限。
  • Pull 拉模式 使用拉模式进行消息的获取消费,与消费端处理能力相符。

    9.kafka producer如何优化打入速度

  • 加线程

  • 提高 batch.size
  • 增加更多 producer 实例
  • 增加 partition 数
  • 设置 acks=-1 时,如果延迟增大:可以增大 num.replica.fetchers(follower 同步数据的线程数)来调解
  • 跨数据中心的传输:增加 socket 缓冲区设置以及 OS tcp 缓冲区设置

    10.kafka producer打数据,ack为 0,1,-1 的时候代表啥,设置-1的时候,什么情况下,leader会认为一条消息 commit了?

  • (默认)1 : 数据发送到Kafka后,经过leader成功接收消息的的确认,就算是发送成功了。在这种情况下,如果leader宕机了,则会丢失数据。

  • 0 :生产者将数据发送出去就不管了,不去等待任何返回。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
  • -1 :producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。当ISR中所有Replica都向Leader发送ACK时,leader才commit,这时候producer才能认为一个请求中的消息都commit了。

    11.kafka unclean 配置代表啥,会对 spark streaming 消费有什么影响?

    unclean.leader.election.enable 为true的话,意味着非ISR集合的broker 也可以参与选举,这样有可能就会丢数据,spark streaming在消费过程中拿到的 end offset 会突然变小,导致 spark streaming job挂掉。如果unclean.leader.election.enable参数设置为true,就有可能发生数据丢失和数据不一致的情况,Kafka的可靠性就会降低;而如果unclean.leader.election.enable参数设置为false,Kafka的可用性就会降低。

    12.如果leader crash时,ISR为空怎么办?

    kafka在Broker端提供了一个配置参数:unclean.leader.election,这个参数有两个值:
    true(默认):允许不同步副本成为leader,由于不同步副本的消息较为滞后,此时成为leader,可能会出现消息不一致的情况。
    false:不允许不同步副本成为leader,此时如果发生ISR列表为空,会一直等待旧leader恢复,降低了可用性。

    13.kafka的message格式是什么样的

    一个Kafka的Message由一个固定长度的header和一个变长的消息体body组成。
    header部分由一个字节的magic(文件格式)和四个字节的CRC32(用于判断body消息体是否正常)构成。
    当magic的值为1的时候,会在magic和crc32之间多一个字节的数据:attributes(保存一些相关属性,比如是否压缩、压缩格式等等);如果magic的值为0,那么不存在attributes属性
    body是由N个字节构成的一个消息体,包含了具体的key/value消息

    14.kafka中consumer group 是什么概念

    同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。同一个topic的数据,会广播给不同的group;同一个group中的worker,只有一个worker能拿到这个数据。换句话说,对于同一个topic,每个group都可以拿到同样的所有数据,但是数据进入group后只能被其中的一个worker消费。group内的worker可以使用多线程或多进程来实现,也可以将进程分散在多台机器上,worker的数量通常不超过partition的数量,且二者最好保持整数倍关系,因为Kafka在设计时假定了一个partition只能被一个worker消费(同一group内)。

    15.Kafka中的消息是否会丢失和重复消费?

    要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费。

  • 消息发送

Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置。
Kafka通过配置request.required.acks属性来确认消息的生产:

  • 0:表示不进行消息接收是否成功的确认
  • 1:表示当Leader接收成功时确认
  • -1:表示Leader和Follower都接收成功时确认

注意:

  • acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失
  • acks=1、同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失
    • 消息消费

Kafka消息消费有两个consumer接口,Low-level API和High-level API:

  • Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制
  • High-level API:封装了对parition和offset的管理,使用简单

如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了;

  • 解决方案

    • 针对消息丢失:同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;
    • 针对消息重复:将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。

      16.为什么Kafka不支持读写分离?

      在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。
      Kafka 并不支持主写从读,因为主写从读有 2 个很明 显的缺点:
  • 数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。

  • 延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。

    17.Kafka中是怎么体现消息顺序性的?

    kafka每个partition中的消息在写入时都是有序的,消费时,每个partition只能被每一个group中的一个消费者消费,保证了消费时也是有序的。
    整个topic不保证有序。如果为了保证topic整个有序,那么将partition调整为1.

    18.消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?