Kafka的Api用于通过编程语言使用Kafka。Kafka官方提供的客户端Api分为5大类
- 生产者Api:主要提供发送消息到Kafka的Api支持
- 消费者Api: 主要提供消费消息的Api
- 管理Api: 主要负责Topic的创建相关管理工作的Api
- Connect Api: Kafka Connect功能相关的Api
- Stream Api: Kafka Stream功能相关的Api
使用方式
在官方文档的介绍中,以上的Api都可以通过依赖Kafka相应版本的Client包引入,其中KafkaConnect可以使用预构建好的Connect(但是也可以自己写)。
额,官方文档就讲到了这里。下面我试着简单的用一下Api.<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.11.0.2</version>
</dependency>
KafkaProducer文档阅读
KafkaProducer是线程安全的,多个线程共享同一个Producer实例要比多个Producer实例的速度快的多。
下面的代码是一个发送消息的简单代码实例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
KafkaProducer会维护一个缓冲池,用来缓冲消息那些还没有被发送到Kafka集群中的消息,另外,KafkaProducer后台会启动一个I/O线程,用于将消息发送到指定的Kafka集群,因此一定要关闭KakfaProducer,否则会泄露资源。
KafkaProducer的send()
方法是异步的,当调用这个方法后,消息会被发送到缓冲池中就会立刻进行方法的返回。
配置项 | 实例属性 | 作用 |
---|---|---|
acks | all | acks配置一个消息发送完成的标准。 all:会在全部完成提交之前阻塞,直到全部提交成功才结束。这是最慢但是最可靠的方式 |
reties | 整数 | 重试次数,设置0为不重复发送,这个配置可能会有重复消息被发送 |
batch.size | 整数 | 指定缓冲区的大小,Producer为每个分区都分配这么一个缓冲区,通过这个属性设置缓冲区的大小。越大越占内存 |
linger.ms | 整数 | 设置在缓冲池不满的情况下,发送消息的等待时间。 |
buffer.memory | 整数 | 设置缓冲区的大小 |
max.block.ms | 整数 | 设置发送阻塞的最大等待时间 |
key.serializer | 类路径 | key值的序列化方式 |
value.serializer | 类路径 | value的序列化方式 |
自Kafka0.11后,Kafka支持了另外两种消息模式:幂等生产者和事务生产者。
- 幂等生产者:消息语义从
至少一次
转换为精确一次
- 事务生产者:允许多个分区的消息发送保持原子性
如何使用幂等生产者?
配置项 | 实例属性 | 作用 |
---|---|---|
enable.idempotence | true | 开启幂等模式 |
reties | Integer.MAX_VALUE | 保持默认值 |
max.in.flight.requests.per.connection | 1 | 保持默认值 |
acks | all | 设置为all |
如何使用事务生产者?
配置项 | 实例属性 | 作用 |
---|---|---|
transactional.id | 事务id(String) | 标识哪个事务 |
reties | Integer.MAX_VALUE | 保持默认值 |
max.in.flight.requests.per.connection | 1 | 保持默认值 |
acks | all | 设置为all |
事务生产者使用实例:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
KafkaConsumer文档阅读
Offset和Position
- Offset:是Kafka为每条记录设置的一个编号
- Position: 是消费者消费的位置记录
对于Postion来说有两种类型:
- position:每次消费者调用
poll()
方法,会自动增加,position位置为消费者即将消费的位置 - committed position: 是安全存储的最后一个偏移量。如果进程失败并重新启动,这就是使用者将恢复到的偏移量。消费者可以周期性地自动提交偏移量;或者它可以选择通过调用其中一个提交api(例如commitSync和commitAsync)来手动控制这个提交位置。
消费者组和消息订阅
消费者组的定义只需要启动多个KafkaConsumer并设置同一个group.id
即可。自动提交Offset消费者定义
```java Properties props = new Properties(); props.put(“bootstrap.servers”, “localhost:9092”); props.put(“group.id”, “test”); props.put(“enable.auto.commit”, “true”); props.put(“auto.commit.interval.ms”, “1000”); props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”); props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”); KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(“foo”, “bar”)); while (true) {
}ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
| bootstrap.servers | xx:9092 | 指定broker地址 |
| --- | --- | --- |
| enable.auto.commit | true | 是否开启自动提交Offset |
| auto.commit.interval.ms | 100 | 每多少毫秒提交offset |
| group.id | xxx | 消费者组 |
<a name="GcDi9"></a>
### 手动提交Offset
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
手动分配分区
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));