Kafka生产者发送消息
https://www.processon.com/view/6215a7d763768906ec2239a6
kafka消费者消费消息
coordinate
https://www.processon.com/view/62184f147d9c081053d31a75
消费流程
拦截器
从流程图可以看到每个消息都会经过拦截器,那么我们就可以借助拦截器实现诸如消息计数,消息消费时间的统计。
public interface ProducerInterceptor<K, V> extends Configurable {
// 消息发送之前调用
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
// 消息发送成功or失败之后被调用
void onAcknowledgement(RecordMetadata metadata, Exception exception);
// 拦截器关闭时调用
void close();
只需要实现onSend
方法,我们可以实现对每个消息添加发送时间。ps:相应的消费者也有它的拦截器,实现消费者的拦截器我们就可以计算出每条消息的从生产成功到消费生命周期。
消息消费
消息不丢失
要保证kafka的消息不丢失,需要修改生产者的配置
acks=-1
、min.insync.replicas >1
消息不丢失需要生产者和消费者共同保证
生产者不丢失
ISR
:同步中的副本,分区leader会根据参数replica.lag.time.max.ms
判断副本是否掉线或卡了,当超过这个参数时,将该副本移出ISR
.
生产者需要将设置acks=-1
,min.insync.replicas>1
。
ack = -1
:默认是1,只需要leader返回ack即认为发送成功。ack为-1时需要所有isr
返回ack才可以认为发送成功。
min.insync.replicas>1
:该参数规定了最小的ISR数,默认是1(也就是leader)。>1时表示需要多个ISR副本。
ISR不是全部副本~!
消费者不丢失
要保证消费者消息不丢失则需要将消费者参数
enable.auto.commit
设置为false,开启手动提交,在每次消费完之后再手动提交位移信息。但是这样会导致消息重复消费。Ps:一个分区只能被一个消费者组内的一个消费者消费,反之,一个消费者可以同时消费多个分区。
精准一次消费
重复消费场景
重复消费一般分为两大类,常见的是开启了自动提交
enable.auto.commit = true
。开启自动提交后,消费者消费消息后需要经过auto.commit.interval.ms
后的下次poll才会提交offsets。
消费时间过长
max.poll.interval.ms参数定义了两次poll的最大间隔,它的默认值是 5 分钟, Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。rebalance之后,消息将被再次下发。
消费者被kill
消费者poll数据后消费,还没达到
auto.commit.interval.ms
时间提交offsets时, 被kill掉。该部分消息将被再次下发。
解决方案
针对消费时间过长的问题:我们可以通过调大max.poll.interval.ms
的值减少不必要的rebalance,同时适当减小 max.poll.records
的值,使每次拉取的量变小。
重复消费的本质是offset没有被成功的提交。既然自动提交会出现上述的情况,那么可以将enable.auto.commit
置为false,改为手动提交。手动提交能避免上述情况,但是还是没法拒绝重复消费的问题。
那么我们就需要在消费的时候对消息进行去重,倘若消息消费的结果是幂等的(如redis set 或es put)这种天然幂等操作,我们可以不要做额外的工作。若是插入数据库记录或soa调用的话,我们就需要借助redis 或 mysql唯一主键对消息进行去重。