原文: https://howtodoinjava.com/kafka/spring-boot-jsonserializer-example/

学习使用JsonSerializerJsonDeserializer类从 Apache Kafka 主题存储和检索 JSON,并返回 Java 模型对象。

1. 先决条件

2. 应用配置

application.properties文件中,我们添加了以下配置。

  1. server.port=9000
  2. spring.kafka.consumer.bootstrap-servers: localhost:9092
  3. spring.kafka.consumer.group-id: group-id
  4. spring.kafka.consumer.auto-offset-reset: earliest
  5. spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  6. spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
  7. spring.kafka.consumer.properties.spring.json.trusted.packages=*
  8. spring.kafka.producer.bootstrap-servers: localhost:9092
  9. spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
  10. spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  • spring.kafka.consumer.key-deserializer指定键的反序列化器类。
  • spring.kafka.consumer.value-deserializer指定值的反序列化器类。
  • spring.kafka.consumer.properties.spring.json.trusted.packages指定允许反序列化的程序包模式的逗号分隔列表。 '*'表示反序列化所有程序包。
  • spring.kafka.producer.key-deserializer指定键的序列化器类。
  • spring.kafka.producer.value-deserializer指定值的序列化器类。

3. 模型类

我们创建了User类,该类将发送给 Kafka。 其实例将由JsonSerializer序列化为字节数组。 这个字节数组最终由 Kafka 存储到给定的分区中。

反序列化期间,JsonDeserializer用于以字节数组的形式从 Kafka 接收 JSON,并将User对象返回给应用。

  1. public class User
  2. {
  3. private long userId;
  4. private String firstName;
  5. private String lastName;
  6. public long getUserId() {
  7. return userId;
  8. }
  9. public void setUserId(long userId) {
  10. this.userId = userId;
  11. }
  12. public String getFirstName() {
  13. return firstName;
  14. }
  15. public void setFirstName(String firstName) {
  16. this.firstName = firstName;
  17. }
  18. public String getLastName() {
  19. return lastName;
  20. }
  21. public void setLastName(String lastName) {
  22. this.lastName = lastName;
  23. }
  24. @Override
  25. public String toString() {
  26. return "User [userId=" + userId + ", firstName="
  27. + firstName + ", lastName=" + lastName + "]";
  28. }
  29. }

4. Kafka 生产者

生产者 API 只是在 HTTP POST API 中使用用户信息。 然后,它创建一个新的User对象,并使用KafkaTemplate发送给 Kafka。

  1. @PostMapping(value = "/createUser")
  2. public void sendMessageToKafkaTopic(
  3. @RequestParam("userId") long userId,
  4. @RequestParam("firstName") String firstName,
  5. @RequestParam("lastName") String lastName) {
  6. User user = new User();
  7. user.setUserId(userId);
  8. user.setFirstName(firstName);
  9. user.setLastName(lastName);
  10. this.producerService.saveCreateUserLog(user);
  11. }
  1. @Autowired
  2. private KafkaTemplate<String, Object> kafkaTemplate;
  3. public void saveCreateUserLog(User user)
  4. {
  5. logger.info(String.format("User created -> %s", user));
  6. this.kafkaTemplate.send(AppConstants.TOPIC_NAME_USER_LOG, user);
  7. }

5. Kafka 消费者

用户被实现为@KafkaListener,每次在主题中添加新条目时都会得到通知。

  1. @KafkaListener(topics = AppConstants.TOPIC_NAME_USER_LOG,
  2. groupId = AppConstants.GROUP_ID)
  3. public void consume(User user)
  4. {
  5. logger.info(String.format("User created -> %s", user));
  6. }

6. 测试

使用任何 REST API 测试器,并向 API http://localhost:9000/kafka/createUser发送一些消息,如下所示。

留言栏:http://localhost:9000/kafka/createUser?userId=1&firstName=Lokesh&lastName=Gupta

观察控制台日志:

  1. 2020-05-24 23:36:47.132 INFO 2092 --- [nio-9000-exec-4]
  2. 2020-05-26 01:03:52.722 INFO 11924 --- [nio-9000-exec-6] c.h.k.demo.service.KafKaProducerService
  3. : User created -> User [userId=1, firstName=Lokesh, lastName=Gupta]
  4. 2020-05-26 01:03:52.729 INFO 11924 --- [ntainer#1-0-C-1] c.h.k.demo.service.KafKaConsumerService
  5. : User created -> User [userId=1, firstName=Lokesh, lastName=Gupta]

7. 总结

在此 SpringBoot kafka JsonSerializer示例中,我们学习了使用JsonSerializer对 Java 对象进行序列化和反序列化并存储在 Kafka 中。

学习愉快!

源码下载