导读
由于项目需要,使用读取序列化文件和反序列化。这里仅仅使用了消费者。按照原文档描述,可以知道消费者需要配置反序列化,生成者配置序列化设置。如果同时设置,可能会出现找不到对应配置信息。
使用
消费者
CODE
配置文件
key.deserializer=org.apache.kafka.common.serialization.StringDeserializervalue.deserializer=org.apache.kafka.common.serialization.StringDeserializer#key.serializer=org.apache.kafka.common.serialization.StringSerializer#value.serializer=org.apache.kafka.common.serialization.StringSerializer
读取配置文件
try {File kafkaConfigFile = new File("conf/kafka.properties");if (kafkaConfigFile.exists()) {inputStream = new FileInputStream(kafkaConfigFile);} else {inputStream = XfConsumerWithKafka.class.getClassLoader().getResourceAsStream("conf/kafka.properties");}props.load(inputStream);props.put("group.id", groupId);consumer = new KafkaConsumer<String, String>(props);System.out.println(Thread.currentThread().getName() + " kafka消费者初始化成功---------------------------------");}
这里的KafkaConsumer类的ConsumerConfig文件
public KafkaConsumer(Properties properties) {this((Properties)properties, (Deserializer)null, (Deserializer)null);}public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);}
ConsumerConfig类 ```java public static Map
addDeserializerToConfig(Map configs, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) { Map<String, Object> newConfigs = new HashMap();newConfigs.putAll(configs);if (keyDeserializer != null) {newConfigs.put("key.deserializer", keyDeserializer.getClass());}if (valueDeserializer != null) {newConfigs.put("value.deserializer", valueDeserializer.getClass());}return newConfigs;
}
public static Properties addDeserializerToConfig(Properties properties, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {
Properties newProperties = new Properties();newProperties.putAll(properties);if (keyDeserializer != null) {newProperties.put("key.deserializer", keyDeserializer.getClass().getName());}if (valueDeserializer != null) {newProperties.put("value.deserializer", valueDeserializer.getClass().getName());}return newProperties;
}
<a name="zmGCi"></a>### 问题该类只有反序列化配置,如果配置了序列化,可能会出现以下错误:```java2020-12-16 19:45:37.458 [main] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'value.serializer' was supplied but isn't a known config.2020-12-16 19:45:37.458 [main] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'key.serializer' was supplied but isn't a known config.2020-12-16 19:45:37.458 [pool-5-thread-15] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group fraudCdrXf2020-12-16 19:45:37.458 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
解决办法
去掉序列化配置,仅仅要反序列化配置。生产者同理。
生产者
CODE
在调用配置文件的时候
File propFile = new File("conf/kafka.properties");if (propFile.exists()) {inputStream = new FileInputStream(propFile);} else {inputStream = CLKafkaProducer.class.getClassLoader().getResourceAsStream("conf/kafka.properties");}props.load(inputStream);kafkaProducer = new KafkaProducer<String, String>(props);
KafkaProducer文件的ProducerConfig配置。 ```java public KafkaProducer(Properties properties) {
this((ProducerConfig)(new ProducerConfig(properties)), (Serializer)null, (Serializer)null);
}
public KafkaProducer(Properties properties, Serializer
keySerializer, Serializer valueSerializer) { this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)), keySerializer, valueSerializer);
}
- ProducerConfig配置```javapublic static Map<String, Object> addSerializerToConfig(Map<String, Object> configs, Serializer<?> keySerializer, Serializer<?> valueSerializer) {Map<String, Object> newConfigs = new HashMap();newConfigs.putAll(configs);if (keySerializer != null) {newConfigs.put("key.serializer", keySerializer.getClass());}if (valueSerializer != null) {newConfigs.put("value.serializer", valueSerializer.getClass());}return newConfigs;}public static Properties addSerializerToConfig(Properties properties, Serializer<?> keySerializer, Serializer<?> valueSerializer) {Properties newProperties = new Properties();newProperties.putAll(properties);if (keySerializer != null) {newProperties.put("key.serializer", keySerializer.getClass().getName());}if (valueSerializer != null) {newProperties.put("value.serializer", valueSerializer.getClass().getName());}return newProperties;}
注意:由上述可以看到只有序列化设置,没有反序列配置。假如出现反序列化问题,如消费者一样,将其反序列化注释掉或者删掉即可。
END
搞定~
