[TOC]

Kafka Client

Vert.x Kafka Client

中英文对照表

组件介绍

此组件提供了 Kafka Client 的集成,可以以 Vert.x 的方式从 Apache Kafka 集群上消费或者发送消息。

对于消费者(consumer),API以异步的方式订阅消费指定的 topic 以及相关的分区(partition),或者将消息以 Vert.x Stream 的方式读取(甚至可以支持暂停(pause)和恢复(resume)操作)。

对于生产者(producer),API提供发送信息到指定 topic 以及相关的分区(partition)的方法,类似于向 Vert.x Stream 中写入数据。

警告:此组件处于技术预览阶段,因此在之后版本中API可能还会发生一些变更。

使用 Vert.x Kafka Client

要使用 Vert.x Kafka Client 组件,需要添加以下依赖:

  • Maven(在 pom.xml文件中):
<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-kafka-client</artifactId>
  <version>3.4.1</version>
</dependency>
  • Gradle(在build.gradle文件中):
compile 'io.vertx:vertx-kafka-client:3.4.1'

创建 Kafka Client

创建 Consumer 和 Producer 以及使用它们的方法其实与原生的 Kafka Client 库非常相似,Vert.x 只是做了一层异步封装。

我们需要对 Consumer 与 Producer 进行一些相关的配置,具体可以参考 Apache Kafka 的官方文档:

我们可以通过一个 Map 来包装这些配置,然后将其传入到 KafkaConsumer 接口或 KafkaProducer 接口中的 create 静态方法里来创建 KafkaConsumerKafkaProducer

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

// 创建一个Kafka Consumer
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

在上面的例子中,我们在创建 KafkaConsumer 实例时传入了一个 Map 实例,用于指定要连接的 Kafka 节点列表(只有一个)以及如何对接收到的消息进行解析以得到 key 与 value。

我们可以用类似的方法来创建 Producer:

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("acks", "1");

// 创建一个Kafka Producer
KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);

另外也可以使用 Properties 来代替 Map:

Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

消息的 key 和 value 的序列化格式也可以作为 create 方法的参数直接传进去,而不是在相关配置中指定:

Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.ACKS_CONFIG, "1");

// 注意这里的第三和第四个参数
KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config, String.class, String.class);

在这里,我们在创建 KafkaProducer 实例的时候传入了一个 Properties 实例,用于指定要连接的 Kafka 节点列表(只有一个)和消息确认模式。消息 key 和 value 的解析方式作为参数传入 KafkaProducer.create 方法中。

消费感兴趣 Topic 的消息并加入消费组

我们可以通过 KafkaConsumer 的的 subscribe 方法来订阅一个或多个 topic 进行消费,同时加入到某个消费组(consumer group)中(在创建消费者实例时通过配置指定)。当然你需要通过 handler 方法注册一个 Handler 来处理接收的消息:

