kafka.apache.org
image.png

消息队列的两种模式:

image.png

Kafka的基础架构:

  1. 一个topic分为多个partition
  2. broker 对应的是服务器
  3. 每个分区partition只允许一个消费者消费
  4. leader和follower

image.png

  • 相关概念:

(1)Producer:消息生产者,就是向Kafka broker 发消息的客户端。
(2)Consumer:消息消费者,向Kafka broker 取消息的客户端。
(3)Consumer Group(CG):消费者组,由多个consumer 组成。消费者组内每个消
费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不
影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
(4)Broker:一台Kafka 服务器就是一个broker。一个集群由多个broker 组成。一个
broker 可以容纳多个topic。
(5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic。
(6)Partition:为了实现扩展性,一个非常大的topic 可以分布到多个broker(即服
务器)上,一个topic 可以分为多个partition,每个partition 是一个有序的队列。
(7)Replica:副本。一个topic 的每个分区都有若干个副本,一个Leader 和若干个
Follower。
(8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数
据的对象都是Leader。
(9)Follower:每个分区多个副本中的“从”,实时从Leader 中同步数据,保持和
Leader 数据的同步。Leader 发生故障时,某个Follower 会成为新的Leader。

快速入门

  1. 修改相关kafka配置文件
    1. broker_id
    2. 本地域名的更改——localhost

  2. 配置Kafka全局环境变量
  3. 需要先启动zookeeper
  4. Kafka安装

批量启动多个服务器Kafka的脚本
image.png

Kafka相关命令行

1、主题脚本命令(Topic)

1) bin/kafka-topics.sh

参数 描述
—bootstrap-server 连接的 Kafka Broker 主机名称和端口号。
—topic 操作的 topic 名称。
—create 创建主题。
—delete 删除主题。
—alter 修改主题。
—list 查看所有主题。
—describe 查看主题详细描述。
—partitions 设置分区数。
—replication-factor 设置分区副本。
—config 更新系统默认的配置。


2)查看当前服务器中的所有 topic
bin/kafka-topics.sh —bootstrap-server hadoop102:9092 —list
3)创建 first topic
bin/kafka-topics.sh —bootstrap-server hadoop102:9092 —create —partitions 1 —replication-factor 3 —topic first

选项说明:
—topic 定义 topic 名
—replication-factor 定义副本数
—partitions 定义分区数

4)查看 first 主题的详情
bin/kafka-topics.sh —bootstrap-server hadoop102:9092 —describe —topic first
5)修改分区数(注意:分区数只能增加,不能减少)
bin/kafka-topics.sh —bootstrap-server hadoop102:9092 —alter —topic first —partitions 3
6)再次查看 first 主题的详
bin/kafka-topics.sh —bootstrap-server hadoop102:9092 —describe —topic first
7)删除 topic(学生自己演示)
bin/kafka-topics.sh —bootstrap-server hadoop102:9092 —delete —topic first

2、生产者脚本命令(Producer)

1) bin/kafka-console-producer.sh —bootstrap-server hadoop102:9092 —topics first
>hello
>kafak

参数名称 描述
bootstrap.servers 生产者连接集群所需的 broker 地 址 清 单 。 例如
hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker
里查找到其他 broker 信息。
key.serializer 和 value.serializer 指定发送消息的 key 和 value 的序列化类型。一定要写
全类名。
buffer.memory RecordAccumulator 缓冲区总大小,默认 32m。
batch.size 缓冲区一批数据最大值,默认 16k。适当增加该值,可
以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms 如果数据迟迟未达到 batch.size,sender 等待 linger.time
之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
acks 0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader 收到数据后应答。
-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和
all 是等价的。
max.in.flight.requests.per.connection 允许最多没有返回 ack 的次数,默认为 5,开启幂等性
要保证该值是 1-5 的数字。
retries 当消息发送出现错误的时候,系统会重发消息。retries
表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1
否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms 两次重试之间的时间间隔,默认是 100ms。
enable.idempotence 是否开启幂等性,默认 true,开启幂等性。
compression.type 生产者发送的所有数据的压缩方式。默认是 none,也
就是不压缩。
支持压缩类型:none、gzip、snappy、lz4 和 zstd。

3、消费者脚本命令(Consumer)

bin/kafka-console-consumer.sh

参数 描述
—bootstrap-server 连接的 Kafka Broker 主机名称和端口号。
—topic 操作的 topic 名称。
—from-beginning 从头开始消费。
—group 指定消费者组名称。

消费信息
(1)消费 first 主题中的数据。

bin/kafka-console-consumer.sh —bootstrap-server hadoop102:9092 —topic first
(2)把主题中所有的数据都读取出来(包括历史数据)。
bin/kafka-console-consumer.sh —bootstrap-server hadoop102:9092 —form-beginning —topic first

第三章:Kafka 生产者

3.1 发送流程

Main线程——Sender线程

3.1.1 发送原理

在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,
Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

image.png

main线程中的Producer调用send(ProducerRecord) ->(Interceptors) 拦截器->序列化器(Seriaizer)-> 分区器
->缓存队列->批次大小

缓存队列:双端队列?

sender线程 -> Selector

关键点:
1、batch.size (> 16K) 和 linger.ms (达到设置的时间 xx ms)
2、Sender线程:默认每个broker缓存最多5个请求
3、kafka集群(Broker)的应答机制acks : 0、1、-2

异步发送API

同步发步API

3.2 生产者分区

3.2.1Kafka分区的好处

3.2.2 Kafka选择分区原则

image.png
三种: