Kafka是一种高吞吐量的分布式发布订阅消息系统(消息中间件);
持久化:通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能;
高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息;

应该场景

异步处理(消息推送,图片存储)
系统解耦(微服务之间相互调用)
流量削峰(抢购)

生成者和消费者

对于生成者来说,发送消息到topic时,如果不指定topic内分区,将会采用均匀策略发送消息到不同分区中;
对于消费者来说,在可以多个不同的group来消费同一个topic情况(offset记录消费位置);对于一个group来说,消费者数目不应该多于分区的数量,因为在同一个group中,每个分区只能绑定一个消费者,即消费者可以消费多个分区,一个分区只给一个消费者消费;

并发消费

在ConcurrentKafkaListenerContainerFactory类中可设置Concurrency线程数,意味着当属于这个消费工厂下的消费者消费Topic时将会启动多个消费者线程进行消息,为了多个消费者线程都有分区可消费,初始化Topic时需要初始化分区数目大于等于消费者线程数;

消费者工厂配置

@Bean
public KafkaListenerContainerFactory> kafkaListenerDispositionContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(ConstantsConfiguration.VIID_KAFKA_THREAD);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setGroupId(“dispositiongroup_id”);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode._MANUAL
);
factory.getContainerProperties().setAckCount(10);
factory.getContainerProperties().setAckTime(10000);
return factory;
}

Topic初始化

NewTopic(“subnot_topic”,ConstantsConfiguration._VIID_KAFKA_THREAD,(short)1);

批量消费

kafaka消费者支持多消息进行消费,在消费者监听器中使用list进行接收消息,同时消费者工程需要配置批量处理数据量(消费时最大限度)并且开启批量消费;

消费者监听list

@KafkaListener(topics = “un_topic”, containerFactory = “objectFactory”,autoStartup=”${kafka_auto}”)
public void unTopicObjectFactory(List> records, Acknowledgment ack){
….
}

批量消费配置

@Bean
public Map consumerBatchConfigs() {
Map props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
//键的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//值的序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “latest”);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//批量处理300,开启批量才会接收批量数据
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, ConstantsConfiguration.VIID_KAFKA_BATCH_COUNT);
return props;
}

开启批量消费

@Bean(name = “objectFactory”)
public KafkaListenerContainerFactory> kafkaListenerObjectContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerBatchFactory());
factory.setConcurrency(ConstantsConfiguration.VIID_KAFKA_THREAD);
//消息确认时间10S
factory.getContainerProperties().setPollTimeout(1000);
factory.getContainerProperties().setGroupId(“groupid”);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode._MANUAL
);
factory.getContainerProperties().setAckCount(10);
factory.getContainerProperties().setAckTime(10000);
//开启批量
factory.setBatchListener(true);
return factory;
}

其他(未实践)

部署?集群部署?其他使用场景?