生产者写数据

  • 相关参数:
    • batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。
    • linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。
  • 相关类:

    • KafkaProducer:需要创建一个生产者对象,用来发送数
    • ProducerConfig:获取所需的一系列配置参数
    • ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象

      异步发送

      Kafka的Producer发送消息采用的是异步发送的方式在消息发送的过程中,涉及到了两个线程:
      main线程和Sender线程,以及一个线程共享变量 RecordAccumulator。
      main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取 消息发送到broker。
      有两种API:
      1、不带回调函数的API
      2、带回调函数的API:回调函数会在producer收到ack时异步调用,如果Exception不为null 表示消息发送失败。失败会自动重试,不需要我们处理

      同步发送

      同步发送的意思是:一条消息发送之后会阻塞当前线程 直到返回ack
      使用方法:producer.send().get(); (在异步的基础上 增加get即可)

      消费者读数据

  • 相关参数:

    • enable.auto.commit:是否开启自动提交 offset 功能
    • auto.commit.interval.ms:自动提交 offset 的时间间隔
  • 相关类:
    • KafkaConsumer:需要创建一个消费者对象,用来消费数据
    • ConsumerConfig:获取所需的一系列配置参数
    • ConsuemrRecord:每条数据都要封装成一个 ConsumerRecord 对象

      自动提交offset(一般不用)

      手动提交offset

      手动提交offset分为两类
      1、同步提交:当前线程会阻塞直到offset提交成功
      2、异步提交:虽然同步提交更可靠,但是会影响吞吐量。因为更多的情况下选用异步提交