常用操作

启动

  1. # 以守护进程的方式
  2. bin/kafka-server-start.sh --daemon config/server.properties # 启动
  3. bin/kafka-server-stop.sh config/server.properties # 停止

日志查看

  1. cd ~/modules/kafka_2.11-0.11.0.3/logs
  2. tail -f -n200 server.log

索引查看

  1. # Kafka-2.x版本支持该命令
  2. bin/kafka-dump-log.sh --flies --file ~/modules/kafka_2.11-0.11.0.3/tmp/kafka-logs/test-0/00000000000000000000.log

分区操作

  1. # 增加partion数量,示例:从10个partition增加到20个
  2. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic demo --partitions 20

注意:**减少partition是不允许的。**

Topic信息查看

  1. cd ~/modules/kafka_2.11-0.11.0.3/
  2. bin/kafka-topics.sh --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --list
  3. bin/kafka-topics.sh --describe --topic test --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181

Topic信息更新

  1. cd ~/modules/kafka_2.11-0.11.0.3/
  2. # 修改副本数
  3. bin/kafka-topics.sh --alter --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --topic test --partitions 3

消费组管理

查看消费组

  1. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查看消费组详情

  1. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupName

查看消费组状态

  1. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupName --state

消费组内成员信息

  1. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupName --members

删除消费组

  1. # 如果有消费者在使用则会失败
  2. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group groupName

Offsets管理

  1. # 重置消费位移,前提是没消费者在消费
  2. bin/kafak-consumer-groups.sh --bootstrap-server localhost:9092 --group groupName --all-topics --reset-offsets --to-earliest --execute

