- 1. 主流的应用场景(其它MQ同理)
- 2. 环境搭建
- 2.1 服务端配置说明
- 2.2 搭建
zookeeper
集群 - 2.3 搭建
kafka
集群 - 可以直接在zk集群的三台服务器上搭建kafka集群
- 下载kafka安装包
- 上传文件到服务器,或者上传一个之后使用copy命令
- 解压到指定目录
- 修改解压后的文件名称
- 进入配置文件目录
- 修改server.properties文件,指定kafka集群中broker的唯一标志,默认值是-1,如果没有指定会默认生成一个
- 修改server.properties文件,需要连接的zk集群地址,包含ip:port
- 修改server.properties文件,修改端口
- 修改server.properties文件,指明kafka需要监听的客户端
- 修改server.properties文件,一般写本机地址
- 修改server.properties文件,所有的消息都是需要保存到磁盘上的,这两个就是指定存储文件目录,配置的时候配置一个就行了,dirs如果有多个磁盘可以写多个文件分开存储
- 修改server.properties文件,broker所能接收的最大消息值,默认是1m
- 修改server.properties文件,新建一个topic默认的分区数,默认是1
- 创建kafka存储消息(log日志数据)的目录
- 到此为止,kafka已经配置成功,进入到kafka的bin目录,执行启动命令,启动kafka
- 2.4 搭建kafka manager可视化控制台
- 3. 核心概念讲解
- 4. 生产者/消费者快速入门
- 5. 与
SpringBoot
整合使用 - 6. 海量日志搜集
ELK
acquire:获得、取到 Concurrent:并发
1. 主流的应用场景(其它MQ同理)
1.1 数据异步化
1.1.1 什么是异步化
- 异步化的原理同理为多线程中的异步,例如下面的例子
- 消息持久化:可以防止我们的消息发送失败,我们可以重推送,保证我们的消息触达率(这个一般是重要消息,具体参考
Rabbit MQ
的学习过程,因为Rabbit
适合中小型公司做数据异步化消息中间件)1.2 服务解耦、削峰填谷
1.2.1 概念理解
- 服务解耦:异步化了之后,就是不用再担心别的服务的问题
削峰填谷:
MQ
天然的就有这种缓存的机制,把消息缓存到MQ borker
里面,然后我们去慢速的消费,高峰期的时候可以去做一个削峰,低谷期的时候做一个填谷,使我们的处理能力,相对来说更均匀一些1.2.2 架构分析
1.3 海量日志收集(
kafka
特有)海量日志收集:注意第阶段的日志收集实战,自己总结
- 上图的架构解析:
- 下载kafka安装包:https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
2.1 服务端配置说明
参数文件:$KAFKA_HOME/config/server.properties
zookeeper.connect
:需要连接的zk集群地址,包含ip:port
listeners
:指明kafka
需要监听的客户端broker.id
:指定kafka
集群中broker
的唯一标志,默认值是-1
,如果没有指定会默认生成一个log.dir、log.dirs
:kafka
所有的消息都是需要保存到磁盘上的,这两个就是指定存储文件目录message.max.bytes
:broker
所能接收的最大消息值,默认是1m
2.2 搭建
zookeeperzookeeper
集群2.3 搭建
```makefilekafka
集群可以直接在zk集群的三台服务器上搭建kafka集群
下载kafka安装包
https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
上传文件到服务器,或者上传一个之后使用copy命令
/home/software/kafka scp kafka_2.12-2.1.0.tgz 192.168.231.132:/home/software/kafka
解压到指定目录
tar -zxvf kafka_2.12-2.1.0.tgz.gz -C /usr/local/
修改解压后的文件名称
cd /usr/local mv kafka_2.12-2.1.0 kafka_2.12
进入配置文件目录
vim /usr/local/kafka_2.12/config/server.properties
修改server.properties文件,指定kafka集群中broker的唯一标志,默认值是-1,如果没有指定会默认生成一个
broker.id=0
修改server.properties文件,需要连接的zk集群地址,包含ip:port
zookeeper.connect=192.168.11.221:2181,192.168.11.222:2181,192.168.11.223:2181
修改server.properties文件,修改端口
port=9092
修改server.properties文件,指明kafka需要监听的客户端
listeners:
修改server.properties文件,一般写本机地址
host.name=192.168.11.221
修改server.properties文件,所有的消息都是需要保存到磁盘上的,这两个就是指定存储文件目录,配置的时候配置一个就行了,dirs如果有多个磁盘可以写多个文件分开存储
log.dir=/usr/local/kafka_2.12/kafka-logs log.dirs=/usr/local/kafka_2.12/kafka-logs
修改server.properties文件,broker所能接收的最大消息值,默认是1m
message.max.bytes=10302430
修改server.properties文件,新建一个topic默认的分区数,默认是1
num.partitions=5
创建kafka存储消息(log日志数据)的目录
mkdir /usr/local/kafka_2.12/kafka-log
到此为止,kafka已经配置成功,进入到kafka的bin目录,执行启动命令,启动kafka
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &
```makefile
2.4 搭建kafka manager可视化控制台
## 这个在一台服务器上安装就行,因为集群信息已经互通,假设
192.168.11.222
## 把jar包(kafka manager)上传到 服务器
/home/software/kafka/kafka-manager-2.0.0.2.zip
## 解压到指定目录
unzip kafka-manager-2.0.0.2.zip -d /usr/local/
## 修改配置文件
vim /usr/local/kafka-manager-2.0.0.2/conf/application.conf
## 配置zk的地址
kafka-manager.zkhosts="192.168.11.221:2181,192.168.11.222:2181,192.168.11.223:2181"
## 启动
/usr/local/kafka-manager-2.0.0.2/bin/kafka-manager &
## 6.4 浏览器访问控制台:默认端口号是9000
http://192.168.11.222:9000/
## 6.5 添加Cluster集群
## 7 集群验证:
## 7.1 通过控制台创建了一个topic为"test" 2个分区 1个副本
## 7.2 消费发送与接收验证
cd /usr/local/kafka_2.12/bin
## 启动发送消息的脚本
## --broker-list 192.168.11.221 指的是kafka broker的地址列表
## --topic test 指的是把消息发送到test主题
kafka-console-producer.sh --broker-list 192.168.11.221:2181 --topic test
## 启动接收消息的脚本
kafka-console-consumer.sh --bootstrap-server 192.168.11.221:9092 --topic test
3. 核心概念讲解
3.1 集群架构解析
上图是
kafka
最基本的全局上的一个架构模型上图是一个
kafka broker
的详情图- 对比
Rabbit MQ
的个人理解topic
就是exchange
,partition
就是queue
- 生产者把消息发送到
topicA
中某个partition
,消费者订阅topicA
从而拉取其中的消息进行消费
- 主题
topic
- 是一个逻辑上的概念不是一个物理存储的概念,这样一解释更像exchange
- 分区
partition
- 起始位置的
offset
为0
- 暂时定位一个物理存储概念
- 一个
topic
对应多个partition
,每个分区里面包含不同的消息 - 在存储的结构上可以理解为一个可以追加日志的log文件
topic
如何将消息投递到对应的partitionpartition
策略,例如hash
策略
- 起始位置的
- 偏移量(
offset
)- 偏移量,就是上图的
0,1,2,3.......
- 在消息的
partition
中是一个唯一的标识 kafka
通过offset
保证消息在partition
内是顺序的
- 偏移量,就是上图的
- 消息的填充过程(发送消息)
- 消息都会被一个一个填充到我们的
partition
里面,也可以理解为追加到一个一个日志文件里面 - 这时候都会分配一个偏移量
offset
,对于这个offset
它是一个顺序写入的过程0,1,2...
- 消息都会被一个一个填充到我们的
- 消息的找到过程(拉取消息)
- 订阅
topic
,在确定是哪个partition
- 在消息的
partition
中offset
是一个唯一的标识,通过offset
找到对应的消息
- 订阅
- 如何保证消息的有序
- 我们消费的时候,一个消费者、一个线程、一个进程只连接一个分区
其它的相关逻辑
broker1
、broker2
:指的是一个一个物理机器集群borker1-p1
和broker2-p1
:相互的数据是完全一样的,防止某台机器挂掉了,数据仍然保证完整性(类似于es
的副本分片)- 同一时刻分片数据不一致,因为数据同步需要时间(虽然很短)
- 绿色的是
leader
分片会对外提供读写功能,紫色的follower
分区,会不断的向leader
拉取数据,只有当leader
故障了,follower
才会对外提供服务 总结:空间(存储空间)换取数据的高可靠性:不管是
kafka
还是es
,只要是分布式的架构,都会使用这种方式来保证数据高可靠性,是一个常规的解决方式3.4
Hw
、LEO
High Watermark
:消费者最多拉取到高水位线的消息(拉取 <hw
,注意不能等于)Log End Offset
:日志文件最后一条记录的offset
,例如:一个partition
有100
条数据,最后一条的偏移量就是99
,这个99
就是LEO
,也就是说leo = offset
3.5 同步副本机制
ISR
ISR
机制保证主从分区的数据一致性- 两个时间概念
- 拉取时间:
follower
从leader
拉取数据花费的时间 - 容忍时间:
follower
从leader
拉取数据需要一定的时间,kafka
可以设置一个容忍时间
- 拉取时间:
- 上图的流程机制
p1
收到写入请求 -s1 s2
会从p1
拉取数据- 如果拉取时间 < 容忍时间,则副本会进入到
ISR
集合 - 否则会进入
out sync replicas OSR
集合,这时候leader
已经放弃s2
这个副本 - 后续看图二,如果
s2
追上进度,就会被从新拉进ISR
集合
leader
挂掉的后续拉取策略(注意是默认策略)
- 图一:当前一段时间没有数据写入
hw = leo
- 图二:
maste
写入消息后,slave
正在拉取数据,这时候可以看出因为其它副本没有同步完成数据,那么Hw
就只能等于原来的最大数据量 - 图三:只有当所有的数据在每个分片上同步完成,
hw
才会增加
- 图一:当前一段时间没有数据写入
拉取策略是可以设置的(默认是全部副本)
kafka
的所有Broker
都会注册到kafka
集群中去,kafka
集群会选举一个Broker
作为leader
作为kafka
七群的总控制器controller,负责管理整个集群所有分区partition
和副本follower
的状态leader和follower数据同步
如果
leader
写入成功follwer
还没写入时,leader
就挂掉了,此时重新选举leader
,新的leader
自然没有这条信息。这种情况下,如果原有leader
恢复正常,成为follwer节点,会把之前写入的那条信息同步给新leader
4. 生产者/消费者快速入门
Tips:结合代码 + 笔记 + 源码学习生产者和消费者的内容
4.1 生产者
4.1.1 简单流程
KafkaProducer
是线程安全的,KafkaConsumer
不是线程安全的-
(2)
ProducerRecord<K, V>
public class ProducerRecord<K, V> {
private final String topic; // 对应的主题
private final Integer partition; // 分区的数量
private final Headers headers; // 自定义的一些属性
private final K k; // 在关键参数中设置的,可以用来判断进入哪个分区
private final V v; // 具体的消息内容
private final Long timestamp; // 消息具体发送的时间
}
5.1.3 消息重试机制
(1) acks
- 含义:发送消息后,
broker
端至少多少个副本接受到消息(这里的副本包含leader``follower
)才给生产端回应消息 acks = 1
:默认情况,leader
副本收到消息,就给生产端回应成功acks = 0
:生产者发送完成消息后,不需要收到任何消息(不需要任何可靠性消息投递保证)acks = -1、acks = a
:生产者在消息发送之后,需要ISR
中所有副本都成功写入消息之后,才能收到来之broker
端的相应()。
-
(3)
retries
、retry.backoff.msretries
-
(4)
compression.type
指定消息的压缩方式,默认值为
none
,可选择gizp``snappy``lz4
(5)
connections.max.idle.ms
-
(6)
linger.ms、batch.size、buffer.memory
- 这三个是调优相关的
linger.ms
:例如设置100
毫秒(100
是经验设置,要记住),这100
毫秒之内的消息都是在内存中的,等到100
毫秒的时候一起发送;也可以成为延迟发送、批量发送batch.size
:累计多少条消息,则进行一次批量发送buffer.memory
:缓存提升性能参数,默认32M
- 三个参数的配合情况
- 前两条的存储的信息都是
buffer.memory
中 - 背景:
linger.ms = 100ms
,batch.size = 100
条100ms
到了,累计了30
条消息,也是要发送一次- 刚到
50ms
,累计了100
条,也要发送一次 - 结论:只要满足一条就发送
- 前两条的存储的信息都是
4.1.4 拦截器
Tips:生产者和消费者都有拦截器
- 看生产者源码就知道在发送消息
(doSend)
之前有一个拦截器,我们就是需要实现这个拦截器 - 实现这个
org.apache.kafka.clients.producer.ProducerInterceptor
接口,具体看项目代码 配置实现的拦截器
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName())
4.1.5 序列化问题
broker
只会接收二进制数据,所以消息的发送和接收需要做序列化和反序列化一些基本的、简单的数据类型
kafka
已经提供了序列化和反序列化机制,所以前面所有的demo
演示中,我们将对象转换为JSON
数据其实也就是字符串,kafka
自带的序列化工具序列化字符串- 特殊对象,例如自定义的对象,需要我们自己实现序列化/反序列化机制
- 具体实现查看代码即可(
注意:直接参考已经实现的StringSerializer
) 理解之后其实通过转
JSON
然后使用默认String
序列化工具也是ok
的4.1.6 分区器
作用:就是对消息进行分区的,消息到底是落到那个分区他来决定
- 如果消息的
partition
属性带有值,则按照指定的分区,没有的话需要通过分区器来进行分区 - 有一个默认的分区器
DefaultPartitioner
- 自定义分区器,只需要实现
Partitioner
接口即可,其中的partition
方法的参数介绍- @param topic 主题名称
- @param key 要分区的键(如果没有键,则为 null)
- @param keyBytes 序列化的要分区的键(如果没有键,则为 null)
- @param value 要分区的值或 null
- @param valueBytes 序列化要分区的值或 null
- @param cluster 当前集群元数据
- 实际工作中分区器如何使用
- 例如想要不同的消息进入不同的分区
- 例如商品topic,有4个分区(鞋子、裤子、上衣、内衣),根据@param value或者其他任意参数,让他们进入属于自己的分区
- 例如我们理算接收不同的调用方,核心进入a分区,中台进入b分区….
- 顺序消费问题
- 按照a例子属于自己的进入自己的分区,一个消费者只能消费一个分区,实现顺序消费
- 问题:消费者如何只接受某一个分区的消息?可以在拦截器进行拦截
- 例如想要不同的消息进入不同的分区
4.1.7 生产者发消息到broker
的流程
- ProducerRecord把信息包装成消息,由KafkaProducer.send()发送出去
- 经过拦截器也就是
onSend()
方法,拦截器可能自由的做一些业务 - serializer、partition
- 经过序列化,key、value都需要序列化
- 有key通过key进行hash计算出一个分区,没有指定key也会通过算法计算出分区
- 如果带有partition的属性值,就需要走分区器进行分区
- 如果指定的partition的值,则按照指定的来
4.2 消费者流程
4.2.1 简单流程
- 配置消费者启动关键参数
- 创建
kafka
消费者对象 - 订阅相关主题
- 拉取消息,并进行消费处理
-
4.2.2 消费者、消费者组的概念
几个消费者联合起来就是消费者组
- 一个消费者组内的消费者同时订阅一个
topic
- 消费者的数量 = 分区的数量
- 一个分区只能被一个消费组中的一个消费者消费,例如:不能说
co
、c1
同时消费p0
(这时kafka
的规定)
- 一个分区只能被一个消费组中的一个消费者消费,例如:不能说
- 消费者的数量 > 分区的数量
- 例如有
4
个分区,但是一个消费者组有5个消费者,那么安装规定其中一个消费者什么都不敢,因为如果干了就违反一对一的消费
- 例如有
- 消费者的数量 < 分区的数量
- 比如
2
个消费者,4
个分区,那么可能1个消费者分到3
个分区,一个分到1
个分区,总之是把分区分配完毕,不能违反规定
- 比如
- 消费者的数量 = 分区的数量
- 不同消费组的消费者订阅同一
topic
,相当于广播模式 - 一个消费者最内的消费者都是点对点的意思
- 一个消费者组看成一个整体,然后不同的消费者组就是发布订阅模型
4.2.3 消费者重要参数
Tips:其它参数,查看项目代码
(1) assign、subscribe
assign
:只订阅主题下的一个、多个分区subscribe
:订阅主题,支持支持集合、正则表达式 ```java // 对于consume消息的订阅 subscribe方法 :可以订阅一个 或者 多个 topic consumer.subscribe(Collections.singletonList(Const.TOPIC_CORE)); // 也可以支持正则表达式方式的订阅 consumer.subscribe(Pattern.compile(“topic-.*”));
// 可以指定订阅某个主题下的某一个、多个 partition TopicPartition tp1 = new TopicPartition(Const.TOPIC_CORE, 0); TopicPartition tp2 = new TopicPartition(Const.TOPIC_CORE, 2); consumer.assign(Arrays.asList(tp1, tp2);
// 如何拉取主题下的所有partition,然后订阅相关的partition
List
(2) auto_offset_reset_conifg
- �
AUTO_OFFSET_RESET_CONFIG
有三种方式:"latest", "earliest", "none"
none
:不设置,存储在kafka的本地文件的信息latest
:默认,从一个分区的最后提交的offset开始拉取消息earliest
:从最开始的起始位置拉取消息 0properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "");
(3)
fetch.min.bytes
-
(4)
fetch.max.bytes
-
(5)
max.partition.fetch.bytes
一次
fetch
请求,从分区中取取得的record
最大值,默认1M
(6)
fetch.max.wait.ms
fetch
请求发送给broker
之后,在broker
中可能被阻塞的时长,默认500ms
(7)
max.pull.records
consumer
每次调用pull
时候,最大拉取的records
数量,默认500
条4.2.4 消费者的手工提交
- 设置这个参数:
ConsumerConfig.enable.auto.commit.config = false
- 提交方式:
- 消息级别:同步提交、异步提交
- 分区级别:同步提交、异步提交
- 整体(代表一次拉取的所有消息):整体提交、分区提交
offset
不提交会有什么现象?KafkaProducer
是线程安全的,KafkaConsumer
不是线程安全的KafkaConsumer
中定义了一个acquire()
方法用来检测是否只有一个线程在操作,如果有其它线程操作则会抛出ConcurrentModificationException(并发修改异常)
KafkaConsumer
在执行所有的动作的时候都会先执行acquire()方法检测是否线程安全思考:消费者线程不安全,那么单线程如何解决消息堆积问题呢? 解析:实现消费者的多线程,这个也是解决消息堆积的一种方式:生产者消息发送太快了,消费者又是单线程 的可能会造成消息堆积�
既然消费者是单线程的,那么消费者如何实现多线程呢?
借用消费者组内的规则:一个partition只能被一个Consumer消费,那我就把partition和consumer的数量对其