一、kafka 概述
1. 定义
kafka 是一个分布式、基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
2. 消息队列
- 应用场景

- 优点
- 解耦
允许你独立的扩展或者修改两边的处理过程,只要确保它们遵守同样的接口约束
- 可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息也可以在系统恢复后被处理。
- 缓冲
可以控制和优化数据流经系统的速度,解决消息生产速度和消费速度不一致问题
- 灵活性和峰值处理能力
在访问量剧增的情况下,应用仍然可以继续发挥作用,但是这样突发流量并不常见,如果以能处理这类峰值访问为标准来投入资源随时待命,无疑会带来巨大的浪费。使用消息队列能够使关键组件顶住这种突发的访问压力,不会因为突发的超负荷请求而崩溃。
- 异步通信
很多时候,用户不想立即处理消息,消息队列提供了异步处理机制,允许用户把一个消息放入到队列,但是不立即处理它,后续在进行处理。
3. 消息队列的两种模式
- 点对点模式(一对一,消费者主动拉取数据,消息收到后消除)
消息生产者将消息发送到 Queue 中,然后消费者从 Queue 中拉取并消费消息,消息被消费后便会消失,queue 中不再存储消息。queue 支持存在多个消费者,但是对已统一条消息来说,只能被一个消费者消费。
- 发布订阅模式(一对多,消费者消费数据之后不会清除数据)
消息生产者(发布)将消息发布到topic 中,同时有多个消息消费者(订阅)消费该消息,和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
4. kafka 结构

- producer : 消息生产者,向 kafka broker 发送消息的客户端
- Consumer : 消费者 ,向 broker 取消息的客户端
- Consumer Group : 消费者组,由多个消费者组成,每个消费者负责不同分区的数据,一个分区只能由一个组内消费者消费,消费者组不受影响。注意:消费者组在逻辑上是一个订阅者。
- Broker : 一个 kafka 服务器就是一个 broker,一个集群由多个 broker 组成,每个broker 可以容纳多个topic
- topic : 可以理解为一个队列,生产者和消费者面向的是同一个topic
- Partition : 当 topic 非常大的时候,可以划分为多个分区,分配到不同的broker(浏览器)上,每个partition 都是一个有序队列。
- Replication :存放分区消息副本,保障数据不丢失,leader 和follower。leader :多个副本的“主”,follower : 多个副本的“从”,当leader 发生故障的时候,follower 会变成leader
二、快速入门
1. 安装
1. 集群规划
| hadoop102 | hadoop103 | hadoop104 | | —- | —- | —- | | zk | zk | zk | | kafka | kafka | kafka |
2. jar 包下载
https://kafka.apache.org/downloads.html
3. 集群部署
- 解压安装
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
- 修改解压后的文件
mv kafka_2.11-0.11.0.0/ kafka
- 在安装目录下安装logs 文件
mkdir logs
- 修改配置文件
cd config/vi server.properties
#broker 的全局唯一编号,不能重复broker.id=0#删除 topic 功能使能delete.topic.enable=true#处理网络请求的线程数量num.network.threads=3#用来处理磁盘 IO 的现成数量num.io.threads=8#发送套接字的缓冲区大小socket.send.buffer.bytes=102400#接收套接字的缓冲区大小socket.receive.buffer.bytes=102400#请求套接字的缓冲区大小socket.request.max.bytes=104857600#kafka 运行日志存放的路径log.dirs=/opt/module/kafka/logs#topic 在当前 broker 上的分区个数num.partitions=1#用来恢复和清理 data 下数据的线程数量num.recovery.threads.per.data.dir=1#segment 文件保留的最长时间,超时将被删除log.retention.hours=168#配置连接 Zookeeper 集群地址zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
- 配置环境变量
sudo vi /etc/profile
#KAFKA_HOMEexport KAFKA_HOME=/opt/module/kafkaexport PATH=$PATH:$KAFKA_HOME/bin[allen@hadoop102 module]$ source /etc/profile
- 分发安装包
xsync kafka/
- 分别在 hadoop103 和 hadoop104 上修改配置文件/opt/module/kafka/config/server.properties中的 broker.id=1、broker.id=2 注:broker.id 不得重复
- 启动 关闭 ```java bin/kafka-server-start.sh -daemon config/server.properties bin/kafka-server-stop.sh stop
脚本
for i in hadoop102 hadoop103 hadoop104 do echo “========== $i ==========” ssh $i ‘/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties’ done
<a name="JYOxz"></a>### 2. kafka 命令行操作<a name="vrpcs"></a>#### 1. 查看topic` bin/kafka-topics.sh --zookeeper hadoop102:2181 --list `<a name="selxe"></a>#### 2. 创建topic```propertiesbin/kafka-topics.sh --zookeeper hadoop102:2181--create --replication-factor 3--partitions 1--topic first
选项说明:
—topic 定义 topic 名
—replication-factor 定义副本数
—partitions 定义分区数
3. 删除 topic
` bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first `
4. 发送消息
bin/kafka-console-producer.sh --brokerlist hadoop102:9092 --topic first
hello world
atguigu atguigu
5. 消费消息
` bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first `<br /> ` bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first `<br />` bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first `
6. 查看某个topic 详情
bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first
7.修改分区
bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
三、kafka 架构
1. 工作流程和问价存储机制

