一、kafka 概述

1. 定义

kafka 是一个分布式、基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

2. 消息队列

  • 应用场景

image.png

  • 优点
    • 解耦

允许你独立的扩展或者修改两边的处理过程,只要确保它们遵守同样的接口约束

  • 可恢复性

系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息也可以在系统恢复后被处理。

  • 缓冲

可以控制和优化数据流经系统的速度,解决消息生产速度和消费速度不一致问题

  • 灵活性和峰值处理能力

在访问量剧增的情况下,应用仍然可以继续发挥作用,但是这样突发流量并不常见,如果以能处理这类峰值访问为标准来投入资源随时待命,无疑会带来巨大的浪费。使用消息队列能够使关键组件顶住这种突发的访问压力,不会因为突发的超负荷请求而崩溃。

  • 异步通信

很多时候,用户不想立即处理消息,消息队列提供了异步处理机制,允许用户把一个消息放入到队列,但是不立即处理它,后续在进行处理。

3. 消息队列的两种模式

  • 点对点模式(一对一,消费者主动拉取数据,消息收到后消除)

消息生产者将消息发送到 Queue 中,然后消费者从 Queue 中拉取并消费消息,消息被消费后便会消失,queue 中不再存储消息。queue 支持存在多个消费者,但是对已统一条消息来说,只能被一个消费者消费。
image.png

  • 发布订阅模式(一对多,消费者消费数据之后不会清除数据)

消息生产者(发布)将消息发布到topic 中,同时有多个消息消费者(订阅)消费该消息,和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
image.png

4. kafka 结构

image.png

  • producer : 消息生产者,向 kafka broker 发送消息的客户端
  • Consumer : 消费者 ,向 broker 取消息的客户端
  • Consumer Group : 消费者组,由多个消费者组成,每个消费者负责不同分区的数据,一个分区只能由一个组内消费者消费,消费者组不受影响。注意:消费者组在逻辑上是一个订阅者。
  • Broker : 一个 kafka 服务器就是一个 broker,一个集群由多个 broker 组成,每个broker 可以容纳多个topic
  • topic : 可以理解为一个队列,生产者和消费者面向的是同一个topic
  • Partition : 当 topic 非常大的时候,可以划分为多个分区,分配到不同的broker(浏览器)上,每个partition 都是一个有序队列。
  • Replication :存放分区消息副本,保障数据不丢失,leader 和follower。leader :多个副本的“主”,follower : 多个副本的“从”,当leader 发生故障的时候,follower 会变成leader

    二、快速入门

    1. 安装

    1. 集群规划

    | hadoop102 | hadoop103 | hadoop104 | | —- | —- | —- | | zk | zk | zk | | kafka | kafka | kafka |

2. jar 包下载

https://kafka.apache.org/downloads.html
image.png

3. 集群部署

  • 解压安装

tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/

  • 修改解压后的文件

mv kafka_2.11-0.11.0.0/ kafka

  • 在安装目录下安装logs 文件

mkdir logs

  • 修改配置文件

cd config/
vi server.properties

  1. #broker 的全局唯一编号,不能重复
  2. broker.id=0
  3. #删除 topic 功能使能
  4. delete.topic.enable=true
  5. #处理网络请求的线程数量
  6. num.network.threads=3
  7. #用来处理磁盘 IO 的现成数量
  8. num.io.threads=8
  9. #发送套接字的缓冲区大小
  10. socket.send.buffer.bytes=102400
  11. #接收套接字的缓冲区大小
  12. socket.receive.buffer.bytes=102400
  13. #请求套接字的缓冲区大小
  14. socket.request.max.bytes=104857600
  15. #kafka 运行日志存放的路径
  16. log.dirs=/opt/module/kafka/logs
  17. #topic 在当前 broker 上的分区个数
  18. num.partitions=1
  19. #用来恢复和清理 data 下数据的线程数量
  20. num.recovery.threads.per.data.dir=1
  21. #segment 文件保留的最长时间,超时将被删除
  22. log.retention.hours=168
  23. #配置连接 Zookeeper 集群地址
  24. zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
  • 配置环境变量

sudo vi /etc/profile

  1. #KAFKA_HOME
  2. export KAFKA_HOME=/opt/module/kafka
  3. export PATH=$PATH:$KAFKA_HOME/bin
  4. [allen@hadoop102 module]$ source /etc/profile
  • 分发安装包

