- Offical Site
- 介绍
- 开始使用
- Core
- Web
- Web Client
- Data Access
- Integration
- Event Bus Bridge
- Authentication and Authorisation
- Reactive
- Microservices
- IoT
- Devops
- Testing
- Clustering
- Services
- Cloud
- 翻译计划
- Published with GitBook
Kafka Client
Vert.x Kafka Client
中英文对照表
组件介绍
此组件提供了 Kafka Client 的集成,可以以 Vert.x 的方式从 Apache Kafka 集群上消费或者发送消息。
对于消费者(consumer),API以异步的方式订阅消费指定的 topic 以及相关的分区(partition),或者将消息以 Vert.x Stream 的方式读取(甚至可以支持暂停(pause)和恢复(resume)操作)。
对于生产者(producer),API提供发送信息到指定 topic 以及相关的分区(partition)的方法,类似于向 Vert.x Stream 中写入数据。
警告:此组件处于技术预览阶段,因此在之后版本中API可能还会发生一些变更。
使用 Vert.x Kafka Client
要使用 Vert.x Kafka Client 组件,需要添加以下依赖:
- Maven(在
pom.xml
文件中):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-kafka-client</artifactId>
<version>3.4.1</version>
</dependency>
- Gradle(在
build.gradle
文件中):
compile 'io.vertx:vertx-kafka-client:3.4.1'
创建 Kafka Client
创建 Consumer 和 Producer 以及使用它们的方法其实与原生的 Kafka Client 库非常相似,Vert.x 只是做了一层异步封装。
我们需要对 Consumer 与 Producer 进行一些相关的配置,具体可以参考 Apache Kafka 的官方文档:
我们可以通过一个 Map 来包装这些配置,然后将其传入到 KafkaConsumer
接口或 KafkaProducer
接口中的 create
静态方法里来创建 KafkaConsumer
或 KafkaProducer
:
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");
// 创建一个Kafka Consumer
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
在上面的例子中,我们在创建 KafkaConsumer
实例时传入了一个 Map 实例,用于指定要连接的 Kafka 节点列表(只有一个)以及如何对接收到的消息进行解析以得到 key 与 value。
我们可以用类似的方法来创建 Producer:
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("acks", "1");
// 创建一个Kafka Producer
KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);
另外也可以使用 Properties
来代替 Map:
Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
消息的 key 和 value 的序列化格式也可以作为 create
方法的参数直接传进去,而不是在相关配置中指定:
Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.ACKS_CONFIG, "1");
// 注意这里的第三和第四个参数
KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config, String.class, String.class);
在这里,我们在创建 KafkaProducer
实例的时候传入了一个 Properties
实例,用于指定要连接的 Kafka 节点列表(只有一个)和消息确认模式。消息 key 和 value 的解析方式作为参数传入 KafkaProducer.create
方法中。
消费感兴趣 Topic 的消息并加入消费组
我们可以通过 KafkaConsumer
的的 subscribe 方法来订阅一个或多个 topic 进行消费,同时加入到某个消费组(consumer group)中(在创建消费者实例时通过配置指定)。当然你需要通过 handler
方法注册一个 Handler
来处理接收的消息:
consumer.handler(record -> {
System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
});
// 订阅多个topic
Set<String> topics = new HashSet<>();
topics.add("topic1");
topics.add("topic2");
topics.add("topic3");
consumer.subscribe(topics);
// 订阅单个主题
consumer.subscribe("a-single-topic");
另外如果想知道消息是否成功被消费掉,可以在调用 subscribe
方法时绑定一个 Handler
:
consumer.handler(record -> {
System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
});
// subscribe to several topics
Set<String> topics = new HashSet<>();
topics.add("topic1");
topics.add("topic2");
topics.add("topic3");
//这里lambda表达式用于接收消息处理结果
consumer.subscribe(topics, ar -> {
if (ar.succeeded()) {
System.out.println("subscribed");
} else {
System.out.println("Could not subscribe " + ar.cause().getMessage());
}
});
//这里lambda表达式用于接收消息处理结果
consumer.subscribe("a-single-topic", ar -> {
if (ar.succeeded()) {
System.out.println("subscribed");
} else {
System.out.println("Could not subscribe " + ar.cause().getMessage());
}
});
由于Kafka的消费者会组成一个消费组(consumer group),同一个组只有一个消费者可以消费特定的 partition,同时此消费组也可以接纳其他的消费者,这样可以实现 partition 分配给组内其它消费者继续去消费。
如果组内的一个消费者挂了,kafka 集群会自动把 partition 重新分配给组内其他消费者,或者新加入一个消费者去消费对应的 partition。您可以通过 partitionsRevokedHandler
和 partitionsAssignedHandler
方法在 KafkaConsumer
里注册一个 Handler
用于监听对应的 partition 是否被删除或者分配。
consumer.handler(record -> {
System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
});
// 注册一个用于侦听新分配partition的Handler
consumer.partitionsAssignedHandler(topicPartitions -> {
System.out.println("Partitions assigned");
for (TopicPartition topicPartition : topicPartitions) {
System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
}
});
// 注册一个用于侦听撤销partition的Handler
consumer.partitionsRevokedHandler(topicPartitions -> {
System.out.println("Partitions revoked");
for (TopicPartition topicPartition : topicPartitions) {
System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
}
});
// subscribes to the topic
consumer.subscribe("test", ar -> {
if (ar.succeeded()) {
System.out.println("Consumer subscribed");
}
});
加入某个 consumer group 的消费者,可以通过 unsubscribe
方法退出该消费组,从而不再接受到相关消息:
consumer.unsubscribe();
当然你也可以在 unsubscribe
方法中传入一个 Handler
用于监听执行结果状态:
consumer.unsubscribe(ar -> {
if (ar.succeeded()) {
System.out.println("Consumer unsubscribed");
}
});
从 Topic 的特定分区里接收消息
消费组内的消费者可以消费某个 topic 指定的 partition。如果某个消费者并不属于任何消费组,那么整个程序就不能依赖 Kafka 的 re-balancing 机制去消费消息。
您可以通过 assign
方法请求分配指定的分区:
consumer.handler(record -> {
System.out.println("key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
});
//
Set<TopicPartition> topicPartitions = new HashSet<>();
topicPartitions.add(new TopicPartition()
.setTopic("test")
.setPartition(0));
// 要求分配到特定的topic以及partitions
consumer.assign(topicPartitions, done -> {
if (done.succeeded()) {
System.out.println("Partition assigned");
// 侦听分配结果
consumer.assignment(done1 -> {
if (done1.succeeded()) {
for (TopicPartition topicPartition : done1.result()) {
System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
}
}
});
}
});
上面的 assignment
方法可以列出当前分配的 topic partition。
获取 Topic 以及分区信息
您可以通过 partitionsFor
方法获取指定 topic 的 partition 信息:
consumer.partitionsFor("test", ar -> {
if (ar.succeeded()) {
for (PartitionInfo partitionInfo : ar.result()) {
System.out.println(partitionInfo);
}
}
});
另外,listTopics
方法可以列出消费者下的所有 topic 以及对应的 partition 信息:
consumer.listTopics(ar -> {
if (ar.succeeded()) {
Map<String, List<PartitionInfo>> map = ar.result();
map.forEach((topic, partitions) -> {
System.out.println("topic = " + topic);
System.out.println("partitions = " + map.get(topic));
});
}
});
手动提交偏移量
在 Apache Kafka 中,消费者负责处理最新读取消息的偏移量(offset)。Consumer 会在每次从某个 topic partition 中读取一批消息的时候自动执行提交偏移量的操作。需要在创建 KafkaConsumer
时将 enable.auto.commit
配置项设为 true
来开启自动提交。
我们可以通过 commit
方法进行手动提交。手动提交偏移量通常用于确保消息分发的 at least once 语义,以确保消息没有被消费前不会执行提交。
consumer.commit(ar -> {
if (ar.succeeded()) {
System.out.println("Last read message offset committed");
}
});
分区偏移量定位
Apache Kafka 中的消息是按顺序持久化在磁盘上的,所以消费者可以在某个 partition 内部进行偏移量定位(seek)操作,并从任意指定的 topic 以及 partition 位置开始消费消息。我们可以通过 seek
方法来更改读取位置对应的偏移量:
TopicPartition topicPartition = new TopicPartition()
.setTopic("test")
.setPartition(0);
// 指定offset位置10
consumer.seek(topicPartition, 10, done -> {
if (done.succeeded()) {
System.out.println("Seeking done");
}
});
当消费者需要从 Stream 的起始位置读取消息时,可以使用 seekToBeginning 方法将 offset
位置设置到 partition 的起始端:
TopicPartition topicPartition = new TopicPartition()
.setTopic("test")
.setPartition(0);
// 将offset挪到分区起始端
consumer.seekToBeginning(Collections.singleton(topicPartition), done -> {
if (done.succeeded()) {
System.out.println("Seeking done");
}
});
最后我们也可以通过 seekToEnd
方法将 offset
位置设置到 partition 的末端:
TopicPartition topicPartition = new TopicPartition()
.setTopic("test")
.setPartition(0);
// 将offset挪到分区末端
consumer.seekToEnd(Collections.singleton(topicPartition), done -> {
if (done.succeeded()) {
System.out.println("Seeking done");
}
});
偏移量查询
你可以利用 Kafka 0.10.1.1 引入的新的API beginningOffsets
来获取给定分区的起始偏移量。这个跟上面的 seekToBeginning
方法有一个地方不同:beginningOffsets
方法不会更改 offset 的值,仅仅是读取(只读模式)。
Set<TopicPartition> topicPartitions = new HashSet<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
topicPartitions.add(topicPartition);
consumer.beginningOffsets(topicPartitions, done -> {
if(done.succeeded()) {
Map<TopicPartition, Long> results = done.result();
results.forEach((topic, beginningOffset) ->
System.out.println("Beginning offset for topic="+topic.getTopic()+", partition="+
topic.getPartition()+", beginningOffset="+beginningOffset));
}
});
// partition offset 查询辅助方法
consumer.beginningOffsets(topicPartition, done -> {
if(done.succeeded()) {
Long beginningOffset = done.result();
System.out.println("Beginning offset for topic="+topicPartition.getTopic()+", partition="+
topicPartition.getPartition()+", beginningOffset="+beginningOffset);
}
});
与此对应的API还有 endOffsets
方法,用于获取给定分区末端的偏移量值。与 seekToEnd
方法相比,endOffsets
方法不会更改 offset 的值,仅仅是读取(只读模式)。
Set<TopicPartition> topicPartitions = new HashSet<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
topicPartitions.add(topicPartition);
consumer.endOffsets(topicPartitions, done -> {
if(done.succeeded()) {
Map<TopicPartition, Long> results = done.result();
results.forEach((topic, endOffset) ->
System.out.println("End offset for topic="+topic.getTopic()+", partition="+
topic.getPartition()+", endOffset="+endOffset));
}
});
consumer.endOffsets(topicPartition, done -> {
if(done.succeeded()) {
Long endOffset = done.result();
System.out.println("End offset for topic="+topicPartition.getTopic()+", partition="+
topicPartition.getPartition()+", endOffset="+endOffset);
}
});
Kafka 0.10.1.1 还提供了一个根据时间戳(timestamp)来定位 offset 的API方法 offsetsForTimes
,调用此API可以返回大于等于给定时间戳的 offset。因为 Kafka 的 offset 低位就是时间戳,所以 Kafka 很容易定位此类offset。
Map<TopicPartition, Long> topicPartitionsWithTimestamps = new HashMap<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
// 我们只想要60秒之前的消息的offset
long timestamp = (System.currentTimeMillis() - 60000);
topicPartitionsWithTimestamps.put(topicPartition, timestamp);
consumer.offsetsForTimes(topicPartitionsWithTimestamps, done -> {
if(done.succeeded()) {
Map<TopicPartition, OffsetAndTimestamp> results = done.result();
results.forEach((topic, offset) ->
System.out.println("Offset for topic="+topic.getTopic()+
", partition="+topic.getPartition()+"\n"+
", timestamp="+timestamp+", offset="+offset.getOffset()+
", offsetTimestamp="+offset.getTimestamp()));
}
});
consumer.offsetsForTimes(topicPartition, timestamp, done -> {
if(done.succeeded()) {
OffsetAndTimestamp offsetAndTimestamp = done.result();
System.out.println("Offset for topic="+topicPartition.getTopic()+
", partition="+topicPartition.getPartition()+"\n"+
", timestamp="+timestamp+", offset="+offsetAndTimestamp.getOffset()+
", offsetTimestamp="+offsetAndTimestamp.getTimestamp());
}
});
流量控制
Consumer 可以对消息流进行流量控制。如果我们读到一批消息,需要花点时间进行处理则可以暂时暂停(pause
)消息的流入(这里实际上是把消息全部缓存到内存里了);等我们处理了差不多了,可以再继续消费缓存起来的消息(resume
)。
我们可以利用 pause
方法和 resume
方法来进行流量控制:
TopicPartition topicPartition = new TopicPartition()
.setTopic("test")
.setPartition(0);
//注册一个handler处理进来的消息
consumer.handler(record -> {
System.out.println("key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
// 如果我们读到partition0的第5个offset
if ((record.partition() == 0) && (record.offset() == 5)) {
// 则暂停读取
consumer.pause(topicPartition, ar -> {
if (ar.succeeded()) {
System.out.println("Paused");
// 5秒后再恢复,继续读取
vertx.setTimer(5000, timeId -> {
// resumi read operations
consumer.resume(topicPartition);
});
}
});
}
});
关闭 Consumer
关闭 Consumer 只需要调用 close
方法就可以了,它会自动的关闭与 Kafka 的连接,同时释放相关资源。
由于 close
方法是异步的,你并不知道关闭操作什么时候完成或失败,这时你需要注册一个处理器(Handler
)来监听关闭完成的消息。当关闭操作彻底完成以后,注册的 Handler
将会被调用。
consumer.close(res -> {
if (res.succeeded()) {
System.out.println("Consumer is now closed");
} else {
System.out.println("close failed");
}
});
发送消息到某个 Topic
您可以利用 write
方法来向某个 topic 发送消息(records)。
最简单的发送消息的方式是仅仅指定目的 topic 以及相应的值而省略消息的 key 以及分区。在这种情况下,消息会以轮询(round robin)的方式发送到对应 topic 的所有分区上。
for (int i = 0; i < 5; i++) {
// 这里指定了topic和 message value,以round robin方式发送的目标partition
KafkaProducerRecord<String, String> record =
KafkaProducerRecord.create("test", "message_" + i);
producer.write(record);
}
您可以通过绑定 Handler
来接受发送的结果。这个结果其实就是一些元数据(metadata),包含消息的 topic、目的分区 (destination partition) 以及分配的偏移量 (assigned offset)。
for (int i = 0; i < 5; i++) {
KafkaProducerRecord<String, String> record =
KafkaProducerRecord.create("test", "message_" + i);
producer.write(record, done -> {
if (done.succeeded()) {
RecordMetadata recordMetadata = done.result();
System.out.println("Message " + record.value() + " written on topic=" + recordMetadata.getTopic() +
", partition=" + recordMetadata.getPartition() +
", offset=" + recordMetadata.getOffset());
}
});
}
如果希望将消息发送到指定的分区,你可以指定分区的标识(identifier)或者设定消息的 key:
for (int i = 0; i < 10; i++) {
// 这里指定了 partition 为 0
KafkaProducerRecord<String, String> record =
KafkaProducerRecord.create("test", null, "message_" + i, 0);
producer.write(record);
}
因为 Producer 可以使用消息的 key 作为 hash 值来确定 partition,所以我们可以保证所有的消息被发送到同样的 partition 中,并且是有序的。
for (int i = 0; i < 10; i++) {
// i.e. defining different keys for odd and even messages
int key = i % 2;
//这里指明了key,所有的消息将被发送同一个partition.
KafkaProducerRecord<String, String> record =
KafkaProducerRecord.create("test", String.valueOf(key), "message_" + i);
producer.write(record);
}
注意:可共享的 Producer 通过
createShared
方法创建。它可以在多个 Verticle 实例之间共享,所以相关的配置必须在创建 Producer 的时候定义。
共享 Producer
有时候您希望在多个 Verticle 或者 Vert.x Context 下共用一个 Producer。您可以通过 KafkaProducer.createShared
方法来创建可以在 Verticle 之间安全共享的 KafkaProducer
实例:
KafkaProducer<String, String> producer1 = KafkaProducer.createShared(vertx, "the-producer", config);
// 关闭
producer1.close();
返回的 KafkaProducer
实例将复用相关的资源(如线程、连接等)。使用完 KafkaProducer
后,直接调用 close
方法关闭即可,相关的资源会自动释放。
关闭 Producer
与关闭 Consumer 类似,关闭 Producer 只需要调用 close
方法就可以了,它会自动的关闭与 Kafka 的连接,同时释放所有相关资源。
由于 close
方法是异步的,你并不知道关闭操作什么时候完成或失败,这时你需要注册一个处理器(Handler
)来监听关闭完成的消息。当关闭操作彻底完成以后,注册的 Handler
将会被调用。
producer.close(res -> {
if (res.succeeded()) {
System.out.println("Producer is now closed");
} else {
System.out.println("close failed");
}
});
获取 Topic Partition 的相关信息
您可以通过 partitionsFor
方法获取指定 topic 的分区信息。
producer.partitionsFor("test", ar -> {
if (ar.succeeded()) {
for (PartitionInfo partitionInfo : ar.result()) {
System.out.println(partitionInfo);
}
}
});
错误处理
您可以利用 KafkaProducer#exceptionHandler
方法和 KafkaConsumer#exceptionHandler
方法来处理 Kafka 客户端(生产者和消费者)和 Kafka 集群之间的错误(如超时)。比如:
consumer.exceptionHandler(e -> {
System.out.println("Error = " + e.getMessage());
});
随 Verticle 自动关闭
如果您是在 Verticle 内部创建的 Consumer 和 Producer,那么当对应 Verticle 被卸载(undeploy)的时候,相关的 Consumer 和 Producer 会自动关闭。
使用 Vert.x 自带的序列化与反序列化机制
Vert.x Kafka Client 自带现成的序列化与反序列化机制,可以处理 Buffer
、JsonObject
和 JsonArray
等类型。
在 KafkaConsumer
里您可以使用 Buffer
:
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");
// 创建一个可以反序列化成jsonObject的consumer.
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");
// 创建一个可以反序列化成jsonArray的consumer.
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");
同样在 KafkaProducer
中也可以:
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
config.put("acks", "1");
// 创建一个可以序列化成jsonObject的Producer.
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
config.put("acks", "1");
// 创建一个可以序列化成jsonArray的Producer.
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
config.put("acks", "1");
您也可以在 create
方法里指明序列化与反序列化相关的类。
比如创建 Consumer 时:
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");
// 创建一个可以反序列化成Buffer的Consumer
KafkaConsumer<Buffer, Buffer> bufferConsumer = KafkaConsumer.create(vertx, config, Buffer.class, Buffer.class);
// 创建一个可以反序列化成JsonObject的Consumer
KafkaConsumer<JsonObject, JsonObject> jsonObjectConsumer = KafkaConsumer.create(vertx, config, JsonObject.class, JsonObject.class);
// 创建一个可以反序列化成JsonArray的Consumer
KafkaConsumer<JsonArray, JsonArray> jsonArrayConsumer = KafkaConsumer.create(vertx, config, JsonArray.class, JsonArray.class);
创建 Producer 时:
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("acks", "1");
// 创建一个可以序列化成Buffer的Producer.
KafkaProducer<Buffer, Buffer> bufferProducer = KafkaProducer.create(vertx, config, Buffer.class, Buffer.class);
// 创建一个可以序列化成jsonObject的Producer.
KafkaProducer<JsonObject, JsonObject> jsonObjectProducer = KafkaProducer.create(vertx, config, JsonObject.class, JsonObject.class);
// 创建一个可以序列化成jsonArray的Producer.
KafkaProducer<JsonArray, JsonArray> jsonArrayProducer = KafkaProducer.create(vertx, config, JsonArray.class, JsonArray.class);
RxJava API
Vert.x Kafka Client 组件也提供Rx风格的API。
译者注:此处也可以参考 Kafka Stream 相关的 API。
Observable<KafkaConsumerRecord<String, Long>> observable = consumer.toObservable();
observable
.map(record -> record.value())
.buffer(256)
.map(
list -> list.stream().mapToDouble(n -> n).average()
).subscribe(val -> {
//获取到一个平均值
});
流实现与 Kafka 原生对象
如果您希望直接操作原生的 Kafka record,您可以使用原生的 Kafka 流式对象,它可以处理原生 Kafka 对象。
KafkaReadStream
用于读取 topic partition。它是 ConsumerRecord
对象的可读流对象,读到的是 ConsumerRecord
对象。
KafkaWriteStream
用于向某些 topic 中写入数据。它是 ProducerRecord
对象的可写流对象。
API通过这些接口将这些方法展现给用户,其他语言版本也应该类似。
原文档最后更新于 2017-03-15 15:54:14 CET