参数说明:

  • –all-topics:指定了所有主题,修改为“–topics”,指定单个主题。

    Kafka压力测试

    用Kafka官方自带的脚本,对Kafka进行压测。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。

    Kafka Producer压力测试

    (1)启动生产者压测脚本。
    1. cd ~/modules/kafka_2.11-0.11.0.3/
    2. bin/kafka-producer-perf-test.sh --topic test --record-size 100 --num-records 100000 --throughput 1000 --producer-props bootstrap.servers=192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092
    说明:record-size是一条信息有多大,单位是字节;num-records是总共发送多少条信息;throughput 是每秒多少条信息。
    (2)Kafka会打印下面的信息。
    1. 100000 records sent, 999.510240 records/sec (0.10 MB/sec), 41.14 ms avg latency, 462.00 ms max latency, 36 ms 50th, 81 ms 95th, 129 ms 99th, 215 ms 99.9th.
    参数解析:本例中一共写入10w条消息,每秒向Kafka写入了0.10MB的数据,平均是999条消息/秒,每次写入的平均延迟为41.14毫秒,最大的延迟为462毫秒。

    Kafka Consumer压力测试

    Consumer的测试,如果这四个指标(IO,CPU,内存,网络)都不能改变,考虑增加分区数来提升性能。
    (1)启动消费者压测脚本。
    1. cd ~/modules/kafka_2.11-0.11.0.3/
    2. bin/kafka-consumer-perf-test.sh --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --topic test --fetch-size 10000 --messages 10000000 --threads 1
    说明:zookeeper指定zk的链接信息;topic指定topic的名称;fetch-size指定每次fetch的数据的大小;messages总共要消费的消息个数。
    (2)Kafka会打印下面的信息。
    1. start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
    2. 2020-05-15 15:29:06:782, 2020-05-15 15:29:14:095, 9.5391, 1.3044, 100622, 13759.3327
    参数解析:开始测试时间,测试结束数据,最大吞吐率9.5391MB/s,平均每秒消费1.3044MB/s,最大每秒消费100622条,平均每秒消费13759.3327条。

    Kafka机器数量计算

    Kafka机器数量:
    CentOS7.6-Kafka使用手册 - 图1
    先预估一天大概产生多少数据,然后用Kafka自带的生产压测(只测试Kafka的写入速度,保证数据不积压),计算出峰值生产速度。再根据设定的副本数,就能预估出需要部署Kafka的数量。
    比如我们采用压力测试测出写入的速度是10M/s一台,峰值的业务数据的速度是50M/s。副本数为2。
    CentOS7.6-Kafka使用手册 - 图2

    Kafka集群监控

    生产环境中的Kafka,需要从CPU, Memory, 磁盘,Kafka自身的Metrics等多方面进行监控。这样才能在出现问题的时候,做到精确定位,及时响应。

    监控方式

    (1)命令行工具监控
    使用kafka命令行工具kafka-consumer-groups。
    1. cd ~/modules/kafka_2.11-0.11.0.3/
    2. # 0.9之前版本
    3. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --group test-g --topic test
    4. # 新版本使用kafka-consumer-groups
    5. # 查看消费组列表
    6. bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092 --list group
    7. # 偏移量描述
    8. bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092 --describe --group test-g
    (2)Kafka API编程监控
    使用kafka Java Consumer API编程。
    CentOS7.6-Kafka使用手册 - 图3
    参考:https://blog.csdn.net/weixin_40861707/article/details/103875884
    (3)Kafka JXM监控
    使用Kafka自带的JMX监控指标。

    JMX监控

    CentOS7.6-Kafka使用手册 - 图4
    6.png

    KafkaOffsetMonitor

    更偏向于对kafka运行情况的监控。
    1. # 从github下载:https://github.com/quantifind/KafkaOffsetMonitor/releases
    2. nohup java -cp KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --port 11010 --refresh 5.seconds --retain 1.days &

    Kafka Manager

    该监控工具更偏向于对kafka集群的管理,也有监控。
    1. # 从github下载:https://github.com/yahoo/kafka-manager/releases
    2. # 使用sbt编译(需要联网);生成kafka-manager-1.3.3.22.zip;
    3. unzip kafka-manager-1.3.3.22.zip
    4. nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=11020 &

    Kafka Eagle

    Kafka Eagle 用于监控 Kafka 集群中 Topic 被消费的情况。包含 Lag 的产生,Offset 的变动,Partition 的分布,Owner ,Topic 被创建的时间和修改的时间等信息。参考:《CentOS7.6-安装Kafka Eagle-1.4.7
    e.png

    Kafka监控指标

    | 指标类型 | 配置 | 描述 | | —- | —- | —- | | 全局指标 | “obj” : “kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec” “attr” : [ “Count” ] “resultAlias”:”BytesInPerSec” “tags” : {“application” : “BytesInPerSec”} | 每秒输入的流量 | | 全局指标 | “obj” : “kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec” “attr” : [ “Count” ] “resultAlias”:”BytesOutPerSec” “tags” : {“application” : “BytesOutPerSec”} | 每秒输出的流量 | | 全局指标 | “obj” : “kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec” “attr” : [ “Count” ] “resultAlias”:”BytesRejectedPerSec” “tags” : {“application” : “BytesRejectedPerSec”} | 每秒拒绝的流量 | | 全局指标 | “obj” : “kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec” “attr” : [ “Count” ] “resultAlias”:”MessagesInPerSec” “tags” : {“application” : “MessagesInPerSec”} | 每秒的消息写入总量 | | 全局指标 | “obj” : “kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower” “attr” : [ “Count” ] “resultAlias”:”RequestsPerSec” “tags” : {“request” : “FetchFollower”} | 每秒FetchFollower的请求次数 | | 全局指标 | “obj” : “kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer” “attr” : [ “Count” ] “resultAlias”:”RequestsPerSec” “tags” : {“request” : “FetchConsumer”} | 每秒FetchConsumer的请求次数 | | 全局指标 | “obj” : “kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce” “attr” : [ “Count” ] “resultAlias”:”RequestsPerSec” “tags” : {“request” : “Produce”} | 每秒Produce的请求次数 | | 全局指标 | “obj” : “java.lang:type=Memory” “attr” : [ “HeapMemoryUsage”, “NonHeapMemoryUsage” ] “resultAlias”:”MemoryUsage” “tags” : {“application” : “MemoryUsage”} | 内存使用的使用情况 | | 全局指标 | “obj” : “java.lang:type=GarbageCollector,name=“ “attr” : [ “CollectionCount”,”CollectionTime” ] “resultAlias”:”GC” “tags” : {“application” : “GC”} | GC的耗时和次数 | | 全局指标 | “obj” : “java.lang:type=Threading” “attr” : [ “PeakThreadCount”,”ThreadCount” ] “resultAlias”:”Thread” “tags” : {“application” : “Thread”} | 线程的使用情况 | | 全局指标 | “obj” : “kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica” “attr” : [ “Value” ] “resultAlias”:”ReplicaFetcherManager” “tags” : {“application” : “MaxLag”} | 副本落后主分片的最大消息数量 | | 全局指标 | “obj” : “kafka.server:type=ReplicaManager,name=PartitionCount” “attr” : [ “Value” ] “resultAlias”:”ReplicaManager” “tags” : {“application” : “PartitionCount”} | 该broker上的partition的数量 | | 全局指标 | “obj” : “kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions” “attr” : [ “Value” ] “resultAlias”:”ReplicaManager” “tags” : {“application” : “UnderReplicatedPartitions”} | 正在做复制的partition的数量 | | 全局指标 | “obj” : “kafka.server:type=ReplicaManager,name=LeaderCount” “attr” : [ “Value” ] “resultAlias”:”ReplicaManager” “tags” : {“application” : “LeaderCount”} | Leader的replica的数量 | | 全局指标 | “obj” : “kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer” “attr” : [ “Count”,”Max” ] “resultAlias”:”TotalTimeMs” “tags” : {“application” : “FetchConsumer”} | 请求FetchConsumer耗费的所有时间 | | 全局指标 | “obj” : “kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower” “attr” : [ “Count”,”Max” ] “resultAlias”:”TotalTimeMs” “tags” : {“application” : “FetchFollower”} | 请求FetchFollower耗费的所有时间 | | 全局指标 | “obj” : “kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce” “attr” : [ “Count”,”Max” ] “resultAlias”:”TotalTimeMs” “tags” : {“application” : “Produce”} | 请求Produce耗费的所有时间 | | Topic指标 | “kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=falcon_monitor_us” “attr” : [ “Count” ] “resultAlias”:”falcon_monitor_us” “tags” : {“application” : “BytesInPerSec”} | 每秒的写入流量 | | Topic指标 | “kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=falcon_monitor_us” “attr” : [ “Count” ] “resultAlias”:”falcon_monitor_us” “tags” : {“application” : “BytesOutPerSec”} | 每秒的输出流量 | | Topic指标 | “obj” : “kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=falcon_monitor_us” “attr” : [ “Count” ] “resultAlias”:”falcon_monitor_us” “tags” : {“application” : “MessagesInPerSec”} | 每秒写入消息的数量 | | Topic指标 | “obj” : “kafka.log:type=Log,name=LogEndOffset,topic=falcon_monitor_us,partition=“ “attr” : [ “Value” ] “resultAlias”:”falcon_monitor_us” “tags” : {“application” : “LogEndOffset”} | 每个分区最后的Offset |