kafka 中所有的信息都是以 topic 进行分类的,生产者生产消息,消费者消费消息,这都是面向 topic的,但是topic 的逻辑概念,分区(partition)是物理概念。每个topic 拥有多个分区,每个分区内存放着生产者生产的消息。因为是以追加的方式存储消息的,所以对应的存储文件(log)越来越大。为了解决这个问题,kafka引入分片和索引机制。现将log 分为不同的片(segment),每个分片下面对应着 .index 和 .log文件。其中index 文件存放的是索引(offset 和实际存储偏移量对应关系),log文件存储实际的消息数据。
例如,主题frist有三个分区,根据分区命名规则:topic名称+分区序列号,则为frist-0,frist-1,frist-2。index,log 是以当前分片(segment)第一条消息的offset 命名。对应的文件为:
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
2. 生产者
1. 为什么要分区
- 方便大数据量的存储,提高并发量。
每个topic 可以有多个分区,以分区为单位进行读写,提高数据的并发
- 方便在集群中扩展
设置分区后,每个 partition 可以通过调整,以适应机器。
2. 分区的原则
- 如果topic 指定分区 ,直接按照 指定的分区进行划分
- 如果没指定分区,但是有 key 的情况下,将key的hash 值与topic 的分区数进行取余,结果分为分区号
如果两者都没有,第一次调用的时候,随机生成一个整数,用其与分区号进行取余操作,结果作为本次的分区号,之后再由这样的数据,该整数直接+1。该方法也成为了轮训。
3. 数据的可靠性保障
为了保障 每条消息都能可靠的发送到对应的topic中,需要 topic 收到消息后,向生产者发送一条确认信息(acknowledge),producer 收到后在进行下一条数据的发送。
topic 收到消息后,何时发送 ack 确认信息,有两种方案:一是 所有的follower 和leader 同步完成后在发送,二是 半数以上的follower 跟leader 同步完成后,发送ack。如果最大容忍 n 太节点故障,那么第一种方案只需要 n+1 台节点即可,而第二种则是 2*n +1 节点,但是第一种由于是等待所有的follower 同步完成后才发送ack 所以网络延迟较大。综合考虑 kafka 选择了第一种:等待所有节点同步完成后再次发送ack。
但是如果总有那么一两个节点,数据同步的特别慢,迟迟不能与leader 同步,那就太浪费时间了。所以kafka维护了一个动态的 in-sync replicaset(ISR),它里面包含了N个节点,意思是只要ISR 中follower 数据同步完成后,就立刻发送 ack。 ISR,并不是固定不变的,如果其中的某台节点,长时间未能与leader 进行数据同步,就会被提出ISR,具体时间设置通过 replica.lag.time.max.ms 指定。如果 leader 挂掉后,会从 ISR 中选出新的leader。
但是 有些数据并不是那么重要,没必要等到所有的 follower 同步完成后才发送ack,所以 kafka 又提供了三种应答机制 ack =0、1、-1概念
- Hign Watermark(HW) : 所有副本中最小的 offset 也是消费者能见的最大offset,同时是ISR 队列中最小的 LEO
- Log End Offset(LEO) : 每个分区副本 最后一个消息的offset,也是最大的offset

