生产者写数据
- 相关参数:
- batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。
- linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。
相关类:
- KafkaProducer:需要创建一个生产者对象,用来发送数
- ProducerConfig:获取所需的一系列配置参数
- ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象
异步发送
Kafka的Producer发送消息采用的是异步发送的方式在消息发送的过程中,涉及到了两个线程:
main线程和Sender线程,以及一个线程共享变量 RecordAccumulator。
main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取 消息发送到broker。
有两种API:
1、不带回调函数的API
2、带回调函数的API:回调函数会在producer收到ack时异步调用,如果Exception不为null 表示消息发送失败。失败会自动重试,不需要我们处理同步发送
同步发送的意思是:一条消息发送之后会阻塞当前线程 直到返回ack
使用方法:producer.send().get(); (在异步的基础上 增加get即可)消费者读数据
相关参数:
- enable.auto.commit:是否开启自动提交 offset 功能
- auto.commit.interval.ms:自动提交 offset 的时间间隔
- 相关类:
