date: 2021-01-07
title: Kafka cluster data loss   # title
tags: kafka                      # tags
categories: MQ                   # category

A record of troubleshooting a data-corruption incident on a production Kafka cluster.

Problem Background

We run a three-node Kafka cluster. Monitoring showed a large message backlog on the topics, so I went to the production hosts to tune the logstash configuration and increase consumption throughput. Awkwardly, once on the machines I found that all three logstash instances had already died, and when I tried to restart them they would not come back up. Here is a snippet of the logstash startup error log:

  [2021-01-06T20:22:08,158][ERROR][logstash.javapipeline ][feature-log] A plugin had an unrecoverable error. Will restart this plugin.
  Pipeline_id:feature-log
  Plugin: <LogStash::Inputs::Kafka auto_offset_reset=>"latest", group_id=>"logstash-feature-log", topics=>["risk-feature-log"], max_poll_records=>"10", id=>"8080e0dec80a5feb37b74592b144743127af23a530367b44866b47e999418e32", bootstrap_servers=>"xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092", client_id=>"feature-log-no01", decorate_events=>true, enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_55d1cbc1-3a64-45e0-aa95-20e0c4bfc7d4", enable_metric=>true, charset=>"UTF-8">, auto_commit_interval_ms=>"5000", consumer_threads=>1, enable_auto_commit=>"true", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", poll_timeout_ms=>100, ssl_endpoint_identification_algorithm=>"https", security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI">
  Error: Unexpected error code 2 while fetching from partition risk-feature-log-28
  Exception: Java::JavaLang::IllegalStateException
  Stack: org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(org/apache/kafka/clients/consumer/internals/Fetcher.java:1232)
  org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(org/apache/kafka/clients/consumer/internals/Fetcher.java:587)
  org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(org/apache/kafka/clients/consumer/KafkaConsumer.java:1263)
  org.apache.kafka.clients.consumer.KafkaConsumer.poll(org/apache/kafka/clients/consumer/KafkaConsumer.java:1225)
  org.apache.kafka.clients.consumer.KafkaConsumer.poll(org/apache/kafka/clients/consumer/KafkaConsumer.java:1159)
  java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:497)
  org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:455)
  org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:316)
  data.elk.logstash_minus_7_dot_5_dot_0.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_0_dot_0_minus_java.lib.logstash.inputs.kafka.thread_runner(/data/elk/logstash-7.5.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:255)
  org.jruby.RubyProc.call(org/jruby/RubyProc.java:295)
  org.jruby.RubyProc.call(org/jruby/RubyProc.java:274)
  org.jruby.RubyProc.call(org/jruby/RubyProc.java:270)
  java.lang.Thread.run(java/lang/Thread.java:745)
  [2021-01-06T20:22:08,401][ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit

Meanwhile, one of the Kafka brokers was writing errors to its server.log (I only caught the two ERROR lines by running tail -f on the server.log of every broker in real time; those two lines turned out to be the key to solving the problem):

[Figure 1: the two ERROR lines from the Kafka broker's server.log]

Solving the Problem

I had not dug into Kafka much before; once the cluster was deployed and in production it basically never got touched again, so I never expected to run into data loss (the main cause, I suspect, is that the local disks were small, so Kafka had been installed on an NFS mount, and that is what corrupted the data). Back to the point: the logstash output was verbose and I could not tell what the error meant, so I focused on the two ERROR lines from Kafka instead. The full log lines are:

  [2021-01-06 20:28:54,689] ERROR [ReplicaManager broker=1] Error processing fetch with max size 1048576 from consumer on partition deviceRequestLog-33: (fetchOffset=124692, logStartOffset=-1, maxBytes=1048576, currentLeaderEpoch=Optional[2]) (kafka.server.ReplicaManager)
  org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /data/kafka/logs/deviceRequestLog-33/00000000000000043571.log.
  [2021-01-06 20:28:54,701] ERROR [ReplicaManager broker=1] Error processing fetch with max size 1048576 from consumer on partition risk-feature-log-28: (fetchOffset=154671, logStartOffset=-1, maxBytes=1048576, currentLeaderEpoch=Optional[2]) (kafka.server.ReplicaManager)
  org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /data/kafka/logs/risk-feature-log-28/00000000000000105950.log.
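Not something the fix below depends on, but worth knowing: the error names the exact segment file, and Kafka ships a DumpLogSegments tool that can read such a file directly on the broker. A minimal, read-only check (run it on the broker that owns the partition; the path is the one from the error above):

  # Dump the corrupted segment and see where the records stop parsing
  ./kafka-run-class.sh kafka.tools.DumpLogSegments \
      --files /data/kafka/logs/deviceRequestLog-33/00000000000000043571.log \
      --deep-iteration --print-data-log | head -n 50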

Our logstash deployment has five consumer groups, each consuming a different topic. Since errors were reported for only these two topics, only these two were affected. I first moved the logstash config files for those two topics out of the way and started logstash in the foreground (a foreground start is easy to stop and makes the startup output visible, while I kept tailing every broker's server.log). Logstash now started normally, so the focus could shift to the two topics Kafka was complaining about. Both had the same problem: a stretch of corrupted data that consumers could not read past. The next step was to pin down which offset range in which partition was corrupted, and then skip over it. That does lose data, but there was no way around it: these topics were created with --replication-factor 1, meaning a single replica and therefore no backup. This is a real trap; I had always assumed that one replica meant two identical copies of the data, but that is not how it works: the replication factor counts the total number of copies, so 1 means the leader's copy is the only one.
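For reference, the actual replication factor is easy to confirm by describing the topic, and new topics should get at least two replicas. A minimal sketch (broker address reuses zk01:9092 from this post, the partition count is illustrative, and on older Kafka versions kafka-topics.sh takes --zookeeper instead of --bootstrap-server):

  # Show partition count, ReplicationFactor and per-partition replica assignment
  ./kafka-topics.sh --bootstrap-server zk01:9092 --describe --topic deviceRequestLog

  # When creating a new topic, give it at least 2 replicas so one corrupted or lost
  # copy is not fatal (36 partitions is just an example)
  ./kafka-topics.sh --bootstrap-server zk01:9092 --create --topic some-new-topic \
      --partitions 36 --replication-factor 2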

Enough preamble. Time to find exactly which offsets were corrupted (using the deviceRequestLog topic as the example):

  # The error related to deviceRequestLog was:
  [2021-01-06 20:28:54,689] ERROR [ReplicaManager broker=1] Error processing fetch with max size 1048576 from consumer on partition deviceRequestLog-33: (fetchOffset=124692, logStartOffset=-1, maxBytes=1048576, currentLeaderEpoch=Optional[2]) (kafka.server.ReplicaManager)
  org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /data/kafka/logs/deviceRequestLog-33/00000000000000043571.log.
  # From this it is clear that deviceRequestLog-33 is the corrupted partition
  # deviceRequestLog is the topic name; 33 is the partition number
  # Describe the consumer group logstash-devlog (the group name is defined in the logstash config file)
  ./kafka-consumer-groups.sh --bootstrap-server zk01:9092 --group logstash-devlog --describe
  # Locate the row whose PARTITION column is 33
  GROUP            TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID  HOST  CLIENT-ID
  logstash-devlog  deviceRequestLog  33         124692          127367          2675  -            -     -
  # ...... (part of the output omitted)
  # The group details show that the corruption starts at offset 124692; now we need to find
  # where the data becomes readable again.
  # The only way is to use the console consumer to read from a specific offset of a specific partition:
  $ ./kafka-console-consumer.sh --bootstrap-server 10.253.155.200:9092 --topic deviceRequestLog --partition 33 --offset 125000
  # If the chosen --offset 125000 is still inside the corrupted range, it fails like this:
  [2021-01-06 23:24:38,869] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
  java.lang.IllegalStateException: Unexpected error code 2 while fetching from partition deviceRequestLog-33
  at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:1236)
  at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:591)
  at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
  at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:439)
  at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:105)
  at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
  at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
  at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
  Processed a total of 0 messages
  # Keep narrowing the range step by step until consumption succeeds
  # (a scripted version of this bisection is sketched after this block)
  # In my case the first offset that could be consumed normally was 125223
  # In other words, the data between offset 124692 and 125223 is corrupted
  $ ./kafka-console-consumer.sh --bootstrap-server zk01:9092 --topic deviceRequestLog --partition 33 --offset 125223
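The manual bisection works, but it can also be scripted. A rough, untested sketch, assuming the corrupted range is contiguous, the broker address zk01:9092 from above, and Kafka 2.x behaviour of --max-messages / --timeout-ms; it keys off the "Processed a total of N messages" line rather than the exit code, since the latter may not be reliable on a fetch error:

  # Binary-search for the first readable offset in deviceRequestLog-33
  lo=124692   # CURRENT-OFFSET from --describe: known to be corrupted
  hi=127367   # LOG-END-OFFSET from --describe: assumed to be past the corruption
  while [ $((hi - lo)) -gt 1 ]; do
      mid=$(( (lo + hi) / 2 ))
      # Try to read exactly one record at $mid; the tool reports
      # "Processed a total of 1 messages" only when the read succeeded
      if ./kafka-console-consumer.sh --bootstrap-server zk01:9092 \
             --topic deviceRequestLog --partition 33 --offset "$mid" \
             --max-messages 1 --timeout-ms 10000 2>&1 \
           | grep -q "Processed a total of 1 messages"; then
          hi=$mid    # readable here, so the corruption ends before $mid
      else
          lo=$mid    # still corrupted, move forward
      fi
  done
  echo "first readable offset: $hi"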

With the corrupted range identified, the important remaining question was: how do I change the consumer group's committed offset so that it resumes from a chosen position? (Resetting the group to the beginning is not an option: it would re-consume a lot of duplicate data and would fail again as soon as it reached offset 124692. Resetting to the latest offset is not an option either, because far too much data would be lost. So the group has to resume from --offset 125223.) The command is:

  $ ./kafka-consumer-groups.sh --bootstrap-server zk01:9092 --group logstash-devlog --reset-offsets --topic deviceRequestLog:33 --to-offset 125223 --execute
  # --group:          the consumer group to act on
  # --reset-offsets:  reset the group's committed offsets
  # --topic:          the topic:partition to reset
  # --to-offset:      the offset to move the group to
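Two things worth knowing about this command: the reset only works while the consumer group is inactive (which is why logstash had to stay stopped), and without --execute the tool only prints the planned offsets instead of applying them, so you can preview first and then confirm the result:

  # Preview the reset without applying it (dry run is the default when --execute is omitted)
  ./kafka-consumer-groups.sh --bootstrap-server zk01:9092 --group logstash-devlog \
      --reset-offsets --topic deviceRequestLog:33 --to-offset 125223 --dry-run

  # After running with --execute, confirm that CURRENT-OFFSET for partition 33 is now 125223
  ./kafka-consumer-groups.sh --bootstrap-server zk01:9092 --group logstash-devlog --describe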

After running the command above, I moved the logstash config file that consumes deviceRequestLog back into place and started logstash. To my delight, it came up successfully. I then repeated the same recipe for the other topic, and the problem was solved.
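For completeness, the same recipe applied to the second corrupted topic looks like this. The group name logstash-feature-log comes from the plugin dump earlier; <FIRST_READABLE_OFFSET> is a placeholder for whatever the bisection finds for risk-feature-log-28, not a value from the incident:

  # Probe risk-feature-log partition 28 (fetchOffset=154671 was the corrupted one)
  ./kafka-console-consumer.sh --bootstrap-server zk01:9092 --topic risk-feature-log \
      --partition 28 --offset <FIRST_READABLE_OFFSET> --max-messages 1

  # Then move the consumer group past the corrupted range
  ./kafka-consumer-groups.sh --bootstrap-server zk01:9092 --group logstash-feature-log \
      --reset-offsets --topic risk-feature-log:28 --to-offset <FIRST_READABLE_OFFSET> --execute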

Postmortem

A few takeaways from this production incident:

  • Do not deploy the service on a shared (NFS) directory;
  • Set the replication factor of Kafka topics to at least 2;
  • Have proper monitoring in place, and when something breaks, do not rush: think it through before acting (a couple of quick checks are sketched after this list).
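A couple of quick checks along those lines, using the paths and addresses from this incident (illustrative, not a full monitoring setup):

  # The Kafka data directory should live on a local filesystem (ext4/xfs), not nfs
  df -T /data/kafka/logs

  # Consumer lag per partition; alert if the LAG column keeps growing
  ./kafka-consumer-groups.sh --bootstrap-server zk01:9092 --group logstash-devlog --describe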

Writing this post took me only forty minutes, but solving the problem took six hours, and only I know how painful those hours were. Still, stepping into this pit gave me a much deeper understanding of Kafka.