1. 下载安装
需要下载zookeeper 下载安装教程参考
windows kafka安装教程
# 安装zookeeperdocker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper#安装kafka#KAFKA_BROKER_ID kafka实例的id,必须唯一#KAFKA_ZOOKEEPER_CONNECT zookeeper连接地址,使用docker安装zooker这个地址可以使用#docker inspect --format='{{.NetworkSettings.IPAddress}}' [容器名称|容器id]查看虚拟ip#KAFKA_LISTENERS 监听器端口 公网上使用0.0.0.0#KAFKA_ADVERTISED_LISTENERS 将kafka注册到zookeeper使用的地址,124.220.22.12是kafka运行的本机的ipdocker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.17.0.2:2181 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://124.220.22.12:9092 -t wurstmeister/kafka
2. 命令记录
—zookeeper localhost:2181也可以换成—bootstrap-server localhost:9092
#启动服务器.\bin\windows\kafka-server-start.bat .\config\server.properties
#创建topic --replication-factor 每个分区副本数,一般来说等于broker个数,不能超过broker个数 --partitions分区个数.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test#查看topic列表.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092#查看指定topic信息.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic test3#增加topic分区数 只能增加不能减少,不能修改副本数.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic test3 --partitions 2#获取topic偏移量,ttime=-1 最大的偏移量,time=-2最小的偏移量,相减可以获得消息总数.\bin\windows\kafka-run-class.bat kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic event-post-send-666666 --time -1#修改topic配置.\bin\windows\kafka-configs.bat --bootstrap-server 192.168.1.207:9092 --entity-type topics --entity-name property-post-send-6666 --alter --add-config max.message.bytes=10485760#查看修改过的配置.\bin\windows\kafka-configs.bat --bootstrap-server 192.168.1.207:9092 --entity-type topics --entity-name property-post-send-6666 --describe
#启动消费者 Producer程序里指定topic,如果topic在broker里不存在,Producer将会通过读取server.properties文件中num.partitions=1和default.replication.factor=1创建它。.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic ywqtest --from-beginning
#启动生产者.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic ywqtest
#查看所有group消息情况.\bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --all-groups#查看指定group消费情况.\bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group defaultGroupId#删除消费者组.\bin\windows\kafka-consumer-groups.bat --bootstrap-server 47.97.199.47:9092 --delete --group xiot_data#查看所有消费者组.\bin\windows\kafka-consumer-groups.bat --bootstrap-server 47.97.199.47:9092 --list
3.相关博客参考
Kafka之分区和副本详解
KAFKA 常用命令解析
详细解析kafka之kafka分区和副本
kafka副本归纳总结
kafka分区副本及消息存储原理
spring.kafka.listener.concurrency 的使用
spring-kafka之KafkaListener注解深入解读
Kafka高吞吐量的原因
4. 删除topic
- 将对这个分区的生产者和消费者进行停止(可选,设置 auto.create.topics.enable = false,默认设置为true。如果设置为true,则produce或者fetch 不存在的topic也会自动创建这个topic)
- 配置文件server.properties设置 delete.topic.enable=true
- 调用删除命令
```bash
删除命令
.\bin\windows\kafka-topics.bat —delete —bootstrap-server localhost:9092 —topic ywqtest Topic ywqtest is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.据说还有这个删除命令,只会删除zookeeper中的元数据,消息文件须手动删除
.bin\windows\kafka-run-class.bat kafka.admin.DeleteTopicCommand —zookeeper localhost:2181 —topic t_cdr
再次查询会进行提示 MarkedForDeletion: true
.\bin\windows\kafka-topics.bat —bootstrap-server localhost:9092 —describe —topic ywqtest Topic: ywqtest TopicId: P6S8qtUITsyqIhdl6yXGQg PartitionCount: 1 ReplicationFactor: 1 Configs: MarkedForDeletion: true Topic: ywqtest Partition: 0 Leader: none Replicas: 0 Isr: 0 MarkedForDeletion: true
4. 删除kafka存储目录(server.properties文件log.dirs配置,默认为"/data/kafka-logs")相关topic的数据目录。注意:如果kafka 有多个 broker,且每个broker 配置了多个数据盘(比如 /data/kafka-logs,/data1/kafka-logs ...),且topic也有多个分区和replica,则需要对所有broker的所有数据盘进行扫描,删除该topic的所有分区数据。<br />**一般而言,经过上面4步就可以正常删除掉topic和topic的数据。但是,如果经过上面四步,还是无法正常删除topic,则需要对kafka在zookeeer的存储信息进行删除。**具体操作如下:5. bin/zkCli.sh -server 【zookeeper server:port】登录到zk shell,然后找到topic所在的目录:ls /brokers/topics,找到要删除的topic,然后执行命令:```bash#老版本使用rmr进行删除rmr /brokers/topics/【topic name】#新版本zookeeper使用deleteall进行删除deleteall /brokers/topics/【topic name】
- 完成之后,调用命令:
查看现在kafka的topic信息。正常情况下删除的topic就不会再显示。但是,如果还能够查询到删除的topic,则重启zk和kafka即可。.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
5. kafka启动报错解决
代码提示获取”ywqtest” topic的消息错误,则进入zk的shell通过命令行deleteall /brokers/topic/xxx删除指定的topic数据,再重启就可以了[2022-03-01 09:11:37,895] INFO [ReplicaAlterLogDirsManager on broker 0] Removed fetcher for partitions Set(ywqtest-0) (kafka.server.ReplicaAlterLogDirsManager)[2022-03-01 09:11:37,900] ERROR Shutdown broker because all log dirs in D:\Kafka\kafka_2.12-2.8.0\logs have failed (kafka.log.LogManager)
6. 笔记归纳
Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
Topic:一类消息,Kafka集群能够同时负责多个topic的分发。
Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。
Segment:partition物理上由多个segment组成。
offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息。
AR:Kafka0.8版本后加入副本机制,每个Partition可能有多个备份,某个Partition的Replica列表叫作AR(Assigned Replicas)
ISR: 每个分区的 leader 会维护一个 in-sync replica(同步副本列表,又称 ISR)。当 producer 往 broker 发送消息,消息先写入到对应 leader 分区上,然后复制到这个分区的所有副本中。只有将消息成功复制到所有同步副本(ISR)后,这条消息才算被提交。由于消息复制延迟受到最慢同步副本的限制,因此快速检测慢副本并将其从 ISR 中删除非常重要。 Kafka 复制协议的细节会有些细微差别。副本机制
- 在kafka中,副本分成两类:领导者副本和追随者副本。每个分区在创建时都要选举一个副本,成为领导者副本,其余的副本自动称为追随者副本。
- kafka中,追随者副本是不会对外提供服务的,所有的请求都必须由领导者副本来处理。它唯一的任务就是从领导者副本异步拉去消息,并写入到自己提交日志中,从而实现与领导者副本的同步。
- 当领导者副本挂掉了,或者说所在Broker宕机了,kafka可以通过Zookeeper提供的监控功能能够实时感知到,并开启新一轮领导者选举,从追随者副本中选一个作为新的领导者。老Leader副本重启回来后,只能作为追随者副本加入到集群中。
好处
- 方便实现“Read-your-writes”
顾名思义,就是当你使用生产者api向kafka成功写入消息后,就马上使用消费者api去读取刚才的消息。
举个例子,就是你刚发完一条微博,肯定是希望立马能够看到的。这就是Read-your-writes场景了。如果追随者副本对外提供服务的话,由于副本同步是异步的,因此有可能发生追随者副本还没有及时从领导者副本中拉取最新消息,从而使客户端看不到最新的消息。
- 方便实现单调读
什么是单调读。单调读就是消费者在多次读消息时候,不会看到一条消息一会儿存在一会儿不存在。
例如:如果允许追随者副本提供读服务,那么假设当前有两个追随者副本F1,F2。生产者往领导者中发送了消息后,F1,F2开始异步拉取消息。若F1拉取成功了,而F2还未拉取成功。此时消费者第一次消费F1副本获取最新消息,第二次消费的时候消费到了F2副本。就获取不到该条消息了。这就不是单调读一致性。所以都由Leader副本来处理请求的话,就能实现单调读。
ISR
首先需要明确一点。ISR不只是追随者副本集合,它必然包括Leader副本。甚至在某些情况下,ISR只有Leader这一个副本。
根据Broker端参数replica.lag.time.max.ms参数值判断是否可以加入到ISR集合里面。这个参数的含义就是Follower副本能够落后Leader副本的最长时间间隔,当前默认值是10秒。这就是说,
只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
若是同步过程的速度持续慢于Leadr副本的写入速度,那么在replica.lag.time.max.ms时间后,kafka就会自动收缩ISR集合,将改副本提出集合。
值得注意的是,若改副本后面慢慢追上了Leader的进度。那么它是可以被重新放入ISR集合中的。这也表明ISR是一个动态调整的集合,而非静态不变的
Unclean 领导者选举
既然ISR可以动态调整,那么就会出现ISR为空的情况。ISR为空的情况就代表Leader副本也挂掉了。那么kafka就需要重新选举新的Leader。
那么该怎么选举Leader呢?
- kafka把所有不在ISR的存活副本都成为非同步副本。
- 通常来说,非同步副本落后Leader太多,因此,如果选择这些副本为新的Leader,就可能出现数据的丢失。在kafka,选举Leader这种过程被成为Unclean。由Broker端参数unclean.leader.election.enable控制是否允许Unclean领导者选举。
- 开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。
可以根据你的实际业务场景决定是否开启 Unclean 领导者选举。不过并不建议开启它,毕竟我们还可以通过其他的方式来提升高可用性。如果为了这点儿高可用性的改善,牺牲了数据一致性,那就非常不值当了。
producer的acks参数
acks这个配置可以指定三个值,分别是0,1和-1。我们分别来说三者代表什么:
- acks为0:这意味着producer发送数据后,不会等待broker确认,直接发送下一条数据,性能最快
- acks为1:为1意味着producer发送数据后,需要等待leader副本确认接收后,才会发送下一条数据,性能中等
- acks为-1:这个代表的是all,意味着发送的消息写入所有的ISR集合中的副本(注意不是全部副本)后,才会发送下一条数据,性能最慢,但可靠性最强
还有一点值得一提,kafka有一个配置参数,min.insync.replicas,默认是1(也就是只有leader,实际生产应该调高),该属性规定了最小的ISR数。这意味着当acks为-1(即all)的时候,这个参数规定了必须写入的ISR集中的副本数,如果没达到,那么producer会产生异常
Kafka heartbeat.interval.ms 与 session.timeout.ms
- 在kafka0.10.1之后的版本中,将session.timeout.ms 和 max.poll.interval.ms 解耦了。**也就是说:new KafkaConsumer对象后,在while true循环中执行consumer.poll拉取消息这个过程中,其实背后是有2个线程的,即一个kafka consumer实例包含2个线程:一个是heartbeat 线程,另一个是processing线程,processing线程可理解为调用consumer.poll方法执行消息处理逻辑的线程,而heartbeat线程是一个后台线程,对程序员是”隐藏不见”的。如果消息处理逻辑很复杂,比如说需要处理5min,那么 max.poll.interval.ms可设置成比5min大一点的值。而heartbeat 线程则和上面提到的参数 heartbeat.interval.ms有关,heartbeat线程 每隔heartbeat.interval.ms向coordinator发送一个心跳包,证明自己还活着。只要 heartbeat线程 在 session.timeout.ms 时间内 向 coordinator发送过心跳包,那么 group coordinator就认为当前的kafka consumer是活着的。
- 在kafka0.10.1之前,发送心跳包和消息处理逻辑这2个过程是耦合在一起的,**试想:如果一条消息处理时长要5min,而session.timeout.ms=3000ms,那么等 kafka consumer处理完消息,group coordinator早就将consumer 移出group了,因为只有一个线程,在消息处理过程中就无法向group coordinator发送心跳包,超过3000ms未发送心跳包,group coordinator就将该consumer移出group了。而将二者分开,一个processing线程负责执行消息处理逻辑,一个heartbeat线程负责发送心跳包,那么:就算一条消息需要处理5min,只要底heartbeat线程在session.timeout.ms向group coordinator发送了心跳包,那consumer可以继续处理消息,而不用担心被移出group了。另一个好处是:如果consumer出了问题,那么在 session.timeout.ms内就能检测出来,而不用等到 max.poll.interval.ms 时长后才能检测出来。
7. 生产者与分区
默认的分区策略是:
- 如果在发消息的时候指定了分区,则消息投递到指定的分区
- 如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区
如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区
8. 分组消息
一个partition中的消息只能被组中的一个消费者实例消费
- 主题下的每个分区只从属于组中的一个消费者,不可能出现组中的两个消费者负责同一个分区,否则将意味着某些consumer将无法得到消息。
- 消费者数据小于Partions的数量时:一个 consumer可以消费多个partitions中的消息
9. 重平衡
触发重平衡的情况如下:
- 消费者组内成员发生变更,这个变更包括了增加和减少消费者。注意这里的减少有很大的可能是被动的,就是某个消费者崩溃退出了
- 主题的分区数发生变更,kafka目前只支持增加分区,当增加的时候就会触发重平衡
- 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡
消费者退出常见情况:
- 当消费者两次poll()的时间超过max.poll.interval.ms消费者会退出触发重平衡 ``` max.poll.interval.ms The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. For consumers using a non-null group.instance.id which reach this timeout, partitions will not be immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be reassigned after expiration of session.timeout.ms. This mirrors the behavior of a static consumer which has shutdown.
Type: int Default: 300000 (5 minutes) Valid Values: [1,…] Importance: medium
2. 在规定时间内 **session.timout.ms**,未收到来自消费者的心跳包,也会被判定消费者退出。消费者发送心跳包的时间配置 **heartbeat.interval.ms。**<a name="cxg6i"></a># 10.java操作kafka1. 踩坑注意事项:不管是使用注解@KafkaListener,还是手动创建KafkaConsumer。在订阅topic的时候,<br />使用注解:topic和topicPattern和_topicPartitions_是互斥的,不得同时使用<br />手动创建KafkaConsumer,对于同一个KafkaConsumer的引用两次调用subscribe,需调用同一个方法。```java//方法一@Overridepublic void subscribe(Pattern pattern) {subscribe(pattern, new NoOpConsumerRebalanceListener());}//方法二@Overridepublic void subscribe(Collection<String> topics) {subscribe(topics, new NoOpConsumerRebalanceListener());}
如图所示:两次调用必须都是用Pattern入参或者Collection入参,两次使用不一致的话,程序会卡住,很难找到原因。经测试发现是两个方法互斥,导致程序卡住。注意!!!!!
