date: 2021-01-07
title: Kafka cluster data loss # title
tags: kafka # tags
categories: MQ # categories
Problem Background
The cluster consists of three Kafka brokers. Monitoring showed a large backlog of messages on a topic, so I logged on to production to tune the Logstash configuration and increase its consumption throughput. Awkwardly, once on the servers I found that all three Logstash instances had died, and when I tried to restart them they refused to come up. Here is a snippet of the Logstash startup error log:
[2021-01-06T20:22:08,158][ERROR][logstash.javapipeline ][feature-log] A plugin had an unrecoverable error. Will restart this plugin.
Pipeline_id:feature-log
Plugin: <LogStash::Inputs::Kafka auto_offset_reset=>"latest", group_id=>"logstash-feature-log", topics=>["risk-feature-log"], max_poll_records=>"10", id=>"8080e0dec80a5feb37b74592b144743127af23a530367b44866b47e999418e32", bootstrap_servers=>"xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092", client_id=>"feature-log-no01", decorate_events=>true, enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_55d1cbc1-3a64-45e0-aa95-20e0c4bfc7d4", enable_metric=>true, charset=>"UTF-8">, auto_commit_interval_ms=>"5000", consumer_threads=>1, enable_auto_commit=>"true", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", poll_timeout_ms=>100, ssl_endpoint_identification_algorithm=>"https", security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI">
Error: Unexpected error code 2 while fetching from partition risk-feature-log-28
Exception: Java::JavaLang::IllegalStateException
Stack: org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(org/apache/kafka/clients/consumer/internals/Fetcher.java:1232)
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(org/apache/kafka/clients/consumer/internals/Fetcher.java:587)
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(org/apache/kafka/clients/consumer/KafkaConsumer.java:1263)
org.apache.kafka.clients.consumer.KafkaConsumer.poll(org/apache/kafka/clients/consumer/KafkaConsumer.java:1225)
org.apache.kafka.clients.consumer.KafkaConsumer.poll(org/apache/kafka/clients/consumer/KafkaConsumer.java:1159)
java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:497)
org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:455)
org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:316)
data.elk.logstash_minus_7_dot_5_dot_0.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_0_dot_0_minus_java.lib.logstash.inputs.kafka.thread_runner(/data/elk/logstash-7.5.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:255)
org.jruby.RubyProc.call(org/jruby/RubyProc.java:295)
org.jruby.RubyProc.call(org/jruby/RubyProc.java:274)
org.jruby.RubyProc.call(org/jruby/RubyProc.java:270)
java.lang.Thread.run(java/lang/Thread.java:745)
[2021-01-06T20:22:08,401][ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
Meanwhile, the server.log of one of the Kafka brokers was also reporting errors (only by running tailf on the server.log of every Kafka node in real time did I catch the two ERROR lines below, and those two lines turned out to be the key to solving the problem):
Solving the Problem
I had not worked with Kafka much before; once a cluster was deployed and put into production it basically never got touched again, so I never expected to run into data loss like this (the main cause, I suspect, is that local disk space was limited, so Kafka had been installed on an NFS directory, which led to the data corruption). Back to the point: the Logstash output was verbose and I could not make sense of its error, so I put my attention on the two ERROR lines from Kafka instead. The log entries are as follows:
[2021-01-06 20:28:54,689] ERROR [ReplicaManager broker=1] Error processing fetch with max size 1048576 from consumer on partition deviceRequestLog-33: (fetchOffset=124692, logStartOffset=-1, maxBytes=1048576, currentLeaderEpoch=Optional[2]) (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /data/kafka/logs/deviceRequestLog-33/00000000000000043571.log.
[2021-01-06 20:28:54,701] ERROR [ReplicaManager broker=1] Error processing fetch with max size 1048576 from consumer on partition risk-feature-log-28: (fetchOffset=154671, logStartOffset=-1, maxBytes=1048576, currentLeaderEpoch=Optional[2]) (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /data/kafka/logs/risk-feature-log-28/00000000000000105950.log.
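In case it helps to reproduce the capture: a minimal tailing sketch (the path is an assumption; server.log normally lives under the Kafka installation directory, not under the data directory /data/kafka/logs seen above):
# run on every broker; adjust the path to your own installation
$ tail -f $KAFKA_HOME/logs/server.log | grep ERROR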
Our Logstash setup has five consumer groups, each consuming a different topic. Since only these two topics appeared in the error messages, only these two were affected. I first moved the Logstash config files for these two topics out of the way and started Logstash in the foreground (a foreground start is easy to stop and makes it easy to watch the startup output while tailing every broker's server.log at the same time). Logstash now started normally, so the focus could shift to the two topics Kafka was complaining about. Both had the same problem: a corrupted stretch of data that clients could not consume. The next step was therefore to pin down which offset range of which partition was corrupted and simply skip it. That means losing data, but there was no way around it: these topics had been created with --replication-factor 1, i.e. a single replica and therefore no backup. This was another painful gotcha; I had always assumed that "1 replica" meant there were two identical copies of the same data, but it turns out that is not the case, because the replication factor counts the leader copy itself.
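To double-check the single-replica situation before deciding to skip data, the topic can be described; a minimal sketch, reusing the zk01:9092 bootstrap address from the commands below (older Kafka versions need --zookeeper instead of --bootstrap-server for kafka-topics.sh):
$ ./kafka-topics.sh --bootstrap-server zk01:9092 --describe --topic deviceRequestLog
# The output shows ReplicationFactor and, per partition, Leader/Replicas/Isr;
# with a replication factor of 1 each partition has a single replica, i.e. no backup copy.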
OK, enough talk; let's start tracking down which offset range of the topic is corrupted (using the deviceRequestLog topic as the example):
# The error related to deviceRequestLog is as follows:
[2021-01-06 20:28:54,689] ERROR [ReplicaManager broker=1] Error processing fetch with max size 1048576 from consumer on partition deviceRequestLog-33: (fetchOffset=124692, logStartOffset=-1, maxBytes=1048576, currentLeaderEpoch=Optional[2]) (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /data/kafka/logs/deviceRequestLog-33/00000000000000043571.log.
# From this it is fairly clear that deviceRequestLog-33 is corrupted
# deviceRequestLog is the topic name; 33 is partition 33 of that topic
# Describe the consumer group logstash-devlog (the group name is defined in the Logstash config file)
./kafka-consumer-groups.sh --bootstrap-server zk01:9092 --group logstash-devlog --describe
# Locate the row where the PARTITION column is 33
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
logstash-devlog deviceRequestLog 33 124692 127367 2675 - - -
# ...... part of the output omitted
# From the consumer group details it is easy to see that the corruption starts at offset 124692; now we need to find the point from which the data is readable again
# The only way is to use the following command, which consumes a specific partition of a specific topic starting from a specified offset
$ ./kafka-console-consumer.sh --bootstrap-server 10.253.155.200:9092 --topic deviceRequestLog --partition 33 --offset 125000
# If the specified --offset 125000 is still inside the corrupted range, the following error is printed:
[2021-01-06 23:24:38,869] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
java.lang.IllegalStateException: Unexpected error code 2 while fetching from partition deviceRequestLog-33
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:1236)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:591)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:439)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:105)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages
# Then narrow the range step by step until messages can be consumed normally (a scripted version of this search is sketched after this block)
# In my case the earliest offset from which consumption works again is 125223
# In other words, the data between offset 124692 and 125223 is corrupted
$ ./kafka-console-consumer.sh --bootstrap-server zk01:9092 --topic deviceRequestLog --partition 33 --offset 125223
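The step-by-step narrowing can also be scripted. The sketch below is an illustrative binary search, not the exact procedure from the original run: it assumes the corrupted range is contiguous and that a probe which consumes nothing within the timeout is still inside the bad range; the bootstrap address and offset bounds are taken from the output above.
#!/usr/bin/env bash
# Illustrative binary search for the first consumable offset (assumptions noted above).
BROKERS="zk01:9092"
TOPIC="deviceRequestLog"
PARTITION=33
lo=124692   # known-bad offset, from CURRENT-OFFSET in the group describe
hi=127367   # LOG-END-OFFSET from the describe output, used as the upper search bound

probe() {   # prints one message on stdout if the offset is readable, nothing otherwise
  ./kafka-console-consumer.sh --bootstrap-server "$BROKERS" \
    --topic "$TOPIC" --partition "$PARTITION" --offset "$1" \
    --max-messages 1 --timeout-ms 15000 2>/dev/null
}

while [ $((hi - lo)) -gt 1 ]; do
  mid=$(( (lo + hi) / 2 ))
  if [ -n "$(probe "$mid")" ]; then
    hi=$mid   # readable, search lower
  else
    lo=$mid   # still corrupted
  fi
done
echo "first consumable offset: $hi"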
Now that we know exactly which data is corrupted, the important question becomes: how do we change the offset recorded for the consumer group so that it resumes consuming from the position we choose? (We cannot reset the group to the earliest offset: that would re-consume a lot of duplicate data, and once it reached offset 124692 it would hit the same error again. We also cannot reset it to the latest offset, because that would throw away far too much data. So the only option is to have the group resume from --offset 125223.) The command is as follows:
$ ./kafka-consumer-groups.sh --bootstrap-server zk01:9092 --group logstash-devlog --reset-offsets --topic deviceRequestLog:33 --to-offset 125223 --execute
# --group: the consumer group to operate on
# --reset-offsets: reset the consumption offsets
# --topic: the topic:partition to reset (topic name : partition number)
# --to-offset: the offset to move the group to
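Two hedged notes on this step, based on how kafka-consumer-groups.sh generally behaves rather than on the original run: the reset only succeeds while the group has no active members (so the Logstash pipeline consuming this topic must be stopped, which it was here since its config had been moved out), and the change can be previewed with --dry-run before committing it with --execute:
# Preview the reset without applying it (same command, --dry-run instead of --execute)
$ ./kafka-consumer-groups.sh --bootstrap-server zk01:9092 --group logstash-devlog --reset-offsets --topic deviceRequestLog:33 --to-offset 125223 --dry-run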
After running the --execute reset above, I moved the Logstash config file for the deviceRequestLog topic back into place and started Logstash, and to my delight it came up successfully. Then I repeated the same recipe for the other topic, and with that the problem was solved.
Post-Incident Review
From this production incident I took away the following points:
- Do not deploy services on a shared (NFS) directory;
- Set the replication factor of Kafka topics to at least 2 (see the sketch after this list);
- Have proper monitoring in place, and when a problem hits, do not rush; sort out your approach before acting;
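For the second point, a minimal sketch of creating a topic with two replicas; the topic name and partition count are made up for illustration, and older broker versions take --zookeeper instead of --bootstrap-server for kafka-topics.sh:
# Hypothetical topic, shown only to illustrate --replication-factor 2
$ ./kafka-topics.sh --bootstrap-server zk01:9092 --create --topic demo-log --partitions 36 --replication-factor 2
Raising the replication factor of an existing topic is more involved; it goes through kafka-reassign-partitions.sh with a hand-written reassignment plan.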
Writing this post took me only forty minutes, but solving the problem took six hours, and only I know how rough that was. Still, it was exactly this pitfall that gave me a much deeper understanding of Kafka.