MQ 中的消息过期失效了怎么办?

假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 Queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。这时的问题就不是数据会大量积压在 MQ 里,而是大量的数据会直接搞丢。这个情况下,就不是说要增加 Consumer 消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。
我们可以采取一个方案,就是批量重导。就是大量积压的时候,直接丢弃数据了,然后等过了高峰期以后开始写程序,将丢失的那批数据一点一点的查出来,然后重新灌入 MQ 里面去,把丢的数据给补回来。

大量消息在 mq 里积压了几个小时了还没解决

临时紧急扩容了,具体操作步骤和思路如下:

  • 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。
  • 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
  • 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
  • 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
  • 等快速消费完积压数据之后,得恢复原先部署的架构重新用原先的 consumer 机器来消费消息。

    如何保证消息消费的幂等性?

    要保证消息不被重复消费,其实就是要保证消息消费时的幂等性。幂等性:无论你重复请求多少次,得到的结果都是一样的。例如:一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。
    那么如何保证幂等性呢?
  1. 写数据时,先根据主键查一下这条数据是否存在,如果已经存在则 update;
  2. 数据库的唯一键约束也可以保证不会重复插入多条,因为重复插入多条只会报错,不会导致数据库中出现脏数据;
  3. 如果是写 redis,就没有问题,因为 set 操作是天然幂等性的。

    MQ如何保证可靠性?

    消费端弄丢了数据
    唯一可能导致消费者弄丢数据的情况,就是说,你消费到了这个消息,然后消费者那边自动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。
    这不是跟 RabbitMQ 差不多吗,大家都知道 Kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。
    生产环境碰到的一个问题,就是说我们的 Kafka 消费者消费到了数据之后是写到一个内存的 queue 里先缓冲一下,结果有的时候,你刚把消息写入内存 queue,然后消费者会自动提交 offset。然后此时我们重启了系统,就会导致内存 queue 里还没来得及处理的数据就丢失了。
    Kafka 弄丢了数据
    这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。大家想想,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据?
    生产环境也遇到过,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。
    所以此时一般是要求起码设置如下 4 个参数:
  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。
  • 在 producer 端设置 acks=all:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了
  • 在 producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。

我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。
生产者会不会弄丢数据?
如果按照上述的思路设置了 acks=all,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

Kafka 有几种数据保留的策略?

Kafka 有两种数据保存策略:按照过期时间保留和按照存储的消息大小保留。

Kafka是什么

Kafka 是一种高吞吐量、分布式、基于发布/订阅的消息系统,最初由 LinkedIn 公司开发,使用Scala 语言编写,目前是 Apache 的开源项目。broker:Kafka 服务器,负责消息存储和转发topic:消息类别, Kafka 按照 topic 来分类消息partition:topic 的分区,一个 topic 可以包含多个 partition, topic 消息保存在各个partition 上offset:消息在日志中的位置,可以理解是消息在 partition 上的偏移量,也是代表该消息的唯一序号Producer:消息生产者Consumer:消息消费者Consumer Group:消费者分组,每个 Consumer 必须属于一个 groupZookeeper:保存着集群 broker、 topic、 partition 等 meta 数据;另外,还负责 broker 故障发现, partition leader 选举,负载均衡等功能

Kafka为什么吞吐量高

Kafka的生产者采用的是异步发送消息机制,当发送一条消息时,消息并没有发送到Broker而是缓存起来,然后直接向业务返回成功,当缓存的消息达到一定数量时再批量发送给Broker。这种做法减少了网络io,从而提高了消息发送的吞吐量,但是如果消息生产者宕机,会导致消息丢失,业务出错,所以理论上kafka利用此机制提高了性能却降低了可靠性。

如何保证消息的高效读写?

零拷贝: kafka和RocketMQ都是通过零拷贝技术来优化文件读写。
传统文件复制方式: 需要对文件在内存中进行四次拷贝。
零拷贝: 有两种方式, mmap和transfile,Java当中对零拷贝进行了封装, Mmap方式通过MappedByteBuffer对象进行操作,而transfile通过FileChannel来进行操作。Mmap 适合比较小的文件,通常文件大小不要超过1.5G ~2G 之间。Transfile没有文件大小限制。RocketMQ当中使用Mmap方式来对他的文件进行读写。
在kafka当中,他的index日志文件也是通过mmap的方式来读写的。在其他日志文件当中,并没有使用零拷贝的方式。Kafka使用transfile方式将硬盘数据加载到网卡。

Kafka的Pull和Push分别有什么优缺点

  1. pull表示消费者主动拉取,可以批量拉取,也可以单条拉取,所以pull可以由消费者自己控制,根据自己的消息处理能力来进行控制,但是消费者不能及时知道是否有消息,可能会拉到的消息为空
  2. push表示Broker主动给消费者推送消息,所以肯定是有消息时才会推送,但是消费者不能按自己的能力来消费消息,推过来多少消息,消费者就得消费多少消息,所以可能会造成网络堵塞,消费者压力大等问题


    为什么要使用 kafka,为什么要使用消息队列?

    缓冲和削峰 :上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
    解耦和扩展性 :项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。冗余 :可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。健壮性 :消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。异步通信 :很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。**