- follower 故障
当 follower 发生故障的时候,会被踢出 ISR ,待其恢复后,读取本地磁盘中上次记录的 HW,然后将 log 文件中高于 HW 的消息截掉,接着向leader 同步数据,当该follower 的 LOF 大于等于当前副本的 HW 时,会重新加入到 ISR 中
- leader 故障
leader 故障后,会从 ISR 中 选出新的 leader,为了保障数据的一致性,会将其余follower log 文件高于 HW的截断,从新的leader 中获取数据
5.Exactly Once
将ACK设置为-1,可以保障producer和Server之间消息不会丢失,即At Least Once,这样会导致消息重复;将ACK设置为0,可以保障每条消息只被发送一次,即At Most Once,但是数据容易丢失。在生产环境中常常有这样的场景,有些重要的消息,比如交易数据,需要保障数据既不会丢失,也不会重复发送。为了解决这个问题,kafka 0.11版本之后引入幂等性,指的是无论producer 发送多少条消息,server 端只会持久化一条。这样幂等性结合At Least Once,就可以保障消息只被可靠的接受一次,也就是Exactlyonece。所以
At Least Once + 幂等性 = Exactly Once
启动幂等性设置,只需要将producer端参数中 enable.idompotence 设置为 true 即可。实现是将下游需要做的放到了上游,具体实现是:每个producer在初始化的时候,都会被分配一个独有的PID,发往同一个同一个partition的消息会附带一个Sequence Number,当broker收到信息的时候会对
3.消费者
1.消费方式
consumer采用pull的模式从broker中读取数据,这样可以让消费者根据自己的消费能力动态选择消费速度。如果采用push的模式,只能是broker推送消息,消息发送的速度难以和consumer消费速度匹配。但是如果采用pull的方式,而broker中没有消息,便会陷入重复,一直返回空数据,为了解决这个问题,消费者在接受数据的时候会传入有个时长参数:timeout,如果当前没有数据可供消费,consumer会等待一段时间再次请求,这个等待时间就是timeout
2.分区分配原则
同一个图片topic有多个分区,每个消费者组有多个消费者,这就发生分区分配问题。在kafka中内置了两种分配策略,angeAssignor 分配策略(范围分区),另一种是 RoundRobinAssignor分配策略(轮询分区)。默认采用 Range 范围分区。
- Range 范围分区
范围分区是针对topic来说的,首先将同一个topic下的分区进行排序,记录中分区数,然后除以消费者总数,如果除不尽,则将多余的分区按消费者顺序从上到下进行分配。
比如:有一个topic有十个分区(0-10),有三个消费者且在同一个消费者组中(c1,c2,c3)
| c1 | 0,1,2,3 |
|---|---|
| c2 | 4,5,6 |
| c3 | 7,8,9 |
弊端:存在某种极限,有N个topic,使用范围分区后,
- RangeAssignor轮询分区
这时候有两种情况,一是所有消费者都订阅了同一个主题,二是,消费者组中的消费者订阅了不同的主题。
- 同一消费者组订阅了相同的主题
均匀分配
例如:同一消费者组中,有 3 个消费者C0、C1和C2,都订阅了 2 个主题 t0 和 t1,并且每个主题都有 3 个分区(p0、p1、p2),那么所订阅的所以分区可以标识为t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终分区分配结果如下:
| c0 | t0p0.t1p0 |
|---|---|
| c1 | t0p1t1p1 |
| c2 | t0p2t1p2 |
- 同一消费者组订阅了不同主题
不完全平均分配,
例如:同一消费者组中有三个消费者,c1,c2,c3,broker有三个topic,topic1有一个分区,topic2有两个分区,topic3有三个分区,其中c1订阅了主题 topic1,c2订阅了topic2,c3订阅了topic3,此时分去分配结果如下:
| c1 | t1p0 | |
|---|---|---|
| c2 | t2p0 | |
| c2 | t2p1,t3p0,t3p1,t3p2 |
3.offset维护
由于consumer在消费的过程中,可能会出现宕机或网络等原因导致不能继续消费消息,所以需要时刻记录自己消费到哪一条消息了。
具体操作是,通过offset 记录自己 下一步该消费哪一条信息。在0.9版本之前,默认将offset存储在zookeeper 中,之后consumer默认将offset 存储在kafka的一个内置topic中,为 _consumer——offsets
4. 高效数据读写技术
- 顺序读写磁盘
kafka的producer生产的数据,按照顺序追加到文件末端,顺序读写。根据官网的数据,顺序读写能600M/s,而随机读写只有100K/S
- 零复制技术
正常的文件传输需要经过四次步骤
- 程序调用read(),将文件拷贝到内核模式的 ReadBuffer 中
- CPU控制将 ReadBuffer 中的数据拷贝到用户模式的 Application Cache中
- 程序调用 writer(),将数据复制到内核模式的 Socket Buffer 中
- CPU控制 Socket Buffer 将数据拷贝到网卡设备中进行传输