consumer.handler(record -> {
  System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

// 订阅多个topic
Set<String> topics = new HashSet<>();
topics.add("topic1");
topics.add("topic2");
topics.add("topic3");
consumer.subscribe(topics);

// 订阅单个主题
consumer.subscribe("a-single-topic");

另外如果想知道消息是否成功被消费掉,可以在调用 subscribe 方法时绑定一个 Handler

consumer.handler(record -> {
  System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

// subscribe to several topics
Set<String> topics = new HashSet<>();
topics.add("topic1");
topics.add("topic2");
topics.add("topic3");

//这里lambda表达式用于接收消息处理结果
consumer.subscribe(topics, ar -> {
  if (ar.succeeded()) {
    System.out.println("subscribed");
  } else {
    System.out.println("Could not subscribe " + ar.cause().getMessage());
  }
});

//这里lambda表达式用于接收消息处理结果
consumer.subscribe("a-single-topic", ar -> {
  if (ar.succeeded()) {
    System.out.println("subscribed");
  } else {
    System.out.println("Could not subscribe " + ar.cause().getMessage());
  }
});

由于Kafka的消费者会组成一个消费组(consumer group),同一个组只有一个消费者可以消费特定的 partition,同时此消费组也可以接纳其他的消费者,这样可以实现 partition 分配给组内其它消费者继续去消费。

如果组内的一个消费者挂了,kafka 集群会自动把 partition 重新分配给组内其他消费者,或者新加入一个消费者去消费对应的 partition。您可以通过 partitionsRevokedHandlerpartitionsAssignedHandler 方法在 KafkaConsumer 里注册一个 Handler 用于监听对应的 partition 是否被删除或者分配。

consumer.handler(record -> {
  System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

// 注册一个用于侦听新分配partition的Handler
consumer.partitionsAssignedHandler(topicPartitions -> {

  System.out.println("Partitions assigned");
  for (TopicPartition topicPartition : topicPartitions) {
    System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
  }
});

// 注册一个用于侦听撤销partition的Handler
consumer.partitionsRevokedHandler(topicPartitions -> {

  System.out.println("Partitions revoked");
  for (TopicPartition topicPartition : topicPartitions) {
    System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
  }
});

// subscribes to the topic
consumer.subscribe("test", ar -> {

  if (ar.succeeded()) {
    System.out.println("Consumer subscribed");
  }
});

加入某个 consumer group 的消费者,可以通过 unsubscribe 方法退出该消费组,从而不再接受到相关消息:

consumer.unsubscribe();

当然你也可以在 unsubscribe 方法中传入一个 Handler 用于监听执行结果状态:

consumer.unsubscribe(ar -> {

  if (ar.succeeded()) {
    System.out.println("Consumer unsubscribed");
  }
});

从 Topic 的特定分区里接收消息

消费组内的消费者可以消费某个 topic 指定的 partition。如果某个消费者并不属于任何消费组,那么整个程序就不能依赖 Kafka 的 re-balancing 机制去消费消息。

您可以通过 assign 方法请求分配指定的分区:

consumer.handler(record -> {
  System.out.println("key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

//
Set<TopicPartition> topicPartitions = new HashSet<>();
topicPartitions.add(new TopicPartition()
  .setTopic("test")
  .setPartition(0));

// 要求分配到特定的topic以及partitions
consumer.assign(topicPartitions, done -> {

  if (done.succeeded()) {
    System.out.println("Partition assigned");

    // 侦听分配结果
    consumer.assignment(done1 -> {

      if (done1.succeeded()) {

        for (TopicPartition topicPartition : done1.result()) {
          System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
        }
      }
    });
  }
});

上面的 assignment 方法可以列出当前分配的 topic partition。

获取 Topic 以及分区信息

您可以通过 partitionsFor 方法获取指定 topic 的 partition 信息:


consumer.partitionsFor("test", ar -> {
  if (ar.succeeded()) {
    for (PartitionInfo partitionInfo : ar.result()) {
      System.out.println(partitionInfo);
    }
  }
});

另外,listTopics 方法可以列出消费者下的所有 topic 以及对应的 partition 信息:

consumer.listTopics(ar -> {

  if (ar.succeeded()) {

    Map<String, List<PartitionInfo>> map = ar.result();
    map.forEach((topic, partitions) -> {
      System.out.println("topic = " + topic);
      System.out.println("partitions = " + map.get(topic));
    });
  }
});

手动提交偏移量

在 Apache Kafka 中,消费者负责处理最新读取消息的偏移量(offset)。Consumer 会在每次从某个 topic partition 中读取一批消息的时候自动执行提交偏移量的操作。需要在创建 KafkaConsumer 时将 enable.auto.commit 配置项设为 true 来开启自动提交。

我们可以通过 commit 方法进行手动提交。手动提交偏移量通常用于确保消息分发的 at least once 语义,以确保消息没有被消费前不会执行提交。

consumer.commit(ar -> {

  if (ar.succeeded()) {
    System.out.println("Last read message offset committed");
  }
});

分区偏移量定位

Apache Kafka 中的消息是按顺序持久化在磁盘上的,所以消费者可以在某个 partition 内部进行偏移量定位(seek)操作,并从任意指定的 topic 以及 partition 位置开始消费消息。我们可以通过 seek 方法来更改读取位置对应的偏移量:

TopicPartition topicPartition = new TopicPartition()
  .setTopic("test")
  .setPartition(0);

// 指定offset位置10
consumer.seek(topicPartition, 10, done -> {

  if (done.succeeded()) {
    System.out.println("Seeking done");
  }
});

当消费者需要从 Stream 的起始位置读取消息时,可以使用 seekToBeginning 方法将 offset 位置设置到 partition 的起始端:

TopicPartition topicPartition = new TopicPartition()
  .setTopic("test")
  .setPartition(0);

// 将offset挪到分区起始端
consumer.seekToBeginning(Collections.singleton(topicPartition), done -> {

  if (done.succeeded()) {
    System.out.println("Seeking done");
  }
});

最后我们也可以通过 seekToEnd 方法将 offset 位置设置到 partition 的末端:


TopicPartition topicPartition = new TopicPartition()
  .setTopic("test")
  .setPartition(0);

// 将offset挪到分区末端
consumer.seekToEnd(Collections.singleton(topicPartition), done -> {

  if (done.succeeded()) {
    System.out.println("Seeking done");
  }
});

偏移量查询

你可以利用 Kafka 0.10.1.1 引入的新的API beginningOffsets 来获取给定分区的起始偏移量。这个跟上面的 seekToBeginning 方法有一个地方不同:beginningOffsets 方法不会更改 offset 的值,仅仅是读取(只读模式)。

Set<TopicPartition> topicPartitions = new HashSet<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
topicPartitions.add(topicPartition);

consumer.beginningOffsets(topicPartitions, done -> {
  if(done.succeeded()) {
    Map<TopicPartition, Long> results = done.result();
    results.forEach((topic, beginningOffset) ->
      System.out.println("Beginning offset for topic="+topic.getTopic()+", partition="+
        topic.getPartition()+", beginningOffset="+beginningOffset));
  }
});

// partition offset 查询辅助方法
consumer.beginningOffsets(topicPartition, done -> {
  if(done.succeeded()) {
    Long beginningOffset = done.result();
      System.out.println("Beginning offset for topic="+topicPartition.getTopic()+", partition="+
        topicPartition.getPartition()+", beginningOffset="+beginningOffset);
  }
});

与此对应的API还有 endOffsets 方法,用于获取给定分区末端的偏移量值。与 seekToEnd 方法相比,endOffsets 方法不会更改 offset 的值,仅仅是读取(只读模式)。

Set<TopicPartition> topicPartitions = new HashSet<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
topicPartitions.add(topicPartition);

consumer.endOffsets(topicPartitions, done -> {
  if(done.succeeded()) {
    Map<TopicPartition, Long> results = done.result();
    results.forEach((topic, endOffset) ->
      System.out.println("End offset for topic="+topic.getTopic()+", partition="+
        topic.getPartition()+", endOffset="+endOffset));
  }
});

consumer.endOffsets(topicPartition, done -> {
  if(done.succeeded()) {
    Long endOffset = done.result();
      System.out.println("End offset for topic="+topicPartition.getTopic()+", partition="+
        topicPartition.getPartition()+", endOffset="+endOffset);
  }
});

Kafka 0.10.1.1 还提供了一个根据时间戳(timestamp)来定位 offset 的API方法 offsetsForTimes,调用此API可以返回大于等于给定时间戳的 offset。因为 Kafka 的 offset 低位就是时间戳,所以 Kafka 很容易定位此类offset。

Map<TopicPartition, Long> topicPartitionsWithTimestamps = new HashMap<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);

// 我们只想要60秒之前的消息的offset
long timestamp = (System.currentTimeMillis() - 60000);

topicPartitionsWithTimestamps.put(topicPartition, timestamp);
consumer.offsetsForTimes(topicPartitionsWithTimestamps, done -> {
  if(done.succeeded()) {
    Map<TopicPartition, OffsetAndTimestamp> results = done.result();
    results.forEach((topic, offset) ->
      System.out.println("Offset for topic="+topic.getTopic()+
        ", partition="+topic.getPartition()+"\n"+
        ", timestamp="+timestamp+", offset="+offset.getOffset()+
        ", offsetTimestamp="+offset.getTimestamp()));

  }
});

consumer.offsetsForTimes(topicPartition, timestamp, done -> {
  if(done.succeeded()) {
    OffsetAndTimestamp offsetAndTimestamp = done.result();
      System.out.println("Offset for topic="+topicPartition.getTopic()+
        ", partition="+topicPartition.getPartition()+"\n"+
        ", timestamp="+timestamp+", offset="+offsetAndTimestamp.getOffset()+
        ", offsetTimestamp="+offsetAndTimestamp.getTimestamp());

  }
});

流量控制

Consumer 可以对消息流进行流量控制。如果我们读到一批消息,需要花点时间进行处理则可以暂时暂停(pause)消息的流入(这里实际上是把消息全部缓存到内存里了);等我们处理了差不多了,可以再继续消费缓存起来的消息(resume)。

我们可以利用 pause 方法和 resume 方法来进行流量控制:

TopicPartition topicPartition = new TopicPartition()
  .setTopic("test")
  .setPartition(0);

//注册一个handler处理进来的消息
consumer.handler(record -> {
  System.out.println("key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());

  // 如果我们读到partition0的第5个offset
  if ((record.partition() == 0) && (record.offset() == 5)) {

    // 则暂停读取
    consumer.pause(topicPartition, ar -> {

      if (ar.succeeded()) {

        System.out.println("Paused");

        // 5秒后再恢复,继续读取
        vertx.setTimer(5000, timeId -> {

          // resumi read operations
          consumer.resume(topicPartition);
        });
      }
    });
  }
});

关闭 Consumer

关闭 Consumer 只需要调用 close 方法就可以了,它会自动的关闭与 Kafka 的连接,同时释放相关资源。

由于 close 方法是异步的,你并不知道关闭操作什么时候完成或失败,这时你需要注册一个处理器(Handler)来监听关闭完成的消息。当关闭操作彻底完成以后,注册的 Handler 将会被调用。

consumer.close(res -> {
  if (res.succeeded()) {
    System.out.println("Consumer is now closed");
  } else {
    System.out.println("close failed");
  }
});

发送消息到某个 Topic

您可以利用 write 方法来向某个 topic 发送消息(records)。

最简单的发送消息的方式是仅仅指定目的 topic 以及相应的值而省略消息的 key 以及分区。在这种情况下,消息会以轮询(round robin)的方式发送到对应 topic 的所有分区上。

for (int i = 0; i < 5; i++) {

  // 这里指定了topic和 message value,以round robin方式发送的目标partition
  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", "message_" + i);

  producer.write(record);
}

您可以通过绑定 Handler 来接受发送的结果。这个结果其实就是一些元数据(metadata),包含消息的 topic、目的分区 (destination partition) 以及分配的偏移量 (assigned offset)。

for (int i = 0; i < 5; i++) {

  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", "message_" + i);

  producer.write(record, done -> {

    if (done.succeeded()) {

      RecordMetadata recordMetadata = done.result();
      System.out.println("Message " + record.value() + " written on topic=" + recordMetadata.getTopic() +
        ", partition=" + recordMetadata.getPartition() +
        ", offset=" + recordMetadata.getOffset());
    }

  });
}

如果希望将消息发送到指定的分区,你可以指定分区的标识(identifier)或者设定消息的 key:

for (int i = 0; i < 10; i++) {

  // 这里指定了 partition 为 0
  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", null, "message_" + i, 0);

  producer.write(record);
}

因为 Producer 可以使用消息的 key 作为 hash 值来确定 partition,所以我们可以保证所有的消息被发送到同样的 partition 中,并且是有序的。


for (int i = 0; i < 10; i++) {

  // i.e. defining different keys for odd and even messages
  int key = i % 2;

  //这里指明了key,所有的消息将被发送同一个partition.
  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", String.valueOf(key), "message_" + i);

  producer.write(record);
}

注意:可共享的 Producer 通过 createShared 方法创建。它可以在多个 Verticle 实例之间共享,所以相关的配置必须在创建 Producer 的时候定义。

共享 Producer

有时候您希望在多个 Verticle 或者 Vert.x Context 下共用一个 Producer。您可以通过 KafkaProducer.createShared 方法来创建可以在 Verticle 之间安全共享的 KafkaProducer 实例:

KafkaProducer<String, String> producer1 = KafkaProducer.createShared(vertx, "the-producer", config);

// 关闭
producer1.close();

返回的 KafkaProducer 实例将复用相关的资源(如线程、连接等)。使用完 KafkaProducer 后,直接调用 close 方法关闭即可,相关的资源会自动释放。

关闭 Producer

与关闭 Consumer 类似,关闭 Producer 只需要调用 close 方法就可以了,它会自动的关闭与 Kafka 的连接,同时释放所有相关资源。

由于 close 方法是异步的,你并不知道关闭操作什么时候完成或失败,这时你需要注册一个处理器(Handler)来监听关闭完成的消息。当关闭操作彻底完成以后,注册的 Handler 将会被调用。

producer.close(res -> {
  if (res.succeeded()) {
    System.out.println("Producer is now closed");
  } else {
    System.out.println("close failed");
  }
});

获取 Topic Partition 的相关信息

您可以通过 partitionsFor 方法获取指定 topic 的分区信息。

producer.partitionsFor("test", ar -> {

  if (ar.succeeded()) {

    for (PartitionInfo partitionInfo : ar.result()) {
      System.out.println(partitionInfo);
    }
  }
});

错误处理

您可以利用 KafkaProducer#exceptionHandler 方法和 KafkaConsumer#exceptionHandler 方法来处理 Kafka 客户端(生产者和消费者)和 Kafka 集群之间的错误(如超时)。比如:

consumer.exceptionHandler(e -> {
  System.out.println("Error = " + e.getMessage());
});

随 Verticle 自动关闭

如果您是在 Verticle 内部创建的 Consumer 和 Producer,那么当对应 Verticle 被卸载(undeploy)的时候,相关的 Consumer 和 Producer 会自动关闭。

使用 Vert.x 自带的序列化与反序列化机制

Vert.x Kafka Client 自带现成的序列化与反序列化机制,可以处理 BufferJsonObjectJsonArray 等类型。

KafkaConsumer 里您可以使用 Buffer

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

// 创建一个可以反序列化成jsonObject的consumer.
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

// 创建一个可以反序列化成jsonArray的consumer.
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

同样在 KafkaProducer 中也可以:

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
config.put("acks", "1");

// 创建一个可以序列化成jsonObject的Producer.
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
config.put("acks", "1");

// 创建一个可以序列化成jsonArray的Producer.
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
config.put("acks", "1");

您也可以在 create 方法里指明序列化与反序列化相关的类。

比如创建 Consumer 时:

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

// 创建一个可以反序列化成Buffer的Consumer
KafkaConsumer<Buffer, Buffer> bufferConsumer = KafkaConsumer.create(vertx, config, Buffer.class, Buffer.class);

// 创建一个可以反序列化成JsonObject的Consumer
KafkaConsumer<JsonObject, JsonObject> jsonObjectConsumer = KafkaConsumer.create(vertx, config, JsonObject.class, JsonObject.class);

// 创建一个可以反序列化成JsonArray的Consumer
KafkaConsumer<JsonArray, JsonArray> jsonArrayConsumer = KafkaConsumer.create(vertx, config, JsonArray.class, JsonArray.class);

创建 Producer 时:

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("acks", "1");

// 创建一个可以序列化成Buffer的Producer.
KafkaProducer<Buffer, Buffer> bufferProducer = KafkaProducer.create(vertx, config, Buffer.class, Buffer.class);

// 创建一个可以序列化成jsonObject的Producer.
KafkaProducer<JsonObject, JsonObject> jsonObjectProducer = KafkaProducer.create(vertx, config, JsonObject.class, JsonObject.class);

// 创建一个可以序列化成jsonArray的Producer.
KafkaProducer<JsonArray, JsonArray> jsonArrayProducer = KafkaProducer.create(vertx, config, JsonArray.class, JsonArray.class);

RxJava API

Vert.x Kafka Client 组件也提供Rx风格的API。

译者注:此处也可以参考 Kafka Stream 相关的 API。

Observable<KafkaConsumerRecord<String, Long>> observable = consumer.toObservable();

observable
  .map(record -> record.value())
  .buffer(256)
  .map(
  list -> list.stream().mapToDouble(n -> n).average()
).subscribe(val -> {

  //获取到一个平均值

});

流实现与 Kafka 原生对象

如果您希望直接操作原生的 Kafka record,您可以使用原生的 Kafka 流式对象,它可以处理原生 Kafka 对象。

KafkaReadStream用于读取 topic partition。它是 ConsumerRecord 对象的可读流对象,读到的是 ConsumerRecord 对象。

KafkaWriteStream用于向某些 topic 中写入数据。它是 ProducerRecord 对象的可写流对象。

API通过这些接口将这些方法展现给用户,其他语言版本也应该类似。


原文档最后更新于 2017-03-15 15:54:14 CET

results matching ""

No results matching ""