xsync kafka/

  • 分别在 hadoop103 和 hadoop104 上修改配置文件/opt/module/kafka/config/server.properties中的 broker.id=1、broker.id=2 注:broker.id 不得重复
  • 启动 关闭 ```java bin/kafka-server-start.sh -daemon config/server.properties bin/kafka-server-stop.sh stop

脚本

for i in hadoop102 hadoop103 hadoop104 do echo “========== $i ==========” ssh $i ‘/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties’ done

  1. <a name="JYOxz"></a>
  2. ### 2. kafka 命令行操作
  3. <a name="vrpcs"></a>
  4. #### 1. 查看topic
  5. ` bin/kafka-topics.sh --zookeeper hadoop102:2181 --list `
  6. <a name="selxe"></a>
  7. #### 2. 创建topic
  8. ```properties
  9. bin/kafka-topics.sh --zookeeper hadoop102:2181
  10. --create --replication-factor 3
  11. --partitions 1
  12. --topic first

选项说明:
—topic 定义 topic 名
—replication-factor 定义副本数
—partitions 定义分区数

3. 删除 topic

 ` bin/kafka-topics.sh --zookeeper  hadoop102:2181 --delete --topic first `

4. 发送消息

bin/kafka-console-producer.sh --brokerlist hadoop102:9092 --topic first
hello world
atguigu atguigu

5. 消费消息

  ` bin/kafka-console-consumer.sh  --zookeeper hadoop102:2181 --topic first `<br /> ` bin/kafka-console-consumer.sh  --bootstrap-server hadoop102:9092 --topic first `<br />` bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first `

6. 查看某个topic 详情

bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first

7.修改分区

bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6

三、kafka 架构

1. 工作流程和问价存储机制

image.png
kafka 中所有的信息都是以 topic 进行分类的,生产者生产消息,消费者消费消息,这都是面向 topic的,但是topic 的逻辑概念,分区(partition)是物理概念。每个topic 拥有多个分区,每个分区内存放着生产者生产的消息。因为是以追加的方式存储消息的,所以对应的存储文件(log)越来越大。为了解决这个问题,kafka引入分片和索引机制。现将log 分为不同的片(segment),每个分片下面对应着 .index 和 .log文件。其中index 文件存放的是索引(offset 和实际存储偏移量对应关系),log文件存储实际的消息数据。
kafka - 图7

例如,主题frist有三个分区,根据分区命名规则:topic名称+分区序列号,则为frist-0,frist-1,frist-2。index,log 是以当前分片(segment)第一条消息的offset 命名。对应的文件为:

00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

image.png

2. 生产者

1. 为什么要分区

  • 方便大数据量的存储,提高并发量。

每个topic 可以有多个分区,以分区为单位进行读写,提高数据的并发

  • 方便在集群中扩展

设置分区后,每个 partition 可以通过调整,以适应机器。

2. 分区的原则

  • 如果topic 指定分区 ,直接按照 指定的分区进行划分
  • 如果没指定分区,但是有 key 的情况下,将key的hash 值与topic 的分区数进行取余,结果分为分区号
  • 如果两者都没有,第一次调用的时候,随机生成一个整数,用其与分区号进行取余操作,结果作为本次的分区号,之后再由这样的数据,该整数直接+1。该方法也成为了轮训。

    3. 数据的可靠性保障

    为了保障 每条消息都能可靠的发送到对应的topic中,需要 topic 收到消息后,向生产者发送一条确认信息(acknowledge),producer 收到后在进行下一条数据的发送。
    topic 收到消息后,何时发送 ack 确认信息,有两种方案:一是 所有的follower 和leader 同步完成后在发送,二是 半数以上的follower 跟leader 同步完成后,发送ack。如果最大容忍 n 太节点故障,那么第一种方案只需要 n+1 台节点即可,而第二种则是 2*n +1 节点,但是第一种由于是等待所有的follower 同步完成后才发送ack 所以网络延迟较大。综合考虑 kafka 选择了第一种:等待所有节点同步完成后再次发送ack。
    但是如果总有那么一两个节点,数据同步的特别慢,迟迟不能与leader 同步,那就太浪费时间了。所以kafka维护了一个动态的 in-sync replicaset(ISR),它里面包含了N个节点,意思是只要ISR 中follower 数据同步完成后,就立刻发送 ack。 ISR,并不是固定不变的,如果其中的某台节点,长时间未能与leader 进行数据同步,就会被提出ISR,具体时间设置通过 replica.lag.time.max.ms 指定。如果 leader 挂掉后,会从 ISR 中选出新的leader。
    但是 有些数据并不是那么重要,没必要等到所有的 follower 同步完成后才发送ack,所以 kafka 又提供了三种应答机制 ack =0、1、-1

    • ack =0: 表示 分区 收到消息,就立刻回复 ACK ,不需要等到 消息落盘。数据丢失概率最大
    • ack = 1: 表示 分区收到消息后,进行落盘,成功后发送ACK,不需要等待follower 同步。数据有丢失的概率
    • ack = -1: 表示 分区收到数据后,落盘并等待follower 全部同步完成后,发送 ack 。这种情况下,有数据重复的可能。

      4. 故障处理

  • 概念

    • Hign Watermark(HW) : 所有副本中最小的 offset 也是消费者能见的最大offset,同时是ISR 队列中最小的 LEO
    • Log End Offset(LEO) : 每个分区副本 最后一个消息的offset,也是最大的offset

