Kafka的Api用于通过编程语言使用Kafka。Kafka官方提供的客户端Api分为5大类
- 生产者Api:主要提供发送消息到Kafka的Api支持
 - 消费者Api: 主要提供消费消息的Api
 - 管理Api: 主要负责Topic的创建相关管理工作的Api
 - Connect Api: Kafka Connect功能相关的Api
 - Stream Api: Kafka Stream功能相关的Api
使用方式
在官方文档的介绍中,以上的Api都可以通过依赖Kafka相应版本的Client包引入,其中KafkaConnect可以使用预构建好的Connect(但是也可以自己写)。
额,官方文档就讲到了这里。下面我试着简单的用一下Api.<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>0.11.0.2</version></dependency>
 
KafkaProducer文档阅读
KafkaProducer是线程安全的,多个线程共享同一个Producer实例要比多个Producer实例的速度快的多。
下面的代码是一个发送消息的简单代码实例
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++)producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));producer.close();
KafkaProducer会维护一个缓冲池,用来缓冲消息那些还没有被发送到Kafka集群中的消息,另外,KafkaProducer后台会启动一个I/O线程,用于将消息发送到指定的Kafka集群,因此一定要关闭KakfaProducer,否则会泄露资源。
KafkaProducer的send()方法是异步的,当调用这个方法后,消息会被发送到缓冲池中就会立刻进行方法的返回。
| 配置项 | 实例属性 | 作用 | 
|---|---|---|
| acks | all | acks配置一个消息发送完成的标准。 all:会在全部完成提交之前阻塞,直到全部提交成功才结束。这是最慢但是最可靠的方式  | 
| reties | 整数 | 重试次数,设置0为不重复发送,这个配置可能会有重复消息被发送 | 
| batch.size | 整数 | 指定缓冲区的大小,Producer为每个分区都分配这么一个缓冲区,通过这个属性设置缓冲区的大小。越大越占内存 | 
| linger.ms | 整数 | 设置在缓冲池不满的情况下,发送消息的等待时间。 | 
| buffer.memory | 整数 | 设置缓冲区的大小 | 
| max.block.ms | 整数 | 设置发送阻塞的最大等待时间 | 
| key.serializer | 类路径 | key值的序列化方式 | 
| value.serializer | 类路径 | value的序列化方式 | 
自Kafka0.11后,Kafka支持了另外两种消息模式:幂等生产者和事务生产者。
- 幂等生产者:消息语义从
至少一次转换为精确一次 - 事务生产者:允许多个分区的消息发送保持原子性
 
如何使用幂等生产者?
| 配置项 | 实例属性 | 作用 | 
|---|---|---|
| enable.idempotence | true | 开启幂等模式 | 
| reties | Integer.MAX_VALUE | 保持默认值 | 
| max.in.flight.requests.per.connection | 1 | 保持默认值 | 
| acks | all | 设置为all | 
如何使用事务生产者?
| 配置项 | 实例属性 | 作用 | 
|---|---|---|
| transactional.id | 事务id(String) | 标识哪个事务 | 
| reties | Integer.MAX_VALUE | 保持默认值 | 
| max.in.flight.requests.per.connection | 1 | 保持默认值 | 
| acks | all | 设置为all | 
事务生产者使用实例:
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("transactional.id", "my-transactional-id");Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());producer.initTransactions();try {producer.beginTransaction();for (int i = 0; i < 100; i++)producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// We can't recover from these exceptions, so our only option is to close the producer and exit.producer.close();} catch (KafkaException e) {// For all other exceptions, just abort the transaction and try again.producer.abortTransaction();}producer.close();
KafkaConsumer文档阅读
Offset和Position
- Offset:是Kafka为每条记录设置的一个编号
 - Position: 是消费者消费的位置记录
 
对于Postion来说有两种类型:
- position:每次消费者调用
poll()方法,会自动增加,position位置为消费者即将消费的位置 - committed position:  是安全存储的最后一个偏移量。如果进程失败并重新启动,这就是使用者将恢复到的偏移量。消费者可以周期性地自动提交偏移量;或者它可以选择通过调用其中一个提交api(例如commitSync和commitAsync)来手动控制这个提交位置。
消费者组和消息订阅
消费者组的定义只需要启动多个KafkaConsumer并设置同一个group.id即可。自动提交Offset消费者定义
```java Properties props = new Properties(); props.put(“bootstrap.servers”, “localhost:9092”); props.put(“group.id”, “test”); props.put(“enable.auto.commit”, “true”); props.put(“auto.commit.interval.ms”, “1000”); props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”); props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”); KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(“foo”, “bar”)); while (true) { 
}ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
 
| bootstrap.servers | xx:9092 | 指定broker地址 || --- | --- | --- || enable.auto.commit | true | 是否开启自动提交Offset || auto.commit.interval.ms | 100 | 每多少毫秒提交offset || group.id | xxx | 消费者组 |<a name="GcDi9"></a>### 手动提交Offset```javaProperties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "false");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));final int minBatchSize = 200;List<ConsumerRecord<String, String>> buffer = new ArrayList<>();while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {buffer.add(record);}if (buffer.size() >= minBatchSize) {insertIntoDb(buffer);consumer.commitSync();buffer.clear();}}
手动分配分区
String topic = "foo";TopicPartition partition0 = new TopicPartition(topic, 0);TopicPartition partition1 = new TopicPartition(topic, 1);consumer.assign(Arrays.asList(partition0, partition1));
