4.Kafka API - 图1

SpringBoot & Kafka

添加依赖

  1. <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. <version>2.4.0.RELEASE</version>
  6. </dependency>

添加配置

  1. vi application.properties

内容如下:

  1. # kafka config
  2. spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

消息发送

  1. @ Autowired
  2. privete KafkaTemplate template;
  3. private static final String topic = "topicName";
  4. /** 发送消息 **/
  5. @GetMapping("/send/{input}")
  6. public String sendToKafka(@PathVariable String input){
  7. this.template.send(topic,input);
  8. return "send success";
  9. }

消息接收

  1. @KafkaListener(id="",topic = "topicName",groupId = "groupName")
  2. public void listener(String input){
  3. System.out.println(input);
  4. }

事务处理

添加配置

  1. vi application.properties

内容如下:

  1. # 事务id要唯一
  2. spring.kafka.producer.transaction-id-prefix = kafka_tx

注解方式使用事务

  1. @Autowired
  2. privete KafkaTemplate template;
  3. private static final String topic = "topicName";
  4. /** 发送消息 **/
  5. @GetMapping("/send/{input}")
  6. @Transactional(rollbackFor = RuntimeException.class)
  7. public String sendToKafka(@PathVariable String input){
  8. this.template.send(topic,input);
  9. return "send success";
  10. }

executeInTransaction方式使用事务

  1. @Autowired
  2. privete KafkaTemplate template;
  3. private static final String topic = "topicName";
  4. /** 发送消息 **/
  5. @GetMapping("/send/{input}")
  6. public String sendToKafka(@PathVariable String input){
  7. this.template.send(topic,input);
  8. template.executeInTransaction(t->
  9. t.send(topic,input);
  10. if("error".queals(input)){
  11. throw new RuntimeException("is error");
  12. }
  13. t.send(topic,input+"anthor");
  14. return true;
  15. );
  16. return "send success";
  17. }

Producer API

发送流程

image.png

异步发送

  1. Producer<String, String> producer = new KafkaProducer<>(props);
  2. for (int i = 0; i < 10; i++) {
  3. producer.send(new ProducerRecord<String, String>("test", "polaris:"+Integer.toString(i)));
  4. }

同步发送

  1. Producer<String, String> producer = new KafkaProducer<>(props);
  2. for (int i = 0; i < 10; i++) {
  3. producer.send(new ProducerRecord<String, String>("test", "polaris:"+Integer.toString(i))).get();
  4. }

Consumer API

自动提交offset

offset完成提交,在延时时间内,如果消费者进程出现异常,则可能丢失消息。若消费速度快,offset提交慢,在延时时间内,消费者进程出现异常,则下次重启会获取到之前已经处理过的数据,造成重复问题。

  1. // 自动提交offset
  2. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  3. // 自动提交延时
  4. props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  5. // ....
  6. while (true) {
  7. ConsumerRecords<String, String> records = consumer.poll(100);
  8. for (ConsumerRecord<String, String> record : records)
  9. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  10. }

手动提交offset

处理完消息后,如果提交offset异常,则下次重启会获取到之前已经处理过的数据,造成重复问题。
(1)异步提交。

  1. // 关闭自动提交offset
  2. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  3. // ....
  4. while (true) {
  5. ConsumerRecords<String, String> records = consumer.poll(100);
  6. for (ConsumerRecord<String, String> record : records) {
  7. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  8. }
  9. //异步提交
  10. consumer.commitAsync(new OffsetCommitCallback() {
  11. @Override
  12. public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  13. if (exception != null) {
  14. System.err.println("Commit failed for" + offsets);
  15. }
  16. }
  17. });
  18. }

(2)同步提交。

  1. // 关闭自动提交offset
  2. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  3. // ....
  4. while (true) {
  5. ConsumerRecords<String, String> records = consumer.poll(100);
  6. for (ConsumerRecord<String, String> record : records) {
  7. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  8. }
  9. consumer.commitSync();
  10. }

自定义存储offset

