原文: https://howtodoinjava.com/kafka/spring-boot-jsonserializer-example/
学习使用JsonSerializer
和JsonDeserializer
类从 Apache Kafka 主题存储和检索 JSON,并返回 Java 模型对象。
1. 先决条件
- 请按照本指南在您的机器上设置 Kafka 。
- 我们正在修改 Spring boot 和 Kafka HelloWorld 应用。
- 还要确保您的计算机上至少安装了 Java8 和 Maven 。
2. 应用配置
在application.properties
文件中,我们添加了以下配置。
server.port=9000
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.key-deserializer
指定键的反序列化器类。spring.kafka.consumer.value-deserializer
指定值的反序列化器类。spring.kafka.consumer.properties.spring.json.trusted.packages
指定允许反序列化的程序包模式的逗号分隔列表。'*'
表示反序列化所有程序包。spring.kafka.producer.key-deserializer
指定键的序列化器类。spring.kafka.producer.value-deserializer
指定值的序列化器类。
3. 模型类
我们创建了User
类,该类将发送给 Kafka。 其实例将由JsonSerializer
序列化为字节数组。 这个字节数组最终由 Kafka 存储到给定的分区中。
反序列化期间,JsonDeserializer
用于以字节数组的形式从 Kafka 接收 JSON,并将User
对象返回给应用。
public class User
{
private long userId;
private String firstName;
private String lastName;
public long getUserId() {
return userId;
}
public void setUserId(long userId) {
this.userId = userId;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
@Override
public String toString() {
return "User [userId=" + userId + ", firstName="
+ firstName + ", lastName=" + lastName + "]";
}
}
4. Kafka 生产者
生产者 API 只是在 HTTP POST API 中使用用户信息。 然后,它创建一个新的User
对象,并使用KafkaTemplate
发送给 Kafka。
@PostMapping(value = "/createUser")
public void sendMessageToKafkaTopic(
@RequestParam("userId") long userId,
@RequestParam("firstName") String firstName,
@RequestParam("lastName") String lastName) {
User user = new User();
user.setUserId(userId);
user.setFirstName(firstName);
user.setLastName(lastName);
this.producerService.saveCreateUserLog(user);
}
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void saveCreateUserLog(User user)
{
logger.info(String.format("User created -> %s", user));
this.kafkaTemplate.send(AppConstants.TOPIC_NAME_USER_LOG, user);
}
5. Kafka 消费者
用户被实现为@KafkaListener
,每次在主题中添加新条目时都会得到通知。
@KafkaListener(topics = AppConstants.TOPIC_NAME_USER_LOG,
groupId = AppConstants.GROUP_ID)
public void consume(User user)
{
logger.info(String.format("User created -> %s", user));
}
6. 测试
使用任何 REST API 测试器,并向 API http://localhost:9000/kafka/createUser
发送一些消息,如下所示。
留言栏:http://localhost:9000/kafka/createUser?userId=1&firstName=Lokesh&lastName=Gupta
观察控制台日志:
2020-05-24 23:36:47.132 INFO 2092 --- [nio-9000-exec-4]
2020-05-26 01:03:52.722 INFO 11924 --- [nio-9000-exec-6] c.h.k.demo.service.KafKaProducerService
: User created -> User [userId=1, firstName=Lokesh, lastName=Gupta]
2020-05-26 01:03:52.729 INFO 11924 --- [ntainer#1-0-C-1] c.h.k.demo.service.KafKaConsumerService
: User created -> User [userId=1, firstName=Lokesh, lastName=Gupta]
7. 总结
在此 SpringBoot kafka JsonSerializer
示例中,我们学习了使用JsonSerializer
对 Java 对象进行序列化和反序列化并存储在 Kafka 中。
学习愉快!