image.png

  • follower 故障

当 follower 发生故障的时候,会被踢出 ISR ,待其恢复后,读取本地磁盘中上次记录的 HW,然后将 log 文件中高于 HW 的消息截掉,接着向leader 同步数据,当该follower 的 LOF 大于等于当前副本的 HW 时,会重新加入到 ISR 中

  • leader 故障

leader 故障后,会从 ISR 中 选出新的 leader,为了保障数据的一致性,会将其余follower log 文件高于 HW的截断,从新的leader 中获取数据

5.Exactly Once

将ACK设置为-1,可以保障producer和Server之间消息不会丢失,即At Least Once,这样会导致消息重复;将ACK设置为0,可以保障每条消息只被发送一次,即At Most Once,但是数据容易丢失。在生产环境中常常有这样的场景,有些重要的消息,比如交易数据,需要保障数据既不会丢失,也不会重复发送。为了解决这个问题,kafka 0.11版本之后引入幂等性,指的是无论producer 发送多少条消息,server 端只会持久化一条。这样幂等性结合At Least Once,就可以保障消息只被可靠的接受一次,也就是Exactlyonece。所以
At Least Once + 幂等性 = Exactly Once
启动幂等性设置,只需要将producer端参数中 enable.idompotence 设置为 true 即可。实现是将下游需要做的放到了上游,具体实现是:每个producer在初始化的时候,都会被分配一个独有的PID,发往同一个同一个partition的消息会附带一个Sequence Number,当broker收到信息的时候会对做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。但是producer重启后会生成新的pid,同时不同的分区也会有不同的主键,所以幂等性无法保障跨分区跨对话的Exactly Once。

3.消费者

1.消费方式

consumer采用pull的模式从broker中读取数据,这样可以让消费者根据自己的消费能力动态选择消费速度。如果采用push的模式,只能是broker推送消息,消息发送的速度难以和consumer消费速度匹配。但是如果采用pull的方式,而broker中没有消息,便会陷入重复,一直返回空数据,为了解决这个问题,消费者在接受数据的时候会传入有个时长参数:timeout,如果当前没有数据可供消费,consumer会等待一段时间再次请求,这个等待时间就是timeout

2.分区分配原则

同一个图片topic有多个分区,每个消费者组有多个消费者,这就发生分区分配问题。在kafka中内置了两种分配策略,angeAssignor 分配策略(范围分区),另一种是 RoundRobinAssignor分配策略(轮询分区)。默认采用 Range 范围分区。

  • Range 范围分区

范围分区是针对topic来说的,首先将同一个topic下的分区进行排序,记录中分区数,然后除以消费者总数,如果除不尽,则将多余的分区按消费者顺序从上到下进行分配。
比如:有一个topic有十个分区(0-10),有三个消费者且在同一个消费者组中(c1,c2,c3)

c1 0,1,2,3
c2 4,5,6
c3 7,8,9

弊端:存在某种极限,有N个topic,使用范围分区后,

  • RangeAssignor轮询分区

这时候有两种情况,一是所有消费者都订阅了同一个主题,二是,消费者组中的消费者订阅了不同的主题。

  • 同一消费者组订阅了相同的主题

均匀分配
例如:同一消费者组中,有 3 个消费者C0、C1和C2,都订阅了 2 个主题 t0 和 t1,并且每个主题都有 3 个分区(p0、p1、p2),那么所订阅的所以分区可以标识为t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终分区分配结果如下:

c0 t0p0.t1p0
c1 t0p1t1p1
c2 t0p2t1p2
  • 同一消费者组订阅了不同主题

不完全平均分配,
例如:同一消费者组中有三个消费者,c1,c2,c3,broker有三个topic,topic1有一个分区,topic2有两个分区,topic3有三个分区,其中c1订阅了主题 topic1,c2订阅了topic2,c3订阅了topic3,此时分去分配结果如下:

