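Overview
The project needed to read serialized data and deserialize it, so only a consumer is used here. According to the official documentation, a consumer needs deserializer settings and a producer needs serializer settings. If both kinds are supplied to the same client, Kafka may report that the extra settings are not known configs.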
Usage
Consumer
CODE
Configuration file
```properties
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#key.serializer=org.apache.kafka.common.serialization.StringSerializer
#value.serializer=org.apache.kafka.common.serialization.StringSerializer
```
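Lines that start with `#` are comments in a `.properties` file, so the two serializer entries above never reach the consumer. The sketch below is one way to check that with `java.util.Properties`. The class name `ConfigCommentCheck` and its `parse` helper are made up for illustration, and the config text is inlined as a string instead of being read from `conf/kafka.properties`.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ConfigCommentCheck {
    // Hypothetical helper: parses .properties text the same way props.load() does.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String text =
              "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer\n"
            + "value.deserializer=org.apache.kafka.common.serialization.StringDeserializer\n"
            + "#key.serializer=org.apache.kafka.common.serialization.StringSerializer\n"
            + "#value.serializer=org.apache.kafka.common.serialization.StringSerializer\n";
        Properties props = parse(text);
        // Commented-out lines never become entries, so only the deserializers remain.
        System.out.println(props.stringPropertyNames());
        System.out.println(props.containsKey("key.serializer")); // prints false
    }
}
```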
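Reading the configuration file
```java
// props, inputStream, consumer, and groupId are assumed to be declared elsewhere in the class.
try {
    // Prefer an external config file; fall back to the copy on the classpath.
    File kafkaConfigFile = new File("conf/kafka.properties");
    if (kafkaConfigFile.exists()) {
        inputStream = new FileInputStream(kafkaConfigFile);
    } else {
        inputStream = XfConsumerWithKafka.class.getClassLoader().getResourceAsStream("conf/kafka.properties");
    }
    props.load(inputStream);
    props.put("group.id", groupId);
    consumer = new KafkaConsumer<String, String>(props);
    System.out.println(Thread.currentThread().getName() + " Kafka consumer initialized successfully---------------------------------");
} catch (IOException e) {
    // The original snippet does not show its catch block; this handler is illustrative only.
    throw new RuntimeException("Failed to load conf/kafka.properties", e);
}
```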
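The loading snippet as shown does not close the stream, and `props.load` would throw a `NullPointerException` if neither the file nor the classpath resource exists. A possible variant using try-with-resources is sketched below. `KafkaConfigLoader` and `load` are hypothetical names, and the demo writes a throwaway file in place of `conf/kafka.properties` so that it runs anywhere.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

public class KafkaConfigLoader {
    // Hypothetical helper: external file first, classpath resource second.
    public static Properties load(String location) {
        Properties props = new Properties();
        Path file = Paths.get(location);
        try (InputStream in = Files.exists(file)
                ? Files.newInputStream(file)
                : KafkaConfigLoader.class.getClassLoader().getResourceAsStream(location)) {
            if (in == null) {
                throw new IllegalStateException("Config not found on disk or classpath: " + location);
            }
            props.load(in); // the stream is closed automatically when the try block exits
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Demo input: a temporary file standing in for conf/kafka.properties.
        Path demo = Files.createTempFile("kafka", ".properties");
        Files.write(demo, "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer\n"
                .getBytes(StandardCharsets.UTF_8));
        Properties props = load(demo.toString());
        props.put("group.id", "demo-group"); // placeholder group id
        System.out.println(props);
        Files.delete(demo);
    }
}
```

The returned `Properties` can then be passed to `new KafkaConsumer<String, String>(props)` exactly as in the snippet above.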
The KafkaConsumer constructors pass the properties through ConsumerConfig:
```java
// Excerpt from KafkaConsumer: the single-argument constructor passes null for both deserializers.
public KafkaConsumer(Properties properties) {
    this((Properties)properties, (Deserializer)null, (Deserializer)null);
}

public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);
}
```
- ConsumerConfig class
```java
// Excerpt from ConsumerConfig: only the deserializer keys are ever written here.
public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {
    Map<String, Object> newConfigs = new HashMap();
    newConfigs.putAll(configs);
    if (keyDeserializer != null) {
        newConfigs.put("key.deserializer", keyDeserializer.getClass());
    }
    if (valueDeserializer != null) {
        newConfigs.put("value.deserializer", valueDeserializer.getClass());
    }
    return newConfigs;
}

public static Properties addDeserializerToConfig(Properties properties, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {
    Properties newProperties = new Properties();
    newProperties.putAll(properties);
    if (keyDeserializer != null) {
        newProperties.put("key.deserializer", keyDeserializer.getClass().getName());
    }
    if (valueDeserializer != null) {
        newProperties.put("value.deserializer", valueDeserializer.getClass().getName());
    }
    return newProperties;
}
```
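To make the effect of the `Properties` overload above easier to see, here is a small stand-in that mirrors its logic: copy the input, then overwrite the deserializer entries only when an instance is passed. It uses plain `Object` in place of `Deserializer<?>` so it runs without kafka-clients on the classpath. `DeserializerConfigDemo` and `withDeserializers` are illustrative names, not Kafka API.

```java
import java.util.Properties;

public class DeserializerConfigDemo {
    // Stand-in for ConsumerConfig.addDeserializerToConfig(Properties, ...),
    // with Object replacing Deserializer<?> so no Kafka dependency is needed.
    public static Properties withDeserializers(Properties properties, Object keyDeserializer, Object valueDeserializer) {
        Properties copy = new Properties();
        copy.putAll(properties); // the caller's Properties object is left untouched
        if (keyDeserializer != null) {
            copy.put("key.deserializer", keyDeserializer.getClass().getName());
        }
        if (valueDeserializer != null) {
            copy.put("value.deserializer", valueDeserializer.getClass().getName());
        }
        return copy;
    }

    public static void main(String[] args) {
        Properties fromFile = new Properties();
        fromFile.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Passing null keeps whatever the file configured.
        System.out.println(withDeserializers(fromFile, null, null).getProperty("key.deserializer"));

        // Passing an instance replaces the configured name with the instance's class name.
        // prints java.lang.StringBuilder
        System.out.println(withDeserializers(fromFile, new StringBuilder(), null).getProperty("key.deserializer"));
    }
}
```

With the single-argument constructor used in this article, both instances are null, so the deserializer classes have to come from the properties file.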
### Problem
ConsumerConfig only handles deserializer settings. If serializer settings are also supplied, warnings like the following may be logged:
```java
2020-12-16 19:45:37.458 [main] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'value.serializer' was supplied but isn't a known config.
2020-12-16 19:45:37.458 [main] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'key.serializer' was supplied but isn't a known config.
2020-12-16 19:45:37.458 [pool-5-thread-15] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group fraudCdrXf
2020-12-16 19:45:37.458 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
```
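### Solution
Remove the serializer settings and keep only the deserializer settings. The same applies in reverse to the producer.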
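If one properties file has to serve both clients, an alternative to editing the file is to drop the other side's entries in code before constructing the client. The following is a minimal sketch under that assumption. `ClientConfigFilter`, `forConsumer`, and `forProducer` are hypothetical helpers, and only the four serializer and deserializer keys discussed here are handled.

```java
import java.util.Properties;

public class ClientConfigFilter {
    // Returns a copy without the producer-only serializer entries.
    public static Properties forConsumer(Properties shared) {
        return without(shared, "key.serializer", "value.serializer");
    }

    // Returns a copy without the consumer-only deserializer entries.
    public static Properties forProducer(Properties shared) {
        return without(shared, "key.deserializer", "value.deserializer");
    }

    // Copies the source and removes the given keys; the source itself is not modified.
    private static Properties without(Properties source, String... keys) {
        Properties copy = new Properties();
        copy.putAll(source);
        for (String key : keys) {
            copy.remove(key);
        }
        return copy;
    }

    public static void main(String[] args) {
        Properties shared = new Properties();
        shared.put("bootstrap.servers", "localhost:9092"); // placeholder address
        shared.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        shared.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        shared.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        shared.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        System.out.println(forConsumer(shared).containsKey("key.serializer"));   // prints false
        System.out.println(forProducer(shared).containsKey("key.deserializer")); // prints false
    }
}
```

The filtered copy would then be handed to the client, for example `new KafkaConsumer<String, String>(ClientConfigFilter.forConsumer(props))`.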
Producer
CODE
When loading the configuration file:
```java
// inputStream, props, and kafkaProducer are assumed to be declared elsewhere in the class.
File propFile = new File("conf/kafka.properties");
if (propFile.exists()) {
    inputStream = new FileInputStream(propFile);
} else {
    inputStream = CLKafkaProducer.class.getClassLoader().getResourceAsStream("conf/kafka.properties");
}
props.load(inputStream);
kafkaProducer = new KafkaProducer<String, String>(props);
```
The KafkaProducer constructors pass the properties through ProducerConfig:
```java
// Excerpt from KafkaProducer: the single-argument constructor passes null for both serializers.
public KafkaProducer(Properties properties) {
    this((ProducerConfig)(new ProducerConfig(properties)), (Serializer)null, (Serializer)null);
}

public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)), keySerializer, valueSerializer);
}
```
- ProducerConfig settings
```java
// Excerpt from ProducerConfig: only the serializer keys are ever written here.
public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs, Serializer<?> keySerializer, Serializer<?> valueSerializer) {
    Map<String, Object> newConfigs = new HashMap();
    newConfigs.putAll(configs);
    if (keySerializer != null) {
        newConfigs.put("key.serializer", keySerializer.getClass());
    }
    if (valueSerializer != null) {
        newConfigs.put("value.serializer", valueSerializer.getClass());
    }
    return newConfigs;
}

public static Properties addSerializerToConfig(Properties properties, Serializer<?> keySerializer, Serializer<?> valueSerializer) {
    Properties newProperties = new Properties();
    newProperties.putAll(properties);
    if (keySerializer != null) {
        newProperties.put("key.serializer", keySerializer.getClass().getName());
    }
    if (valueSerializer != null) {
        newProperties.put("value.serializer", valueSerializer.getClass().getName());
    }
    return newProperties;
}
```
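Note: as the code above shows, ProducerConfig only handles serializer settings and has no deserializer settings. If warnings about deserializer settings appear on the producer side, comment out or delete those settings, just as with the consumer.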
END
Done~