Kafka的Api用于通过编程语言使用Kafka。Kafka官方提供的客户端Api分为5大类

  • 生产者Api:主要提供发送消息到Kafka的Api支持
  • 消费者Api: 主要提供消费消息的Api
  • 管理Api: 主要负责Topic的创建相关管理工作的Api
  • Connect Api: Kafka Connect功能相关的Api
  • Stream Api: Kafka Stream功能相关的Api

    使用方式

    在官方文档的介绍中,以上的Api都可以通过依赖Kafka相应版本的Client包引入,其中KafkaConnect可以使用预构建好的Connect(但是也可以自己写)。
    1. <dependency>
    2. <groupId>org.apache.kafka</groupId>
    3. <artifactId>kafka-streams</artifactId>
    4. <version>0.11.0.2</version>
    5. </dependency>
    额,官方文档就讲到了这里。下面我试着简单的用一下Api.

KafkaProducer文档阅读

KafkaProducer是线程安全的,多个线程共享同一个Producer实例要比多个Producer实例的速度快的多。
下面的代码是一个发送消息的简单代码实例

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("acks", "all");
  4. props.put("retries", 0);
  5. props.put("batch.size", 16384);
  6. props.put("linger.ms", 1);
  7. props.put("buffer.memory", 33554432);
  8. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  9. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  10. Producer<String, String> producer = new KafkaProducer<>(props);
  11. for (int i = 0; i < 100; i++)
  12. producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
  13. producer.close();

KafkaProducer会维护一个缓冲池,用来缓冲消息那些还没有被发送到Kafka集群中的消息,另外,KafkaProducer后台会启动一个I/O线程,用于将消息发送到指定的Kafka集群,因此一定要关闭KakfaProducer,否则会泄露资源。
KafkaProducer的send()方法是异步的,当调用这个方法后,消息会被发送到缓冲池中就会立刻进行方法的返回。

配置项 实例属性 作用
acks all acks配置一个消息发送完成的标准。
all:会在全部完成提交之前阻塞,直到全部提交成功才结束。这是最慢但是最可靠的方式
reties 整数 重试次数,设置0为不重复发送,这个配置可能会有重复消息被发送
batch.size 整数 指定缓冲区的大小,Producer为每个分区都分配这么一个缓冲区,通过这个属性设置缓冲区的大小。越大越占内存
linger.ms 整数 设置在缓冲池不满的情况下,发送消息的等待时间。
buffer.memory 整数 设置缓冲区的大小
max.block.ms 整数 设置发送阻塞的最大等待时间
key.serializer 类路径 key值的序列化方式
value.serializer 类路径 value的序列化方式

自Kafka0.11后,Kafka支持了另外两种消息模式:幂等生产者和事务生产者。

  • 幂等生产者:消息语义从至少一次转换为精确一次
  • 事务生产者:允许多个分区的消息发送保持原子性

如何使用幂等生产者?

配置项 实例属性 作用
enable.idempotence true 开启幂等模式
reties Integer.MAX_VALUE 保持默认值
max.in.flight.requests.per.connection 1 保持默认值
acks all 设置为all

如何使用事务生产者?

配置项 实例属性 作用
transactional.id 事务id(String) 标识哪个事务
reties Integer.MAX_VALUE 保持默认值
max.in.flight.requests.per.connection 1 保持默认值
acks all 设置为all

事务生产者使用实例:

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("transactional.id", "my-transactional-id");
  4. Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
  5. producer.initTransactions();
  6. try {
  7. producer.beginTransaction();
  8. for (int i = 0; i < 100; i++)
  9. producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
  10. producer.commitTransaction();
  11. } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
  12. // We can't recover from these exceptions, so our only option is to close the producer and exit.
  13. producer.close();
  14. } catch (KafkaException e) {
  15. // For all other exceptions, just abort the transaction and try again.
  16. producer.abortTransaction();
  17. }
  18. producer.close();

KafkaConsumer文档阅读

Offset和Position

  • Offset:是Kafka为每条记录设置的一个编号
  • Position: 是消费者消费的位置记录

对于Postion来说有两种类型:

  • position:每次消费者调用poll() 方法,会自动增加,position位置为消费者即将消费的位置
  • committed position: 是安全存储的最后一个偏移量。如果进程失败并重新启动,这就是使用者将恢复到的偏移量。消费者可以周期性地自动提交偏移量;或者它可以选择通过调用其中一个提交api(例如commitSync和commitAsync)来手动控制这个提交位置。

    消费者组和消息订阅

    消费者组的定义只需要启动多个KafkaConsumer并设置同一个group.id即可。

    自动提交Offset消费者定义

    ```java Properties props = new Properties(); props.put(“bootstrap.servers”, “localhost:9092”); props.put(“group.id”, “test”); props.put(“enable.auto.commit”, “true”); props.put(“auto.commit.interval.ms”, “1000”); props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”); props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(“foo”, “bar”)); while (true) {
    1. ConsumerRecords<String, String> records = consumer.poll(100);
    2. for (ConsumerRecord<String, String> record : records)
    3. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
  1. | bootstrap.servers | xx:9092 | 指定broker地址 |
  2. | --- | --- | --- |
  3. | enable.auto.commit | true | 是否开启自动提交Offset |
  4. | auto.commit.interval.ms | 100 | 每多少毫秒提交offset |
  5. | group.id | xxx | 消费者组 |
  6. <a name="GcDi9"></a>
  7. ### 手动提交Offset
  8. ```java
  9. Properties props = new Properties();
  10. props.put("bootstrap.servers", "localhost:9092");
  11. props.put("group.id", "test");
  12. props.put("enable.auto.commit", "false");
  13. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  14. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  15. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  16. consumer.subscribe(Arrays.asList("foo", "bar"));
  17. final int minBatchSize = 200;
  18. List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
  19. while (true) {
  20. ConsumerRecords<String, String> records = consumer.poll(100);
  21. for (ConsumerRecord<String, String> record : records) {
  22. buffer.add(record);
  23. }
  24. if (buffer.size() >= minBatchSize) {
  25. insertIntoDb(buffer);
  26. consumer.commitSync();
  27. buffer.clear();
  28. }
  29. }

手动分配分区

  1. String topic = "foo";
  2. TopicPartition partition0 = new TopicPartition(topic, 0);
  3. TopicPartition partition1 = new TopicPartition(topic, 1);
  4. consumer.assign(Arrays.asList(partition0, partition1));

存储消费者Offset在外部