所以 执行一次文件读取操作,需要进行4次内核态和用户态之间的转换,4次文件拷贝,效率低下。
kafka基于DMA (Directed Memory Access,内存直接访问) 技术实现 零复制技术,原理是 直接将文件从本地拷贝到 ReadBuffer 中,然后只传递数据的描述信息到 Socket Buffer 中,DMA引擎直接将数据从 Read Buffer 拷贝到 网卡中进行数据发送。
通过零拷贝技术,就不需要将 内核空间中的页缓存数据拷贝到应用程序中,再从应用程序中拷贝出来,直接节省了两次拷贝,两次状态转换。大大提升了消费者读取数据的性能。再次读取数据的时候,会首先查看内核空间中是否已有数据,有的话直接通过网关发送出去。
4. Zookeeper 在kafka 中的作用
kafka 集群会从 broker 中选出 Controller,负责broker的上下线,topic分区的副本配置以及leader的徐选举,这些操作都是依赖于zookeeper实现的。
- controller 的选举流程
最先在zookeeper 上创建 /controller 节点的 broker 就是 controller 。zookeeper 会保障只有一个节点会成功创建 /controller
- leader 的选举
当每个broker 启动的时候,都获取 /brokers/topics 节点下创建节点,第一个创建节点的 broker 就是 leader,剩下的都是follower。
如下图, 对于每个Topic都有一个ISR列表,直接取ISR列表的第一个作为leader,如果当前挂的就是第一个,则选择后面一个作为leader
unclean.leader.election.enable=false, 默认情况下, 是从IRS列表里面的节点作为leader,但是如果这个参数配置成true,不在ISR列表但是在Replicas列表里面的也可以作为选举leader的。但是可以想像得出来,需要谨慎使用。
5. 事务
kafka 的事务主要是针对producer端的,对于consumer端来说,就弱多了。
- producer
前面提到可以使用 Exactly Once 实现消息只被保存一次,但是不能实现生产者和消费者划分区对话。为了解决这个问题,kafka用0.11版本之后引入了一个新的组件 Transaction Coordinator,它可以产生一个唯一 Transaction ID,将 Producer获得的PID 和Transaction ID 绑定,这样即使producer 重启了,也可以通过 Transaction ID获取到原来的 ID,进而获取对应的任务状态。 Transaction Coordinator还将事务写入到 kafka 内部的topic 中,这样即使整个服务重启,事务状态也可以保存
6. 消息发送流程
kafka 中 producer 发送消息是采用异步方式发送的,涉及到两个线程 main和sender,以及一个线程共享变量RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从RecordAccumulator 中拉取消息发送到 Kafka broker。
相关参数:
batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。
linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。
7.offset的维护
Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。
所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。
自动提交 offse
props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");手动提交 offset
- 同步提交
props.put("enable.auto.commit", "false");consumer.commitSync();- 异步提交
props.put("enable.auto.commit", "false");//异步提交 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { System.err.println("Commit failed for" + offsets); } } });
