导读


由于项目需要,使用读取序列化文件和反序列化。这里仅仅使用了消费者。按照原文档描述,可以知道消费者需要配置反序列化,生成者配置序列化设置。如果同时设置,可能会出现找不到对应配置信息。

使用


消费者

CODE

  • 配置文件

    1. key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    2. value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    3. #key.serializer=org.apache.kafka.common.serialization.StringSerializer
    4. #value.serializer=org.apache.kafka.common.serialization.StringSerializer
  • 读取配置文件

    1. try {
    2. File kafkaConfigFile = new File("conf/kafka.properties");
    3. if (kafkaConfigFile.exists()) {
    4. inputStream = new FileInputStream(kafkaConfigFile);
    5. } else {
    6. inputStream = XfConsumerWithKafka.class.getClassLoader().getResourceAsStream("conf/kafka.properties");
    7. }
    8. props.load(inputStream);
    9. props.put("group.id", groupId);
    10. consumer = new KafkaConsumer<String, String>(props);
    11. System.out.println(Thread.currentThread().getName() + " kafka消费者初始化成功---------------------------------");
    12. }
  • 这里的KafkaConsumer类的ConsumerConfig文件

    1. public KafkaConsumer(Properties properties) {
    2. this((Properties)properties, (Deserializer)null, (Deserializer)null);
    3. }
    4. public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    5. this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);
    6. }
  • ConsumerConfig类 ```java public static Map addDeserializerToConfig(Map configs, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {

    1. Map<String, Object> newConfigs = new HashMap();
    2. newConfigs.putAll(configs);
    3. if (keyDeserializer != null) {
    4. newConfigs.put("key.deserializer", keyDeserializer.getClass());
    5. }
    6. if (valueDeserializer != null) {
    7. newConfigs.put("value.deserializer", valueDeserializer.getClass());
    8. }
    9. return newConfigs;

    }

    public static Properties addDeserializerToConfig(Properties properties, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {

    1. Properties newProperties = new Properties();
    2. newProperties.putAll(properties);
    3. if (keyDeserializer != null) {
    4. newProperties.put("key.deserializer", keyDeserializer.getClass().getName());
    5. }
    6. if (valueDeserializer != null) {
    7. newProperties.put("value.deserializer", valueDeserializer.getClass().getName());
    8. }
    9. return newProperties;

    }

  1. <a name="zmGCi"></a>
  2. ### 问题
  3. 该类只有反序列化配置,如果配置了序列化,可能会出现以下错误:
  4. ```java
  5. 2020-12-16 19:45:37.458 [main] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'value.serializer' was supplied but isn't a known config.
  6. 2020-12-16 19:45:37.458 [main] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'key.serializer' was supplied but isn't a known config.
  7. 2020-12-16 19:45:37.458 [pool-5-thread-15] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group fraudCdrXf
  8. 2020-12-16 19:45:37.458 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1

解决办法

去掉序列化配置,仅仅要反序列化配置。生产者同理。

生产者

CODE

在调用配置文件的时候

  1. File propFile = new File("conf/kafka.properties");
  2. if (propFile.exists()) {
  3. inputStream = new FileInputStream(propFile);
  4. } else {
  5. inputStream = CLKafkaProducer.class.getClassLoader().getResourceAsStream("conf/kafka.properties");
  6. }
  7. props.load(inputStream);
  8. kafkaProducer = new KafkaProducer<String, String>(props);
  • KafkaProducer文件的ProducerConfig配置。 ```java public KafkaProducer(Properties properties) {

    1. this((ProducerConfig)(new ProducerConfig(properties)), (Serializer)null, (Serializer)null);

    }

    public KafkaProducer(Properties properties, Serializer keySerializer, Serializer valueSerializer) {

    1. this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)), keySerializer, valueSerializer);

    }

  1. - ProducerConfig配置
  2. ```java
  3. public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs, Serializer<?> keySerializer, Serializer<?> valueSerializer) {
  4. Map<String, Object> newConfigs = new HashMap();
  5. newConfigs.putAll(configs);
  6. if (keySerializer != null) {
  7. newConfigs.put("key.serializer", keySerializer.getClass());
  8. }
  9. if (valueSerializer != null) {
  10. newConfigs.put("value.serializer", valueSerializer.getClass());
  11. }
  12. return newConfigs;
  13. }
  14. public static Properties addSerializerToConfig(Properties properties, Serializer<?> keySerializer, Serializer<?> valueSerializer) {
  15. Properties newProperties = new Properties();
  16. newProperties.putAll(properties);
  17. if (keySerializer != null) {
  18. newProperties.put("key.serializer", keySerializer.getClass().getName());
  19. }
  20. if (valueSerializer != null) {
  21. newProperties.put("value.serializer", valueSerializer.getClass().getName());
  22. }
  23. return newProperties;
  24. }

注意:由上述可以看到只有序列化设置,没有反序列配置。假如出现反序列化问题,如消费者一样,将其反序列化注释掉或者删掉即可。

END


搞定~