无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。先提交offset后消费,有可能造成数据的漏消费;而先消费后提交offset,有可能会造成数据的重复消费。Kafka 0.9版本之前,offset存储在zookeeper,0.9版本及之后,默认将offset存储在Kafka的一个内置的topic中。除此之外,Kafka还可以选择自定义存储offset。
要实现自定义存储offset,需要借助ConsumerRebalanceListener,以下为示例代码,其中提交和获取offset的方法,需要根据所选的offset存储系统自行实现。

  1. import org.apache.kafka.clients.consumer.*;
  2. import org.apache.kafka.common.TopicPartition;
  3. import java.util.*;
  4. public class CustomConsumer {
  5. private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
  6. public static void main(String[] args) {
  7. //创建配置信息
  8. Properties props = new Properties();
  9. //Kafka集群
  10. props.put("bootstrap.servers", "192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092");
  11. //消费者组,只要group.id相同,就属于同一个消费者组
  12. props.put("group.id", "test");
  13. //关闭自动提交offset
  14. props.put("enable.auto.commit", "false");
  15. //Key和Value的反序列化类
  16. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  17. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  18. //创建一个消费者
  19. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  20. //消费者订阅主题
  21. consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {
  22. //该方法会在Rebalance之前调用
  23. @Override
  24. public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  25. commitOffset(currentOffset);
  26. }
  27. //该方法会在Rebalance之后调用
  28. @Override
  29. public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  30. currentOffset.clear();
  31. for (TopicPartition partition : partitions) {
  32. consumer.seek(partition, getOffset(partition));//定位到最近提交的offset位置继续消费
  33. }
  34. }
  35. });
  36. while (true) {
  37. ConsumerRecords<String, String> records = consumer.poll(100);//消费者拉取数据
  38. for (ConsumerRecord<String, String> record : records) {
  39. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  40. currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
  41. }
  42. commitOffset(currentOffset);//异步提交
  43. }
  44. }
  45. //获取某分区的最新offset
  46. private static long getOffset(TopicPartition partition) {
  47. return 0;
  48. }
  49. //提交该消费者所有分区的offset
  50. private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
  51. }
  52. }

参考:https://blog.csdn.net/qq_38704184/article/details/103200513

Interceptor API

Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor。

  1. /**
  2. * 自定义生产者
  3. * @description
  4. * 拦截器应用<br/>
  5. * 实现一个简单的双interceptor组成的拦截链。<br/>
  6. * 第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;<br/>
  7. * 第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。
  8. */
  9. public class InterceptorProducer {
  10. public static void main(String[] args) throws Exception {
  11. // ...
  12. List<String> interceptors = new ArrayList<>();
  13. interceptors.add("com.lonton.t8.bigdata.demo.kafka.interceptor.TimeInterceptor");
  14. interceptors.add("com.lonton.t8.bigdata.demo.kafka.interceptor.CounterInterceptor");
  15. props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
  16. // ....
  17. // 一定要关闭producer,这样才会调用interceptor的close方法
  18. producer.close();
  19. }
  20. }
  1. public class TimeInterceptor implements ProducerInterceptor<String, String> {
  2. @Override
  3. public void configure(Map<String, ?> configs) {}
  4. @Override
  5. public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
  6. // 创建一个新的record,把时间戳写入消息体的最前部
  7. return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
  8. System.currentTimeMillis() + "," + record.value().toString());
  9. }
  10. @Override
  11. public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}
  12. @Override
  13. public void close() {}
  14. }
  1. public class CounterInterceptor implements ProducerInterceptor<String, String> {
  2. private int errorCounter = 0;
  3. private int successCounter = 0;
  4. @Override
  5. public void configure(Map<String, ?> configs) {}
  6. @Override
  7. public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
  8. return record;
  9. }
  10. @Override
  11. public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
  12. // 统计成功和失败的次数
  13. if (exception == null) {
  14. successCounter++;
  15. } else {
  16. errorCounter++;
  17. }
  18. }
  19. @Override
  20. public void close() {
  21. // 保存结果
  22. System.out.println("Successful sent: " + successCounter);
  23. System.out.println("Failed sent: " + errorCounter);
  24. }
  25. }

Connector API

Connect是Kafka的一部分,是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。
它为在Kafka和外部数据存储系统之间移动数据提供了一种可靠且可伸缩的方案。
它使得能够快速定义将大量数据集合移入和移出Kafka的连接器变得简单。 Kafka Connect可以获取整个数
据库或从所有应用程序服务器收集指标到Kafka主题,使数据可用于低延迟的流处理。导出作业可以将数据从
Kafka topic传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析。
4.Kafka API - 图3

Streams API

Kafka Streams。Apache Kafka开源项目的一个组成部分。是一个功能强大,易于使用的库。用于在Kafka上构建高可分布式、拓展性,容错的应用程序。
4.Kafka API - 图4
4.Kafka API - 图5

参考

OrcHome:Kafka Streams开发者指南
https://www.orchome.com/335