Kafka中的ISR、AR又代表什么?ISR的伸缩又指什么

ISR:In-Sync Replicas 副本同步队列
AR:Assigned Replicas 所有副本ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。

Kafka高效文件存储设计特点:

  1. Kafka 把 topic 中一个 parition 大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
  2. 通过索引信息可以快速定位 message 和确定 response 的最大大小。
  3. 通过 index 元数据全部映射到 memory,可以避免 segment file 的 IO 磁盘操作。
  4. 通过索引文件稀疏存储,可以大幅降低 index 文件元数据占用空间大小。**

Kafka与传统消息系统之间有三个关键区别

  1. Kafka 持久化日志,这些日志可以被重复读取和无限期保留
  2. Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据提升容错能力和高可用性
  3. Kafka 支持实时的流式处理**

    Kafka创建 Topic 时如何将分区放置到不同的 Broker 中

  4. 副本因子不能大于 Broker 的个数;

  5. 第一个分区(编号为 0)的第一个副本放置位置是随机从 brokerList 选择的;
  6. 其他分区的第一个副本放置位置相对于第 0 个分区依次往后移。也就是如果我们有 5 个Broker, 5 个分区,假设第一个分区放在第四个 Broker 上,那么第二个分区将会放在第五个 Broker 上;第三个分区将会放在第一个 Broker 上;第四个分区将会放在第二个Broker 上,依次类推;
  7. 剩余的副本相对于第一个副本放置位置其实是由 nextReplicaShift 决定的,而这个数也是随机产生的

Kafka的消费者如何消费数据

消费者每次消费数据的时候,消费者都会记录消费的物理偏移量( offset)的位置等到下次消费时,他会接着上次位置继续消费

Kafka的rebalance机制

consumer group中的消费者与topic下的partion重新匹配的过程
何时会产生rebalance:

  • consumer group中的成员个数发生变化
  • consumer消费超时
  • group订阅的topic个数发生变化
  • group订阅的topic的分区数发生变化

coordinator:通常是partition的leader节点所在的broker,负责监控group中consumer的存活,

  1. leader consumer监控topic的变化,通知coordinator触发rebalance
  2. coordinator通过心跳返回通知consumer进行rebalance
  3. consumer请求coordinator加入组,coordinator选举产生leader consumer
  4. leader consumer从coordinator获取所有的consumer,发送syncGroup(分配信息)给到 coordinator
  5. coordinator通过心跳机制将syncGroup下发给consumer
  6. 完成rebalance

consumer维持到coordinator的心跳,判断consumer的消费超时
rebalance策略

  • range 按照分区序号排序
  • round-robin轮询
  • sticky策略 初始时分配策略与round-robin类似,reblance时 1分区分配尽可能均匀 2分区的分配尽可能与上次分配保持相同

如果消费消息超时,触发rebalance,重新分配后、该消息会被其他消费者消费,此时消费完成提
交offset、导致错误
解决:coordinator每次rebalance,会标记一个Generation给到consumer,每次rebalance该
Generation会+1,consumer提交offset时,coordinator会比对Generation,不一致则拒绝提交

Kafka消费者负载均衡策略

一个消费者组中的一个分片对应一个消费者成员,他能保证每个消费者成员都能访问,如果组中成员太多会有空闲的成员

kafaka生产数据时数据的分组策略

生产者决定数据产生到集群的哪个 partition 中每一条消息都是以( key, value)格式 Key是由生产者发送数据传入所以生产者( key)决定了数据产生到集群的哪个 partition

Kafka中是怎么体现消息顺序性的?

kafka每个partition中的消息在写入时都是有序的,消费时,每个partition只能被每一个group中的一个消费者消费,保证了消费时也是有序的。整个topic不保证有序。如果为了保证topic整个有序,那么将partition调整为1.

Kafka如何实现延迟队列?

Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。时间轮的应用并非Kafka独有,其应用场景还有很多,在Netty、Akka、Quartz、Zookeeper等组件中都存在时间轮的踪影。底层使用数组实现,数组中的每个元素可以存放一个TimerTaskList对象。TimerTaskList是一个环形双向链表,在其中的链表项TimerTaskEntry中封装了真正的定时任务TimerTask.Kafka中到底是怎么推进时间的呢?Kafka中的定时器借助了JDK中的DelayQueue来协助推进时间轮。具体做法是对于每个使用到的TimerTaskList都会加入到DelayQueue中。Kafka中的TimingWheel专门用来执行插入和删除TimerTaskEntry的操作,而DelayQueue专门负责时间推进的任务。再试想一下,DelayQueue中的第一个超时任务列表的expiration为200ms,第二个超时任务为840ms,这里获取DelayQueue的队头只需要O(1)的时间复杂度。如果采用每秒定时推进,那么获取到第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取到第二个超时任务时有需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用DelayQueue来辅助以少量空间换时间,从而做到了“精准推进”。Kafka中的定时器真可谓是“知人善用”,用TimingWheel做最擅长的任务添加和删除操作,而用DelayQueue做最擅长的时间推进工作,相辅相成。