原文: https://howtodoinjava.com/kafka/spring-boot-jsonserializer-example/
学习使用JsonSerializer和JsonDeserializer类从 Apache Kafka 主题存储和检索 JSON,并返回 Java 模型对象。
1. 先决条件
- 请按照本指南在您的机器上设置 Kafka 。
- 我们正在修改 Spring boot 和 Kafka HelloWorld 应用。
- 还要确保您的计算机上至少安装了 Java8 和 Maven 。
2. 应用配置
在application.properties文件中,我们添加了以下配置。
server.port=9000spring.kafka.consumer.bootstrap-servers: localhost:9092spring.kafka.consumer.group-id: group-idspring.kafka.consumer.auto-offset-reset: earliestspring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializerspring.kafka.consumer.properties.spring.json.trusted.packages=*spring.kafka.producer.bootstrap-servers: localhost:9092spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.key-deserializer指定键的反序列化器类。spring.kafka.consumer.value-deserializer指定值的反序列化器类。spring.kafka.consumer.properties.spring.json.trusted.packages指定允许反序列化的程序包模式的逗号分隔列表。'*'表示反序列化所有程序包。spring.kafka.producer.key-deserializer指定键的序列化器类。spring.kafka.producer.value-deserializer指定值的序列化器类。
3. 模型类
我们创建了User类,该类将发送给 Kafka。 其实例将由JsonSerializer序列化为字节数组。 这个字节数组最终由 Kafka 存储到给定的分区中。
反序列化期间,JsonDeserializer用于以字节数组的形式从 Kafka 接收 JSON,并将User对象返回给应用。
public class User{private long userId;private String firstName;private String lastName;public long getUserId() {return userId;}public void setUserId(long userId) {this.userId = userId;}public String getFirstName() {return firstName;}public void setFirstName(String firstName) {this.firstName = firstName;}public String getLastName() {return lastName;}public void setLastName(String lastName) {this.lastName = lastName;}@Overridepublic String toString() {return "User [userId=" + userId + ", firstName="+ firstName + ", lastName=" + lastName + "]";}}
4. Kafka 生产者
生产者 API 只是在 HTTP POST API 中使用用户信息。 然后,它创建一个新的User对象,并使用KafkaTemplate发送给 Kafka。
@PostMapping(value = "/createUser")public void sendMessageToKafkaTopic(@RequestParam("userId") long userId,@RequestParam("firstName") String firstName,@RequestParam("lastName") String lastName) {User user = new User();user.setUserId(userId);user.setFirstName(firstName);user.setLastName(lastName);this.producerService.saveCreateUserLog(user);}
@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;public void saveCreateUserLog(User user){logger.info(String.format("User created -> %s", user));this.kafkaTemplate.send(AppConstants.TOPIC_NAME_USER_LOG, user);}
5. Kafka 消费者
用户被实现为@KafkaListener,每次在主题中添加新条目时都会得到通知。
@KafkaListener(topics = AppConstants.TOPIC_NAME_USER_LOG,groupId = AppConstants.GROUP_ID)public void consume(User user){logger.info(String.format("User created -> %s", user));}
6. 测试
使用任何 REST API 测试器,并向 API http://localhost:9000/kafka/createUser发送一些消息,如下所示。
留言栏:http://localhost:9000/kafka/createUser?userId=1&firstName=Lokesh&lastName=Gupta
观察控制台日志:
2020-05-24 23:36:47.132 INFO 2092 --- [nio-9000-exec-4]2020-05-26 01:03:52.722 INFO 11924 --- [nio-9000-exec-6] c.h.k.demo.service.KafKaProducerService: User created -> User [userId=1, firstName=Lokesh, lastName=Gupta]2020-05-26 01:03:52.729 INFO 11924 --- [ntainer#1-0-C-1] c.h.k.demo.service.KafKaConsumerService: User created -> User [userId=1, firstName=Lokesh, lastName=Gupta]
7. 总结
在此 SpringBoot kafka JsonSerializer示例中,我们学习了使用JsonSerializer对 Java 对象进行序列化和反序列化并存储在 Kafka 中。
学习愉快!