c1 t1p0
c2 t2p0
c2 t2p1,t3p0,t3p1,t3p2

3.offset维护

由于consumer在消费的过程中,可能会出现宕机或网络等原因导致不能继续消费消息,所以需要时刻记录自己消费到哪一条消息了。
具体操作是,通过offset 记录自己 下一步该消费哪一条信息。在0.9版本之前,默认将offset存储在zookeeper 中,之后consumer默认将offset 存储在kafka的一个内置topic中,为 _consumer——offsets

4. 高效数据读写技术

  • 顺序读写磁盘

kafka的producer生产的数据,按照顺序追加到文件末端,顺序读写。根据官网的数据,顺序读写能600M/s,而随机读写只有100K/S

  • 零复制技术

正常的文件传输需要经过四次步骤

  1. 程序调用read(),将文件拷贝到内核模式的 ReadBuffer 中
  2. CPU控制将 ReadBuffer 中的数据拷贝到用户模式的 Application Cache中
  3. 程序调用 writer(),将数据复制到内核模式的 Socket Buffer 中
  4. CPU控制 Socket Buffer 将数据拷贝到网卡设备中进行传输

kafka - 图10
所以 执行一次文件读取操作,需要进行4次内核态和用户态之间的转换,4次文件拷贝,效率低下。

kafka基于DMA (Directed Memory Access,内存直接访问) 技术实现 零复制技术,原理是 直接将文件从本地拷贝到 ReadBuffer 中,然后只传递数据的描述信息到 Socket Buffer 中,DMA引擎直接将数据从 Read Buffer 拷贝到 网卡中进行数据发送。
kafka - 图11
通过零拷贝技术,就不需要将 内核空间中的页缓存数据拷贝到应用程序中,再从应用程序中拷贝出来,直接节省了两次拷贝,两次状态转换。大大提升了消费者读取数据的性能。再次读取数据的时候,会首先查看内核空间中是否已有数据,有的话直接通过网关发送出去。

4. Zookeeper 在kafka 中的作用

kafka 集群会从 broker 中选出 Controller,负责broker的上下线,topic分区的副本配置以及leader的徐选举,这些操作都是依赖于zookeeper实现的。

  • controller 的选举流程

最先在zookeeper 上创建 /controller 节点的 broker 就是 controller 。zookeeper 会保障只有一个节点会成功创建 /controller

  • leader 的选举

当每个broker 启动的时候,都获取 /brokers/topics 节点下创建节点,第一个创建节点的 broker 就是 leader,剩下的都是follower。
如下图, 对于每个Topic都有一个ISR列表,直接取ISR列表的第一个作为leader,如果当前挂的就是第一个,则选择后面一个作为leader
image.png
unclean.leader.election.enable=false, 默认情况下, 是从IRS列表里面的节点作为leader,但是如果这个参数配置成true,不在ISR列表但是在Replicas列表里面的也可以作为选举leader的。但是可以想像得出来,需要谨慎使用。

5. 事务

kafka 的事务主要是针对producer端的,对于consumer端来说,就弱多了。

  • producer

前面提到可以使用 Exactly Once 实现消息只被保存一次,但是不能实现生产者和消费者划分区对话。为了解决这个问题,kafka用0.11版本之后引入了一个新的组件 Transaction Coordinator,它可以产生一个唯一 Transaction ID,将 Producer获得的PID 和Transaction ID 绑定,这样即使producer 重启了,也可以通过 Transaction ID获取到原来的 ID,进而获取对应的任务状态。 Transaction Coordinator还将事务写入到 kafka 内部的topic 中,这样即使整个服务重启,事务状态也可以保存

6. 消息发送流程

kafka 中 producer 发送消息是采用异步方式发送的,涉及到两个线程 main和sender,以及一个线程共享变量RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从RecordAccumulator 中拉取消息发送到 Kafka broker。
image.png
相关参数:
batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。
linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。

7.offset的维护

Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。
所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。

  • 自动提交 offse

    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");

  • 手动提交 offset

    • 同步提交

    props.put("enable.auto.commit", "false");
    consumer.commitSync();

    • 异步提交

    props.put("enable.auto.commit", "false");

    //异步提交
    consumer.commitAsync(new OffsetCommitCallback() {
      @Override
      public void onComplete(Map<TopicPartition, 
                             OffsetAndMetadata> offsets, Exception exception) {
          if (exception != null) {
              System.err.println("Commit failed for" + 
                                 offsets);
          }
      }
    });