参数说明

  • obj:对应jmx的ObjectName,就是我们要监控的指标。
  • attr:对应ObjectName的属性,可以理解为我们要监控的指标的值。
  • resultAlias:对应metric 的名称,在InfluxDb里面就是MEASUREMENTS名。
  • tags:对应InfluxDb的tag功能,对与存储在同一个MEASUREMENTS里面的不同监控指标可以做区分,我们在用Grafana绘图的时候会用到,建议对每个监控指标都打上tags。

对于全局监控,每一个监控指标对应一个MEASUREMENTS,所有的kafka节点同一个监控指标数据写同一个MEASUREMENTS ,对于topc监控的监控指标,同一个topic所有kafka节点写到同一个MEASUREMENTS,并且以topic名称命名
Kafka更详细JMX监控指标请参考:http://kafka.apache.org/documentation/#monitoring

Kafka集群迁移扩容

https://docs.qq.com/doc/DYWZxR2J4RGRFV3Rn

问题处理

  • Kafka如何处理大量积压消息?

场景:原来topicA 有3个partition
思路:扩展机器数量,创建新的topicB ,设定10个partition,之前A的消费者逻辑改为获取到topicA的消息之后发topicB,然后新的10台机器来处理topicB的消息,这样效率是以前的3倍。
1)如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)
2)如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。